Analyzing Netflow Data with xGT

Download the Jupyter notebook for an interactive experience.

This sample script loads raw NetFlow data into an xGT graph structure and queries for a graph pattern.

The dataset used is from the CTU-13 open source project: https://mcfp.weebly.com/the-ctu-13-dataset-a-labeled-dataset-with-botnet-normal-and-background-traffic.html

Raw data example:

StartTime   SrcAddr       DstAddr      State    sTos    dTos    TotPkts     TotBytes 
2011/08/16  147.32.86.58  77.75.73.9   SR_A     0.0     0.0     3           182
2011/08/16  147.32.3.51   147.32.84.46 S_RA     0.0     0.0     4           124

Using this dataset, we will create a graph in xGT representing network data flow between IP addresses. Each row of the input file can define a directed edge from a source IP address, identified by the SrcAddr field, to a target IP address, identified by the DstAddr field. Other fields may represent properties on an edge. For example, the first line represents a netflow edge from IP 147.32.86.58 to IP 77.75.73.9 in which 182 bytes were sent across three packets.

The xGT graph structure needs a list of vertices and a list of edges to create a graph. In the graph created here, vertices represent unique IP addresses. While a separate input file describing the vertices can be used, in this case the vertices will be extracted from the SrcAddr and DstAddr fields of the edge file. The IP list is cleaned by removing duplicates and records that are not recognized IP addresses. IP addresses can also be enriched by adding information about the geographical location. To improve performance, each vertex will be identified by a unique integer key instead of by the IP address. The IP address will be accessible as a property of the vertex.

The edge list also requires some cleaning. The date format of the StartTime field needs to be modified to match the xGT DateTime data type, and the IP addresses in the SrcAddr and DstAddr fields need to be replaced with the corresponding integer keys for those vertices.

Next, the kind of vertices and edges that the xGT graph will store are defined. The graph will have one vertex type, representing IP addresses, and one edge type, representing data flow between IP addresses.

A VertexFrame, representing a collection of vertices of the same type, with the same schema and key, is created for our IP data. An EdgeFrame, representing a collection of edges of the same type, is created for the netflow edges. In this example we have only one vertex type and one edge type, but if we had multiple types of vertices or edges, we would create more frames and load the appropriate data into each frame.

After defining the graph, edge and vertex data are ingested from pandas DataFrames into the xGT server. If the input is larger than 100,000 rows, it is recommended to split it into smaller sublists and insert them one at a time. An example is provided.
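
The chunked-insert idea can be sketched as a small helper (a minimal sketch; `insert_in_chunks` and the dummy frame are illustrative, not part of the xGT API — the only assumption is that xGT frames expose an `insert()` method that accepts a DataFrame, as used later in this script):

```python
import pandas as pd

def insert_in_chunks(frame, df, chunk_size=100_000):
    """Insert a large DataFrame into a frame in smaller batches.

    `frame` is assumed to expose an insert() method accepting a
    DataFrame, as xGT vertex and edge frames do.
    """
    for start in range(0, len(df), chunk_size):
        # iloc slicing never raises past the end, so the last
        # batch is simply shorter
        frame.insert(df.iloc[start:start + chunk_size])
```

With this helper, a call like `insert_in_chunks(netflow_edg, df_NetFlow)` would replace a single large insert.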

After the vertex and edge data has been loaded into xGT, a query can be run against the graph. The sample query is looking for a triangular transaction pattern between three distinct locations, where location A first contacted location B, which then contacted location C, which finally contacted back location A. The three transactions must have occurred in that order.

N.B. The required modules to install on the local machine for this example are pandas and xgt.

1. Import Python modules

import numpy as np
import pandas as pd
import sys
import csv
import re
import os


import xgt


gc = xgt.Connection(host='127.0.0.1', port=4367)
TQL = gc.run_job

2. Read the input netflow file

inputfilename = 'https://s3-us-west-2.amazonaws.com/gems-datasets/demoData.binetflow'
#### to use the original dataset, uncomment the double-commented [##] lines below
##import ssl
##ssl._create_default_https_context = ssl._create_unverified_context
##inputfilename = "https://mcfp.felk.cvut.cz/publicDatasets/CTU-Malware-Capture-Botnet-50/detailed-bidirectional-flow-labels/capture20110817.binetflow"
df_NetFlow = pd.read_csv(inputfilename)
##ssl._create_default_https_context = ssl.create_default_context()

# restrict the file size 
df_NetFlow = df_NetFlow.head(2500)

3. Extract IP addresses from netflow file

IP addresses will be used to create vertices in the graph.
- Extract the IP addresses into UniqueIpList.
- Remove duplicate IPs from UniqueIpList.

# collect all the IP address from source IP and target IP columns
UniqueIpList = []
UniqueIpList.extend(df_NetFlow['SrcAddr'])
UniqueIpList.extend(df_NetFlow['DstAddr'])

# remove duplicates
UniqueIpList = list(set(UniqueIpList))

print('number of unique IPs: ', len(UniqueIpList))

4. Cleaning, validation and enhancement of IP addresses


# optional:
# remove invalid IP addresses
# this example uses the socket module for validation
import socket
# note: calling remove() while iterating over the same list skips
# elements, so filter into a new list instead
validIpList = []
for IP in UniqueIpList:
    try:
        socket.inet_aton(IP)
        validIpList.append(IP)
    except socket.error:
        print('IP not valid ', IP)
UniqueIpList = validIpList


# inet_aton also accepts shorthand forms such as '1.2.3', so
# additionally require the full dotted-quad format
validIpList = []
for IP in UniqueIpList:
    if re.match(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', IP) is None:
        print('IP not valid ', IP)
    else:
        validIpList.append(IP)
UniqueIpList = validIpList

print('number of valid unique IPs: ', len(UniqueIpList))

# all edges with empty/invalid source or target IP will be pointing to vertex 0
UniqueIpList.insert(0,'0.0.0.0')

addLatLong = False

5. (Optional) Enrich IP addresses

If you don't have or don't want to install the optional module geolite2, skip the next block of code.

# REQUIRED MODULE: install with: pip3 install maxminddb-geolite2
# see https://github.com/rr2do2/maxminddb-geolite2 for more information
"""
from geolite2 import geolite2

addLatLong = True

##### find the latitude and longitude of each IP
reader = geolite2.reader()

latList = []
lonList = []
noInfoFoundList = []

for ele in UniqueIpList:
    d = reader.get(ele)
    if (isinstance(d, dict)):
        l = d.get('location')
        if (isinstance(l, dict)):
            latList.append(l.get('latitude'))
            lonList.append(l.get('longitude'))
        else:
            noInfoFoundList.append('no latitude and longitude for ' + ele)
            #print(d)
            latList.append(0.0)
            lonList.append(0.0)
    else:
        noInfoFoundList.append('no location found for ' + ele)
        latList.append(0.0)
        lonList.append(0.0)

geolite2.close()
"""

6. Create IP address DataFrame

# add an ID column of unique consecutive integers starting at 0
df_IP = pd.DataFrame({'IP':UniqueIpList})
df_IP.insert(0, 'ID', range(0, len(df_IP)))
print(df_IP.head(n=3))
print(df_IP.tail(n=3))

7. Clean the edge data

Each edge in the graph will be created from a row of the df_NetFlow DataFrame, but this data must first be cleaned.
- Remove the column Dir.
- Fix the datetime format of the column StartTime.
- Round the column Dur, which represents netflow duration, to 5 decimal places.
- Replace all NaN data with zeros using the pandas function fillna.
- Force the columns Sport, Dport and State to a string format.

print('peek at input data:')
print(df_NetFlow.head(n=3))

# remove column 'Dir'
df_NetFlow.drop(['Dir'], axis = 1, inplace = True)

# change the date format
from datetime import datetime, date, time
def FormatDate(x):
    return datetime.strptime(x, '%Y/%m/%d %H:%M:%S.%f').strftime("%Y-%m-%dT%H:%M:%S")

# add the date %Y-%m-%dT to output when using datetime format instead of time

V_FormatDate = np.vectorize(FormatDate)  
df_NetFlow['StartTime'] = V_FormatDate(df_NetFlow['StartTime']) 

# round floats to 5 decimal places
df_NetFlow['Dur'] = np.round(df_NetFlow['Dur'],5)

# replace NaN with 0
df_NetFlow.fillna(0, inplace=True)

df_NetFlow[["Sport", "Dport", "State"]] = df_NetFlow[["Sport", "Dport", "State"]].astype(str)

# check data type
#print(df_NetFlow.dtypes)

print('\npeek at cleaned data: ')
print(df_NetFlow.head(n=3))
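
As an aside, the per-element `np.vectorize(FormatDate)` conversion above can also be done with pandas' built-in vectorized datetime parsing, which is typically faster on large inputs (a sketch, assuming the same `%Y/%m/%d %H:%M:%S.%f` input format; the sample timestamp is illustrative):

```python
import pandas as pd

# parse the whole StartTime column at once, then render it in the
# ISO-like form that matches the xGT DateTime type
times = pd.Series(['2011/08/16 10:01:46.972101'])
parsed = pd.to_datetime(times, format='%Y/%m/%d %H:%M:%S.%f')
iso = parsed.dt.strftime('%Y-%m-%dT%H:%M:%S')
print(iso.iloc[0])  # 2011-08-16T10:01:46
```

In the script above, the equivalent call would operate on `df_NetFlow['StartTime']` instead of the toy series.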

8. Update the source and target vertex keys for each edge

The edge data in df_NetFlow identifies the source and target vertices by the IP addresses in SrcAddr and DstAddr, respectively. To speed up the search, these IP addresses are replaced by the unique integer keys from df_IP.
- Convert the SrcAddr and DstAddr fields of the edge data from an IP address to the corresponding unique integer key.

To recover the source IP address of a netflow edge later, first look up the source vertex by its integer key, then read the vertex's IP property.

# Create a hash table dictionary with the vertex table key and IPAddresses
# and set a default value
from collections import defaultdict
d = dict(zip(df_IP['IP'],df_IP['ID']))

# if the IP is not found ID is set to default (0)
d = defaultdict(lambda: 0, d)

# function that replaces an IP with its key
def IPToID(x):
    return (d[x])

# run the function on the source and the target Address 
V_IPToID = np.vectorize(IPToID)  
df_NetFlow['SrcAddr'] = V_IPToID(df_NetFlow['SrcAddr']) 
df_NetFlow['DstAddr'] = V_IPToID(df_NetFlow['DstAddr']) 

print('\npeek at edge table with SrcAddr and DstAddr replaced by key: ')
print(df_NetFlow.head(3))
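
The same key substitution can also be written with the pandas `map` method instead of `np.vectorize` (a sketch with a toy dictionary standing in for the one built from df_IP; unknown addresses again fall back to vertex 0):

```python
import pandas as pd

# toy key dictionary standing in for dict(zip(df_IP['IP'], df_IP['ID']))
d = {'147.32.86.58': 1, '77.75.73.9': 2}

src = pd.Series(['147.32.86.58', 'unknown-host'])
# map() yields NaN for addresses missing from d; send those to vertex 0
ids = src.map(d).fillna(0).astype(int)
print(list(ids))  # [1, 0]
```

In the script above, the equivalent calls would assign `df_NetFlow['SrcAddr'].map(d).fillna(0).astype(int)` back to the column, and likewise for DstAddr.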

9. Create IP vertex frame

Next, we define the kind of vertices and edges that the xGT graph will store. The graph will have one vertex type, representing IP addresses, and one edge type, representing data flow between IP addresses.

We create a VertexFrame object for our IP data. The create_vertex_frame() method requires the following parameters:
- name: The frame name, used to identify and retrieve the frame from the xGT server. This name is also used to identify the vertex type in MATCH queries.
- schema: The vertex schema, listing the property names and types for each vertex in the frame.
- key: The vertex property that is the key, uniquely identifying each vertex in the frame.

ip_vtx is a VertexFrame object.

# create a new vertex frame on the xGT server
ip_schema =  [['ID', xgt.INT], ['IP', xgt.TEXT]]
if(addLatLong):
    ip_schema = [['ID', xgt.INT], ['IP', xgt.TEXT], ['Lat', xgt.FLOAT], ['Long', xgt.FLOAT]]
    df_IP = pd.DataFrame({'IP':UniqueIpList,'Lat':latList,'Long':lonList})
    # re-add the integer key column so df_IP matches the schema
    df_IP.insert(0, 'ID', range(0, len(df_IP)))

ip_vtx = gc.create_vertex_frame(
             name = 'IP',
             schema = ip_schema,
             key = 'ID')

10. Create NetFlow edge frame

We create an EdgeFrame object for netflow edges. The create_edge_frame() method requires the following parameters:
- name: The frame name, used to identify and retrieve the frame from the xGT server. This name is also used to identify the edge type in MATCH queries.
- schema: The edge schema, listing the property names and types for each edge in the frame.
- source: The source vertex frame name. Here this is IP.
- target: The target vertex frame name. Here this is IP.
- source_key: The edge property from the edge schema that identifies the source vertex. Here this is SrcAddr.
- target_key: The edge property from the edge schema that identifies the target vertex. Here this is DstAddr.

netflow_edg is an EdgeFrame object.

# create edge table schema
netflow_schema = [['StartTime', xgt.DATETIME], ['Dur', xgt.FLOAT], ['Proto', xgt.TEXT], ['SrcAddr', xgt.INT], 
                  ['Sport', xgt.TEXT],['DstAddr', xgt.INT], ['Dport', xgt.TEXT], ['State', xgt.TEXT], 
                  ['sTos', xgt.FLOAT], ['dTos', xgt.FLOAT],['TotPkts', xgt.INT], ['TotBytes', xgt.INT], 
                  ['SrcBytes', xgt.INT], ['Label', xgt.TEXT]]

netflow_edg = gc.create_edge_frame(
                  name = 'netflow',
                  schema = netflow_schema,
                  source = 'IP',
                  target = 'IP',
                  source_key = 'SrcAddr',
                  target_key = 'DstAddr')

11. Load data

Insert the vertex and edge data from the pandas frames into the xGT VertexFrame and EdgeFrame.

ip_vtx.insert(df_IP)

netflow_edg.insert(df_NetFlow)

12. Run a query

Run a MATCH query and save the results in a table named LatTriangles. The query looks for a triangular relationship pattern in which the transactions occur in order and the protocol changes from UDP to ICMP.

The vertices and edges in the MATCH pattern are identified by their vertex and edge frame names. This allows the search to distinguish between vertices and edges of different types. In this case, there is only one vertex type and one edge type.

# delete older result table
gc.drop_frame("LatTriangles")

TQL("""
    MATCH (a:IP)-[d:netflow]->
      (b:IP)-[e:netflow]->
      (c:IP)-[f:netflow]->
      (a)
    WHERE a.ID <> b.ID AND b.ID <> c.ID AND c.ID <> a.ID
      AND d.StartTime <= e.StartTime
      AND e.StartTime <= f.StartTime
      AND d.Dur < e.Dur AND e.Dur < f.Dur
      AND d.Proto = 'udp'
      AND f.Proto = 'icmp'
    RETURN
      a.ID AS AID, d.StartTime AS Dtimestamp, d.Dur AS Ddur, d.Proto AS DProto,
      b.ID AS BID, e.StartTime AS Etimestamp,
      c.ID AS CID, f.StartTime AS Ftimestamp, f.Proto AS FProto
    INTO LatTriangles
""")

13. Print results and clean up

tb = gc.get_table_frame('LatTriangles')

print("rows: " + str(tb.num_rows))
print("columns: " + str(len(tb.schema)))

# 0 is the offset
# 16 is the max number of rows to return
df  = tb.get_data_pandas(0, 16)
print(type(df))
print(df)

# remove all frames from the xGT server

for e in gc.get_edge_frames():
    gc.drop_frame(e.name)
for v in gc.get_vertex_frames():
    gc.drop_frame(v.name)
for t in gc.get_table_frames():
    gc.drop_frame(t.name)
print(gc.get_table_frames())