Analyzing Netflow Data with xGT

This sample script loads raw NetFlow data into an xGT graph structure and queries it for a graph pattern.

The dataset used is from the CTU-13 project (a labeled dataset with botnet, normal, and background traffic): https://mcfp.weebly.com/the-ctu-13-dataset-a-labeled-dataset-with-botnet-normal-and-background-traffic.html

A reduced sample of the raw data:

StartTime   SrcAddr       DstAddr      State    sTos    dTos    TotPkts     TotBytes 
2011/08/16  147.32.86.58  77.75.73.9   SR_A     0.0     0.0     3           182
2011/08/16  147.32.3.51   147.32.84.46 S_RA     0.0     0.0     4           124

The important fields are the source IP address "SrcAddr" and the target IP address "DstAddr": they define the edges of the graph, and each IP address is a vertex.

The xGT graph structure needs a list of vertices and a list of edges to create the graph.

The unique IP address list is compiled from the NetFlow data fields SrcAddr and DstAddr. The IP list is cleaned by removing duplicates and records that are not recognized IP addresses. The vertex data can also be enriched with information about the geographical location of each IP address; this is done in the optional fourth step below, which adds latitude and longitude. Each row then gets a unique integer key. Using an integer as a key instead of an IP address speeds up TQL queries.
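
In essence, the keying step looks like the following sketch (assuming pandas is imported as pd and a DataFrame df with SrcAddr and DstAddr columns; the full version, including cleaning, is built in steps 2, 3 and 5 below):

# compact sketch only; steps 2-5 below do this for real
ips = pd.concat([df['SrcAddr'], df['DstAddr']]).drop_duplicates()
df_vertices = ips.rename('IP').reset_index(drop=True).to_frame()
df_vertices.insert(0, 'ID', range(len(df_vertices)))  # unique integer key per IP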

The edge list also requires some cleaning. The date format of the input column StartTime needs to be modified to match the xGT DateTime data type, and the IP addresses in the SrcAddr and DstAddr fields need to be replaced with the corresponding keys from the vertex table.

The vertex table and the edge table are created in xGT with their respective schemas before the data can be loaded; the vertex and edge data are then ingested into those tables. In this example, the pandas DataFrames are passed directly to each table's insert function. If the input is larger than 100,000 rows, it is recommended to split it into smaller chunks; a sketch is provided below.
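
A minimal sketch of such chunked ingestion (insert_in_chunks is a hypothetical helper, not part of the xgt API; it assumes a table object exposing the insert method used in step 9 below):

def insert_in_chunks(table, df, chunk_size=100000):
    # feed the table at most chunk_size rows at a time
    for start in range(0, len(df), chunk_size):
        table.insert(df.iloc[start:start + chunk_size])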

After the vertex table and the edge table have been loaded in xGT, the graph can be generated, and a query can be run against the graph.

The sample query looks for a triangular transaction pattern between 3 distinct locations: location A first contacted location B, B then contacted location C, and C finally contacted A back, with the 3 transactions occurring in that order.

Pandas DataFrames are used to manipulate the data, but Python lists or NumPy arrays could do the same.

N.B. The required modules to install on the local machine for this example are pandas and xgt.
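
Both should be available from PyPI, e.g. pip3 install pandas xgt (assuming a Python 3 environment).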

1. Import Python modules

import numpy as np
import pandas as pd
import re


import xgt


gc = xgt.connect(host='127.0.0.1', port=4367)
TQL = gc.run_job

2. Extract IP addresses from the NetFlow file

# Net Flow file name
inputfilename = './demoData.binetflow'
#### to use the original dataset, uncomment the double-commented [##] lines below
##import ssl
##ssl._create_default_https_context = ssl._create_unverified_context
##inputfilename = "https://mcfp.felk.cvut.cz/publicDatasets/CTU-Malware-Capture-Botnet-50/detailed-bidirectional-flow-labels/capture20110817.binetflow"
df_NetFlow = pd.read_csv(inputfilename)
##ssl._create_default_https_context = ssl.create_default_context

# restrict the file size 
df_NetFlow = df_NetFlow.head(2500)

# collect all the IP addresses from the source IP and target IP columns
UniqueIpList = []
UniqueIpList.extend(df_NetFlow['SrcAddr'])
UniqueIpList.extend(df_NetFlow['DstAddr'])

# remove duplicates
UniqueIpList = list(set(UniqueIpList))

print('number of unique IP: ',len(UniqueIpList))

3. Cleaning, validation and enhancement


# optional:
# remove IP addresses that are not valid
# this example uses the socket module for validation
import socket

# note: items are never removed from the list while iterating over it;
# a filtered copy is built instead
validIpList = []
for IP in UniqueIpList:
    try:
        socket.inet_aton(IP)
    except socket.error:
        print('IP not valid ', IP)
        continue
    if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', IP) is None:
        print('IP not valid ', IP)
        continue
    validIpList.append(IP)
UniqueIpList = validIpList

print('number of valid unique IP: ', len(UniqueIpList))

# all edges with an empty/invalid source or target IP will point to vertex 0
UniqueIpList.insert(0,'0.0.0.0')

addLatLong = False

4. (Optional) Enrich dataset

If you don't have, or don't want to install, the optional geolite2 module, skip the next block of code.

# REQUIRED MODULE: install with: pip3 install maxminddb-geolite2
# see https://github.com/rr2do2/maxminddb-geolite2 for more information
"""
from geolite2 import geolite2

addLatLong = True

##### find the latitude and longitude of each IP
reader = geolite2.reader()

latList = []
lonList = []
noInfoFoundList = []

for ele in UniqueIpList:
    d = reader.get(ele)
    if (isinstance(d, dict)):
        l = d.get('location')
        if (isinstance(l, dict)):
            latList.append(l.get('latitude'))
            lonList.append(l.get('longitude'))
        else:
noInfoFoundList.append('no latitude and longitude for ' + ele)
            #print(d)
            latList.append(0.0)
            lonList.append(0.0)
    else:
noInfoFoundList.append('no location found for ' + ele)
        latList.append(0.0)
        lonList.append(0.0)

geolite2.close()
"""

5. Create the vertex table with its schema

# simple vertex schema and DataFrame (ID and IP only)
ip_schema =  [['ID', xgt.INT], ['IP', xgt.TEXT]]
df_IP = pd.DataFrame({'IP':UniqueIpList})

# enriched vertex schema and dataset
if(addLatLong):
    ip_schema = [['ID', xgt.INT], ['IP', xgt.TEXT], ['Lat', xgt.FLOAT], ['Long', xgt.FLOAT]]
    df_IP = pd.DataFrame({'IP':UniqueIpList,'Lat':latList,'Long':lonList})

ip_vtx = xgt.Vertex(name = 'IP',
                   schema = ip_schema,
                   key = ['ID'])

# add an ID column of unique consecutive integers starting at 0
df_IP.insert(0, 'ID', range(0, len(df_IP)))
print(df_IP.head(n=3))
print(df_IP.tail(n=3))

6. Load and clean the edge data

print('peek at input data:')
print(df_NetFlow.head(n=3))

# remove column 'Dir'
df_NetFlow.drop(['Dir'], axis = 1, inplace = True)

# change the date format
from datetime import datetime, date, time
def FormatDate(x):
    return datetime.strptime(x, '%Y/%m/%d %H:%M:%S.%f').strftime("%Y-%m-%dT%H:%M:%S")
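# e.g. FormatDate('2011/08/16 10:01:46.972101') -> '2011-08-16T10:01:46'
# (illustrative input; the fractional seconds are dropped)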

# the output keeps the date part (%Y-%m-%dT) because the xGT DATETIME type is used rather than TIME

V_FormatDate = np.vectorize(FormatDate)  
df_NetFlow['StartTime'] = V_FormatDate(df_NetFlow['StartTime']) 

# round floats to 5 decimal places
df_NetFlow['Dur'] = np.round(df_NetFlow['Dur'],5)

# replace NaN with 0
df_NetFlow.fillna(0, inplace=True)

df_NetFlow[["Sport", "Dport", "State"]] = df_NetFlow[["Sport", "Dport", "State"]].astype(str)

# check data type
#print(df_NetFlow.dtypes)

print('\npeek at cleaned data: ')
print(df_NetFlow.head(n=3))

7. Choose a better key

The dataset uses IP addresses as keys; to optimize the query engine, the IP addresses are replaced with the integer keys from the vertex table.

print('show vertices:')
print(df_IP.head(n=3))

# create a dictionary mapping each IP address to its vertex table key,
# with a default value for unknown IPs
from collections import defaultdict
d = dict(zip(df_IP['IP'],df_IP['ID']))

# if the IP is not found ID is set to default (0)
d = defaultdict(lambda: 0, d)

# function that replaces the IP with its key
def IPToID(x):
    return (d[x])

# run the function on the source and target addresses
V_IPToID = np.vectorize(IPToID)  
df_NetFlow['SrcAddr'] = V_IPToID(df_NetFlow['SrcAddr']) 
df_NetFlow['DstAddr'] = V_IPToID(df_NetFlow['DstAddr']) 

print('\npeek at edge table with SrcAddr and DstAddr replaced by key: ')
print(df_NetFlow.head(3))

8. Create NetFlow edge type

# create edge table schema
netflow_schema = [['StartTime', xgt.DATETIME], ['Dur', xgt.FLOAT], ['Proto', xgt.TEXT], ['SrcAddr', xgt.INT], 
               ['Sport', xgt.TEXT],['DstAddr', xgt.INT], ['Dport', xgt.TEXT], ['State', xgt.TEXT], 
               ['sTos', xgt.FLOAT], ['dTos', xgt.FLOAT],['TotPkts', xgt.INT], ['TotBytes', xgt.INT], 
               ['SrcBytes', xgt.INT], ['Label', xgt.TEXT]]

netflow_edg = xgt.Edge(name   = 'netflow',
                      schema = netflow_schema,
                      source = [['SrcAddr', ip_vtx.key.ID]],
                      target = [['DstAddr', ip_vtx.key.ID]])

9. Create the graph

ng = xgt.Graph('CT13_Graph')

ng.add(ip_vtx).add(netflow_edg)

#-- Create the graph in xGT --
gc.drop_graph('CT13_Graph')
gc.create(ng)

#-- Load data to the graph in xGT --
cg = gc.get_graph('CT13_Graph')

v = cg.vertices.IP
v.insert(df_IP)

e = cg.edges.netflow
e.insert(df_NetFlow)
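
For the full CTU-13 capture, which has millions of rows, the same inserts should be chunked, for instance with the hypothetical insert_in_chunks helper sketched in the introduction: insert_in_chunks(e, df_NetFlow).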

10. Run a query

The query looks for a triangular relationship pattern, where the transactions occur in order, the durations increase, and the protocol changes from udp (first hop) to icmp (last hop).

# delete older result table
gc.drop_table("LatTriangles")

TQL("""
    MATCH (a:IP)-[d:netflow]->
      (b:IP)-[e:netflow]->
      (c:IP)-[f:netflow]->
      (a)
    WHERE a.ID <> b.ID AND b.ID <> c.ID AND c.ID <> a.ID
      AND d.StartTime <= e.StartTime
      AND e.StartTime <= f.StartTime
      AND d.Dur < e.Dur AND e.Dur < f.Dur
      AND d.Proto = 'udp'
      AND f.Proto = 'icmp'
    RETURN
      a.ID AS AID, d.StartTime AS Dtimestamp, d.Dur AS Ddur, d.Proto AS DProto,
      b.ID AS BID, e.StartTime AS Etimestamp,
      c.ID AS CID, f.StartTime AS Ftimestamp, f.Proto AS FProto
    INTO LatTriangles
""")

11. Print results

The table data can be retrieved either as a pandas DataFrame or as a list with the table content; get_data_pandas, used below, returns a DataFrame.

tb = gc.get_table('LatTriangles')

print("rows: " + str(tb.num_rows()))
print("columns: " + str(tb.num_cols()))

# 0 is the offset
# 16 is the max number of rows to return
df = tb.get_data_pandas(0, 16)
print(type(df))
print(df)

# clean up: remove the graph and all tables

gc.drop_table("LatTriangles")
gc.drop_graph('CT13_Graph')
for tablename in gc.get_tables():
    gc.drop_table(tablename)
print(gc.get_tables())