{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"scrolled": true,
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"# Analyzing Netflow Data with xGT\n",
"\n",
"Download the Jupyter notebook\n",
"for an interactive experience.\n",
"\n",
"This sample script loads raw NetFlow data into an xGT graph structure and queries for a graph pattern.\n",
"\n",
"The dataset used is from the CTU-13 open source project:\n",
"https://mcfp.weebly.com/the-ctu-13-dataset-a-labeled-dataset-with-botnet-normal-and-background-traffic.html\n",
"\n",
"Raw data example:\n",
"\n",
"    StartTime   SrcAddr       DstAddr       State  sTos  dTos  TotPkts  TotBytes\n",
"    2011/08/16  147.32.86.58  77.75.73.9    SR_A   0.0   0.0   3        182\n",
"    2011/08/16  147.32.3.51   147.32.84.46  S_RA   0.0   0.0   4        124\n",
"\n",
"Using this dataset, we will create a graph in xGT representing network data flow between IP addresses. Each row of the input file can define a directed edge from a source IP address, identified by the `SrcAddr` field, to a target IP address, identified by the `DstAddr` field. Other fields may represent properties on an edge. For example, the first line represents a netflow edge from IP `147.32.86.58` to IP `77.75.73.9` in which 182 bytes were sent across three packets.\n",
"\n",
"The xGT graph structure needs a list of vertices and a list of edges to create a graph. In the graph created here, vertices represent unique IP addresses. While a separate input file describing the vertices can be used, in this case the vertices will be extracted from the `SrcAddr` and `DstAddr` fields of the edge file. The IP list is cleaned by removing duplicates and records that are not recognized IP addresses. IP addresses can also be enriched by adding information about the geographical location. To improve performance, each vertex will be identified by a unique integer key instead of by the IP address. The IP address will be accessible as a property of the vertex.\n",
"\n",
"The edge list will also require some cleaning. The date format of the input row `StartTime` needs to be modified to match the xGT data type `DateTime`, and the IP addresses of the `SrcAddr` and `DstAddr` fields need to be replaced with the corresponding integer keys for those vertices.\n",
"\n",
"Next, the kind of vertices and edges that the xGT graph will store are defined. The graph will have one vertex type, representing IP addresses, and one edge type, representing data flow between IP addresses.\n",
"\n",
"A `VertexFrame`, representing a collection of vertices of the same type, with the same schema and key, is created for our IP data. An `EdgeFrame`, representing a collection of edges of the same type, is created for the netflow edges. In this example we have only one vertex type and one edge type, but if we had multiple types of vertices or edges, we would create more frames and load the appropriate data into each frame.\n",
"\n",
"After defining the graph, edge and vertex data are ingested from pandas DataFrames into the xGT server. If the input is bigger than 100,000 rows, it is recommended to split it into smaller sublists; an example is provided.\n",
"\n",
"After the vertex and edge data has been loaded into xGT, a query can be run against the graph. The sample query looks for a triangular transaction pattern between three distinct locations, where location A first contacted location B, which then contacted location C, which finally contacted location A back. The three transactions must have occurred in that order.\n",
"\n",
"N.B. The required modules to install on the local machine for this example are pandas and xgt.\n",
"\n",
"\n",
"## 1. Import Python modules\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import sys\n",
"import csv\n",
"import re\n",
"import os\n",
"\n",
"\n",
"import xgt\n",
"\n",
"\n",
"gc = xgt.Connection(host='127.0.0.1', port=4367)\n",
"TQL = gc.run_job"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 2. Read the input netflow file\n",
"\n",
"- Read the input netflow file into the pandas DataFrame `df_NetFlow`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"inputfilename = 'https://s3-us-west-2.amazonaws.com/gems-datasets/demoData.binetflow'\n",
"#### to use the original dataset, uncomment the double-commented [##] lines below\n",
"##import ssl\n",
"##ssl._create_default_https_context = ssl._create_unverified_context\n",
"##inputfilename = \"https://mcfp.felk.cvut.cz/publicDatasets/CTU-Malware-Capture-Botnet-50/detailed-bidirectional-flow-labels/capture20110817.binetflow\"\n",
"df_NetFlow = pd.read_csv(inputfilename)\n",
"##ssl._create_default_https_context = ssl.create_default_context()\n",
"\n",
"# restrict the file size \n",
"df_NetFlow = df_NetFlow.head(2500)"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 3. Extract IP addresses from netflow file\n",
"\n",
"IP addresses will be used to create vertices in the graph.\n",
"- Extract the IP addresses to `UniqueIpList`.\n",
"- Remove duplicated IPs from `UniqueIpList`. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# collect all the IP addresses from the source IP and target IP columns\n",
"UniqueIpList = []\n",
"UniqueIpList.extend(df_NetFlow['SrcAddr'])\n",
"UniqueIpList.extend(df_NetFlow['DstAddr'])\n",
"\n",
"# remove duplicates\n",
"UniqueIpList = list(set(UniqueIpList))\n",
"\n",
"print('number of unique IPs:', len(UniqueIpList))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 4. Cleaning, validation and enhancement of IP addresses\n",
"\n",
"- Optionally, using the `socket` module to test each IP address, remove those that are not valid from the list.\n",
"- Insert a vertex at row 0 with the placeholder IP address `0.0.0.0`. All edges with an invalid IP will be linked to this vertex.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"\n",
"# optional:\n",
"# remove invalid IP addresses\n",
"# this example uses the socket module for validation\n",
"import socket\n",
"\n",
"def is_valid_ip(IP):\n",
"    try:\n",
"        socket.inet_aton(IP)\n",
"    except socket.error:\n",
"        return False\n",
"    return re.match(r'\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$', IP) is not None\n",
"\n",
"# build a new list rather than removing elements while iterating over the\n",
"# list, which would silently skip entries\n",
"valid_ips = []\n",
"for IP in UniqueIpList:\n",
"    if is_valid_ip(IP):\n",
"        valid_ips.append(IP)\n",
"    else:\n",
"        print('IP not valid', IP)\n",
"UniqueIpList = valid_ips\n",
"\n",
"print('number of valid unique IP: ', len(UniqueIpList))\n",
"\n",
"# all edges with empty/invalid source or target IP will be pointing to vertex 0\n",
"UniqueIpList.insert(0,'0.0.0.0')\n",
"\n",
"addLatLong = False\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"\n",
"## 5. (Optional) Enrich IP addresses\n",
"\n",
"- Using the module `geolite2`, find the recorded latitude and longitude of each IP and add them to `latList` and `lonList`.\n",
"- Remove the triple quotes around the following code block to add the longitude and the latitude to the vertices.\n",
"\n",
"If you don't want to install the optional module `geolite2`, skip the next block of code.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# REQUIRED MODULE for this optional step: install with: pip3 install maxminddb-geolite2\n",
"# see https://github.com/rr2do2/maxminddb-geolite2 for more information\n",
"\"\"\"\n",
"from geolite2 import geolite2\n",
"\n",
"addLatLong = True\n",
"\n",
"##### find the latitude and longitude of each IP\n",
"reader = geolite2.reader()\n",
"\n",
"latList = []\n",
"lonList = []\n",
"noInfoFoundList = []\n",
"\n",
"for ele in UniqueIpList:\n",
" d = reader.get(ele)\n",
" if (isinstance(d, dict)):\n",
" l = d.get('location')\n",
" if (isinstance(l, dict)):\n",
" latList.append(l.get('latitude'))\n",
" lonList.append(l.get('longitude'))\n",
" else:\n",
"            noInfoFoundList.append('no latitude and longitude for ' + ele)\n",
" #print(d)\n",
" latList.append(0.0)\n",
" lonList.append(0.0)\n",
" else:\n",
"        noInfoFoundList.append('no location found for ' + ele)\n",
" latList.append(0.0)\n",
" lonList.append(0.0)\n",
"\n",
"geolite2.close()\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 6. Create IP address DataFrame\n",
"\n",
"- Create a pandas DataFrame `df_IP` from `UniqueIpList` (plus `latList` and `lonList` if the optional enrichment step was run).\n",
"- To improve performance, each vertex will be identified by a unique integer key. The IP address will be a property of the vertex.\n",
"- Add a unique integer key to each row of the pandas DataFrame as the `ID` property."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# add an ID column of unique consecutive integers starting at 0\n",
"df_IP = pd.DataFrame({'IP':UniqueIpList})\n",
"df_IP.insert(0, 'ID', range(0, len(df_IP)))\n",
"print(df_IP.head(n=3))\n",
"print(df_IP.tail(n=3))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 7. Clean the edge data\n",
"\n",
"Each edge in the graph will be created from a row of the `df_NetFlow` DataFrame, but this data must first be cleaned.\n",
"- Remove the column `Dir`.\n",
"- Fix datetime format for column `StartTime`.\n",
"- Reduce float precision to 5 decimals for the column `Dur`, which represents netflow duration.\n",
"- Replace all `NaN` data with zeros using the pandas function `fillna`.\n",
"- Force the columns `Sport`, `Dport` and `State` to be in a string format."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"print('peek at input data:')\n",
"print(df_NetFlow.head(n=3))\n",
"\n",
"# remove column 'Dir'\n",
"df_NetFlow.drop(['Dir'], axis = 1, inplace = True)\n",
"\n",
"# change the date format\n",
"from datetime import datetime, date, time\n",
"def FormatDate(x):\n",
" return datetime.strptime(x, '%Y/%m/%d %H:%M:%S.%f').strftime(\"%Y-%m-%dT%H:%M:%S\")\n",
"\n",
"# add the date %Y-%m-%dT to output when using datetime format instead of time\n",
"\n",
"V_FormatDate = np.vectorize(FormatDate) \n",
"df_NetFlow['StartTime'] = V_FormatDate(df_NetFlow['StartTime']) \n",
"\n",
"# round floats to 5 decimals\n",
"df_NetFlow['Dur'] = np.round(df_NetFlow['Dur'],5)\n",
"\n",
"# replace NaN with 0\n",
"df_NetFlow.fillna(0, inplace=True)\n",
"\n",
"df_NetFlow[[\"Sport\", \"Dport\", \"State\"]] = df_NetFlow[[\"Sport\", \"Dport\", \"State\"]].astype(str)\n",
"\n",
"# check data type\n",
"#print(df_NetFlow.dtypes)\n",
"\n",
"print('\\npeek at cleaned data:')\n",
"print(df_NetFlow.head(n=3))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 8. Update the source and target vertex keys for each edge\n",
"\n",
"The edge data in `df_NetFlow` identifies the source and target vertices by the IP addresses in `SrcAddr` and `DstAddr`, respectively. To optimize the search engine, these IP addresses are replaced by the unique integer keys in `df_IP`. \n",
"- Convert the `SrcAddr` and `DstAddr` fields of the edge data from an IP address to the corresponding unique integer key.\n",
"\n",
"To find the source IP address of a netflow edge, first the source vertex is identified by its integer key and then the vertex's IP address property can be obtained."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create a dictionary mapping each IP address to its vertex key,\n",
"# with a default value for unknown IPs\n",
"from collections import defaultdict\n",
"d = dict(zip(df_IP['IP'],df_IP['ID']))\n",
"\n",
"# if the IP is not found ID is set to default (0)\n",
"d = defaultdict(lambda: 0, d)\n",
"\n",
"# function that replaces an IP with its key\n",
"def IPToID(x):\n",
" return (d[x])\n",
"\n",
"# run the function on the source and the target Address \n",
"V_IPToID = np.vectorize(IPToID) \n",
"df_NetFlow['SrcAddr'] = V_IPToID(df_NetFlow['SrcAddr']) \n",
"df_NetFlow['DstAddr'] = V_IPToID(df_NetFlow['DstAddr']) \n",
"\n",
"print('\\npeek at edge table with SrcAddr and DstAddr replaced by keys:')\n",
"print(df_NetFlow.head(3))"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 9. Create IP vertex frame\n",
"\n",
"Next, we define the kind of vertices and edges that the xGT graph will store. The graph will have one vertex type, representing IP addresses, and one edge type, representing data flow between IP addresses. \n",
"\n",
"We create a `VertexFrame` object for our IP data. The `create_vertex_frame()` method requires the following parameters:\n",
"- `name`: The frame name, used to identify and retrieve the frame from the xGT server. This name is also used to identify the vertex type in `MATCH` queries.\n",
"- `schema`: The vertex schema, listing the property names and types for each vertex in the frame.\n",
"- `key`: The vertex property that is the key, uniquely identifying each vertex in the frame.\n",
"\n",
"`ip_vtx` is a `VertexFrame` object."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# create a new vertex frame on the xGT server\n",
"ip_schema = [['ID', xgt.INT], ['IP', xgt.TEXT]]\n",
"if(addLatLong):\n",
" ip_schema = [['ID', xgt.INT], ['IP', xgt.TEXT], ['Lat', xgt.FLOAT], ['Long', xgt.FLOAT]]\n",
" df_IP = pd.DataFrame({'IP':UniqueIpList,'Lat':latList,'Long':lonList})\n",
"\n",
"ip_vtx = gc.create_vertex_frame(\n",
" name = 'IP',\n",
" schema = ip_schema,\n",
" key = 'ID')"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 10. Create NetFlow edge frame\n",
"\n",
"We create an `EdgeFrame` object for netflow edges. The `create_edge_frame()` method requires the following parameters:\n",
"- `name`: The frame name, used to identify and retrieve the frame from the xGT server. This name is also used to identify the edge type in `MATCH` queries.\n",
"- `schema`: The edge schema, listing the property names and types for each edge in the frame.\n",
"- `source`: The source vertex frame name is `IP`.\n",
"- `target`: The target vertex frame name is `IP`.\n",
"- `source_key`: The edge property from the edge schema that identifies the source vertex. Here this is `SrcAddr`.\n",
"- `target_key`: The edge property from the edge schema that identifies the target vertex. Here this is `DstAddr`.\n",
"\n",
"`netflow_edg` is an `EdgeFrame` object."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# create edge table schema\n",
"netflow_schema = [['StartTime', xgt.DATETIME], ['Dur', xgt.FLOAT], ['Proto', xgt.TEXT], ['SrcAddr', xgt.INT], \n",
" ['Sport', xgt.TEXT],['DstAddr', xgt.INT], ['Dport', xgt.TEXT], ['State', xgt.TEXT], \n",
" ['sTos', xgt.FLOAT], ['dTos', xgt.FLOAT],['TotPkts', xgt.INT], ['TotBytes', xgt.INT], \n",
" ['SrcBytes', xgt.INT], ['Label', xgt.TEXT]]\n",
"\n",
"netflow_edg = gc.create_edge_frame(\n",
" name = 'netflow',\n",
" schema = netflow_schema,\n",
" source = 'IP',\n",
" target = 'IP',\n",
" source_key = 'SrcAddr',\n",
" target_key = 'DstAddr')"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 11. Load data\n",
"\n",
"Insert the vertex and edge data from the pandas frames into the xGT `VertexFrame` and `EdgeFrame`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"ip_vtx.insert(df_IP)\n",
"\n",
"netflow_edg.insert(df_NetFlow)"
]
},
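{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# (Optional) For inputs larger than roughly 100,000 rows it is recommended\n",
"# to split the insert into smaller chunks. Remove the triple quotes to use\n",
"# this sketch in place of the single netflow_edg.insert(df_NetFlow) call\n",
"# above; the chunk size of 100000 is an illustrative choice, not a server\n",
"# requirement.\n",
"\"\"\"\n",
"chunk_size = 100000\n",
"for start in range(0, len(df_NetFlow), chunk_size):\n",
"    netflow_edg.insert(df_NetFlow.iloc[start:start + chunk_size])\n",
"\"\"\""
]
},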
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 12. Run a query\n",
"\n",
"Run a `MATCH` query and save the results in a table named `LatTriangles`. The query looks for a triangular relationship pattern, where the transactions occur in order, durations increase, and the protocol changes from UDP to ICMP.\n",
"\n",
"The vertices and edges in the `MATCH` pattern are identified by their vertex and edge frame names. This allows the search to distinguish between vertices and edges of different types. In this case, there is only one vertex type and one edge type."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"# delete older result table\n",
"gc.drop_frame(\"LatTriangles\")\n",
"\n",
"TQL(\"\"\"\n",
" MATCH (a:IP)-[d:netflow]->\n",
" (b:IP)-[e:netflow]->\n",
" (c:IP)-[f:netflow]->\n",
" (x:IP) \n",
" WHERE a.ID <> b.ID AND b.ID <> c.ID AND c.ID <> a.ID\n",
" AND d.StartTime <= e.StartTime\n",
" AND e.StartTime <= f.StartTime\n",
" AND d.Dur < e.Dur AND e.Dur < f.Dur\n",
" AND d.Proto = 'udp'\n",
" AND f.Proto = 'icmp'\n",
" RETURN\n",
" a.ID AS AID, d.StartTime AS Dtimestamp, d.Dur AS Ddur, d.Proto AS DProto,\n",
" b.ID AS BID, e.StartTime AS Etimestamp,\n",
" c.ID AS CID, f.StartTime AS Ftimestamp, f.Proto AS FProto\n",
" INTO LatTriangles\n",
"\"\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"slideshow": {
"slide_type": "slide"
}
},
"source": [
"## 13. Print results and clean up\n",
"\n",
"- Get a proxy to the results table used in the query. \n",
"- Get the table data as a pandas DataFrame and print its contents.\n",
"- Remove all frames from the xGT server."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"slideshow": {
"slide_type": "subslide"
}
},
"outputs": [],
"source": [
"tb = gc.get_table_frame('LatTriangles')\n",
"\n",
"print(\"rows: \" + str(tb.num_rows))\n",
"print(\"columns: \" + str(len(tb.schema)))\n",
"\n",
"# 0 is the offset\n",
"# 16 is the max number of rows to return\n",
"df = tb.get_data_pandas(0, 16)\n",
"print(type(df))\n",
"print(df)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# remove all frames from the xGT server\n",
"\n",
"for e in gc.get_edge_frames():\n",
" gc.drop_frame(e.name)\n",
"for v in gc.get_vertex_frames():\n",
" gc.drop_frame(v.name)\n",
"for t in gc.get_table_frames():\n",
" gc.drop_frame(t.name)\n",
"print(gc.get_table_frames())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
""
]
}
],
"metadata": {
"celltoolbar": "Slideshow",
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.10"
}
},
"nbformat": 4,
"nbformat_minor": 2
}