Analyzing LANL Data using Advanced xGT Features
Download this Jupyter notebook for an interactive experience.
After learning the basic functionality of xGT from the Analyzing LANL Netflow Data with xGT notebook, you may wish to see some more advanced features applied to the same problem. This notebook demonstrates more advanced graph techniques as well as improved use of Python.
import xgt
conn = xgt.Connection()
conn
<xgt.connection.Connection at 0x10331b290>
Establish Graph Schemas
We first try to retrieve the graph component schemas from the xGT server. If that fails, we create an empty component (vertex or edge frame) for the missing component.
try:
    devices = conn.get_vertex_frame('Devices')
except xgt.XgtNameError:
    devices = conn.create_vertex_frame(
        name='Devices',
        schema=[['device', xgt.TEXT]],
        key='device')
devices
<xgt.graph.VertexFrame at 0x103effe10>
try:
    netflow = conn.get_edge_frame('Netflow')
except xgt.XgtNameError:
    netflow = conn.create_edge_frame(
        name='Netflow',
        schema=[['epochtime', xgt.INT],
                ['duration', xgt.INT],
                ['srcDevice', xgt.TEXT],
                ['dstDevice', xgt.TEXT],
                ['protocol', xgt.INT],
                ['srcPort', xgt.INT],
                ['dstPort', xgt.INT],
                ['srcPackets', xgt.INT],
                ['dstPackets', xgt.INT],
                ['srcBytes', xgt.INT],
                ['dstBytes', xgt.INT]],
        source=devices,
        target=devices,
        source_key='srcDevice',
        target_key='dstDevice')
netflow
<xgt.graph.EdgeFrame at 0x103f16a90>
Edges: The LANL dataset contains two types of data: netflow and host events. Of the host events recorded, some describe events within a device (e.g., reboots), and some describe events between devices (e.g., login attempts). We'll only be loading the netflow data and in-device events. We call these events "one-sided", since we describe them as graph edges from one vertex to itself.
try:
    events1v = conn.get_edge_frame('Events1v')
except xgt.XgtNameError:
    events1v = conn.create_edge_frame(
        name='Events1v',
        schema=[['epochtime', xgt.INT],
                ['eventID', xgt.INT],
                ['logHost', xgt.TEXT],
                ['userName', xgt.TEXT],
                ['domainName', xgt.TEXT],
                ['logonID', xgt.INT],
                ['processName', xgt.TEXT],
                ['processID', xgt.INT],
                ['parentProcessName', xgt.TEXT],
                ['parentProcessID', xgt.INT]],
        source=devices,
        target=devices,
        source_key='logHost',
        target_key='logHost')
events1v
<xgt.graph.EdgeFrame at 0x103f16a50>
# Utility to print the data sizes currently in xGT
def print_data_summary():
    print('Devices (vertices): {:,}'.format(devices.num_vertices))
    print('Netflow (edges): {:,}'.format(netflow.num_edges))
    print('Host event (edges): {:,}'.format(events1v.num_edges))

print_data_summary()
Devices (vertices): 0
Netflow (edges): 0
Host event (edges): 0
Load data
We show how to load data only if the current data frames are empty.
%%time
if events1v.num_edges == 0:
    urls = ["https://datasets.trovares.com/LANL/xgt/wls_day-04_1v.csv"]
    events1v.load(urls)
CPU times: user 7.29 ms, sys: 9.66 ms, total: 16.9 ms
Wall time: 32 s
%%time
if netflow.num_edges == 0:
    urls = ["https://datasets.trovares.com/LANL/xgt/nf_day-04.csv"]
    netflow.load(urls)
CPU times: user 60.2 ms, sys: 84.6 ms, total: 145 ms
Wall time: 5min 19s
print_data_summary()
Devices (vertices): 157,949
Netflow (edges): 222,323,503
Host event (edges): 16,402,438
Preprocess graph
The LANL dataset consists of netflow edges whose direction is established using an algorithm that provides some consistency, but it is not based on treating the originator of the session as the source and the destination of the session as the target.
Since this is the case, we first build a new edge frame containing all command and control (C2) edges. We call the new frame C2flow.
To construct the C2flow edge frame, we need to look through all Netflow edges for records where either:
- dstPort = 3128, in which case we copy the edge to the C2flow frame verbatim
- srcPort = 3128, in which case we "reverse" the edge (swap srcX and dstX, for all of the directional fields) and insert that into the C2flow frame
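The two rules above can be sketched in plain Python on a single record, independent of xGT. This is only an illustration of the copy-or-reverse logic, not how the notebook performs it (the notebook uses the two queries that follow). The names C2_PORT, SWAPPED_FIELDS, reverse_edge and c2_edges are hypothetical, and the sample record holds made-up values.

```python
C2_PORT = 3128

# Pairs of directional fields that trade places when an edge is reversed.
SWAPPED_FIELDS = [
    ('srcDevice', 'dstDevice'),
    ('srcPort', 'dstPort'),
    ('srcPackets', 'dstPackets'),
    ('srcBytes', 'dstBytes'),
]


def reverse_edge(record):
    """Return a copy of `record` with every src/dst field pair swapped."""
    flipped = dict(record)
    for src, dst in SWAPPED_FIELDS:
        flipped[src], flipped[dst] = record[dst], record[src]
    return flipped


def c2_edges(record, port=C2_PORT):
    """Yield the C2flow edges that one Netflow record contributes."""
    if record['dstPort'] == port:
        yield dict(record)          # forward edge: copied verbatim
    if record['srcPort'] == port:
        yield reverse_edge(record)  # reverse edge: directional fields swapped


# Made-up record whose source port is the C2 port, so it gets reversed.
sample = {
    'epochtime': 100, 'duration': 5, 'protocol': 6,
    'srcDevice': 'proxy', 'dstDevice': 'client',
    'srcPort': 3128, 'dstPort': 51000,
    'srcPackets': 10, 'dstPackets': 7,
    'srcBytes': 900, 'dstBytes': 400,
}

for edge in c2_edges(sample):
    print(edge['srcDevice'], '->', edge['dstDevice'], 'dstPort', edge['dstPort'])
```

In this sketch, a record with both ports equal to 3128 yields two edges, one verbatim and one reversed, because the two conditions are checked independently.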
# Generate a new edge frame for holding only the C2 edges
conn.drop_frame('C2flow')
c2flow = conn.create_edge_frame(
    name='C2flow',
    schema=netflow.schema,
    source=devices,
    target=devices,
    source_key='srcDevice',
    target_key='dstDevice')
c2flow
<xgt.graph.EdgeFrame at 0x103f16710>
Extract forward "Command-and-control" edges
A "forward" edge is one where dstPort = 3128.
This edge is copied verbatim to the C2flow edge frame.
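def run_query(query, table_name="answers", show_query=False):
    """Run a query, store its results in table_name, and return that table frame."""
    # Remove any results table left over from a previous run
    conn.drop_frame(table_name)
    # Direct the query results into the named table
    if query[-1] != '\n':
        query += '\n'
    query += 'INTO {}'.format(table_name)
    if show_query:
        print("Query:\n" + query)
    # Launch the job and block until it finishes
    job = conn.schedule_job(query)
    print("Launched job {}".format(job.id))
    conn.wait_for_job(job)
    table = conn.get_table_frame(table_name)
    return table
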
%%time
q = """
MATCH (v0)-[edge:Netflow]->(v1)
WHERE edge.dstPort=3128
CREATE (v0)-[e:C2flow {epochtime : edge.epochtime,
duration : edge.duration, protocol : edge.protocol,
srcPort : edge.srcPort, dstPort : edge.dstPort,
srcPackets : edge.srcPackets, dstPackets : edge.dstPackets,
srcBytes : edge.srcBytes, dstBytes : edge.dstBytes}]->(v1)
RETURN count(*)
"""
r = run_query(q)
print('Number of answers: ' + '{:,}'.format(r.get_data()[0][0]))
Launched job 7
Number of answers: 72
CPU times: user 14.3 ms, sys: 8.44 ms, total: 22.7 ms
Wall time: 4.81 s
Extract reverse "Command-and-control" edges
A "reverse" edge is one where srcPort = 3128.
These edges are copied to the C2flow frame but reversed in transit.
The reversal process involves swapping srcDevice and dstDevice; srcPort and dstPort; srcPackets and dstPackets; and srcBytes and dstBytes.
%%time
q = """
MATCH (v0)-[edge:Netflow]->(v1)
WHERE edge.srcPort=3128
CREATE (v1)-[e:C2flow {epochtime : edge.epochtime,
duration : edge.duration, protocol : edge.protocol,
srcPort : edge.dstPort, dstPort : edge.srcPort,
srcPackets : edge.dstPackets, dstPackets : edge.srcPackets,
srcBytes : edge.dstBytes, dstBytes : edge.srcBytes}]->(v0)
RETURN count(*)
"""
r = run_query(q)
print('Number of answers: ' + '{:,}'.format(r.get_data()[0][0]))
Launched job 10
Number of answers: 3,224
CPU times: user 14.7 ms, sys: 5.85 ms, total: 20.5 ms
Wall time: 4.79 s
Querying our graph
We'll be looking for a mock pattern, similar to one that might be used to detect bot-net behavior. The pattern reflects an infected host (A) which is connecting up to a bot-net command and control node (B) with an exfiltration connection to a collection node (C).
- Some device A boots up and, within a short amount of time, starts up a program.
- Shortly afterwards, device A sends a message to some other device B.
- Device B has a long-standing connection to another device C, which has been open for at least an hour, started before A booted, and remained open after A sent a message to B.
Query 1: Boot, program start, and connection
We begin the pattern description with the boot and program start events followed by a C2flow edge from our preprocessing. We'll restrict it such that all the pieces (boot, program start, and C2 flow) happen within 4 seconds.
%%time
q = """
MATCH (A)-[boot:Events1v]->(A)-[program:Events1v]->(A)
-[c2:C2flow]->(B)
WHERE A <> B
AND boot.eventID = 4608
AND program.eventID = 4688
AND program.epochtime >= boot.epochtime
AND c2.epochtime >= program.epochtime
AND c2.epochtime - boot.epochtime < 4
RETURN COUNT(*)
"""
# Note the overall time limit on the sequence of the three events
r = run_query(q)
print('Number of boot, programstart, & c2 events: ' + '{:,}'.format(r.get_data()[0][0]))
Launched job 13
Number of boot, programstart, & c2 events: 109
CPU times: user 9.31 ms, sys: 4.03 ms, total: 13.3 ms
Wall time: 748 ms
Query 2: Full zombie reboot pattern
Finally, we add in the last network connection and match the full pattern.
%%time
q = """
MATCH (A)-[boot:Events1v]->(A)-[program:Events1v]->(A)
-[c2:C2flow]->(B)-[nf2:Netflow]->(C)
WHERE A <> B AND B <> C AND A <> C
AND boot.eventID = 4608
AND program.eventID = 4688
AND program.epochtime >= boot.epochtime
AND c2.epochtime >= program.epochtime
AND c2.epochtime - boot.epochtime < 4
AND nf2.duration >= 3600
AND nf2.epochtime < boot.epochtime
AND nf2.epochtime + nf2.duration >= c2.epochtime
RETURN COUNT(*)
"""
r = run_query(q)
print('Number of zombie reboot events: ' + '{:,}'.format(r.get_data()[0][0]))
Launched job 16
Number of zombie reboot events: 981
CPU times: user 17.4 ms, sys: 6.53 ms, total: 24 ms
Wall time: 9.44 s
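To make the event and timing conditions of Query 2 easier to inspect, they can be restated as a plain Python predicate over four records. This is a sketch for reasoning about the WHERE clause only: the function name is_zombie_reboot, the constant names, and the sample timestamps are hypothetical, and the device-distinctness checks (A <> B, B <> C, A <> C) are left out because they concern vertices rather than edge properties.

```python
BOOT_EVENT_ID = 4608           # eventID matched for the boot edge
PROGRAM_START_EVENT_ID = 4688  # eventID matched for the program edge


def is_zombie_reboot(boot, program, c2, nf2, window=4, min_duration=3600):
    """Check the event and timing conditions from Query 2 on four records."""
    return (
        boot['eventID'] == BOOT_EVENT_ID
        and program['eventID'] == PROGRAM_START_EVENT_ID
        # boot, then program start, then the C2 message, all within `window` seconds
        and program['epochtime'] >= boot['epochtime']
        and c2['epochtime'] >= program['epochtime']
        and c2['epochtime'] - boot['epochtime'] < window
        # B's connection to C lasts at least an hour, starts before the boot,
        # and is still open when the C2 message is sent
        and nf2['duration'] >= min_duration
        and nf2['epochtime'] < boot['epochtime']
        and nf2['epochtime'] + nf2['duration'] >= c2['epochtime']
    )


# Made-up timestamps for illustration.
boot = {'eventID': 4608, 'epochtime': 10000}
program = {'eventID': 4688, 'epochtime': 10001}
c2 = {'epochtime': 10002}
nf2 = {'epochtime': 5000, 'duration': 7200}

print(is_zombie_reboot(boot, program, c2, nf2))
```

Moving the C2 message to epochtime 10004 makes the predicate fail, since 10004 - 10000 is not strictly less than the 4-second window.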