6.2. Analyzing LANL Data using Advanced xGT Features

After learning how to use the basic functionality of xGT by following the Analyzing LANL Netflow Data with xGT notebook, you may wish to see some more advanced features and how to apply them to solving the same problem. This notebook demonstrates advanced techniques in the use of graph ideas as well as improved use of python.

import xgt
conn = xgt.Connection()
conn
  <xgt.connection.Connection at 0x108d34190>

6.2.1. Establish Graph Schemas

We first try to retrieve the graph component schemas from xGT server. If that should fail, we create an empty component (vertex or edge frame) for the missing component.

try:
  devices = conn.get_vertex_frame('lanl__Devices')
except xgt.XgtNameError:
  devices = conn.create_vertex_frame(
      name='lanl__Devices',
      schema=[['device', xgt.TEXT]],
      key='device')

devices
  <xgt.graph.VertexFrame at 0x10a4618e0>
try:
  netflow = conn.get_edge_frame('lanl__Netflow')
except xgt.XgtNameError:
  netflow = conn.create_edge_frame(
      name='lanl__Netflow',
      schema=[['epoch_time', xgt.INT],
              ['duration', xgt.INT],
              ['src_device', xgt.TEXT],
              ['dst_device', xgt.TEXT],
              ['protocol', xgt.INT],
              ['src_port', xgt.INT],
              ['dst_port', xgt.INT],
              ['src_packets', xgt.INT],
              ['dst_packets', xgt.INT],
              ['src_bytes', xgt.INT],
              ['dst_bytes', xgt.INT]],
      source=devices,
      target=devices,
      source_key='src_device',
      target_key='dst_device')

netflow
  <xgt.graph.EdgeFrame at 0x10a461ca0>

Edges: The LANL dataset contains two types of data: netflow and windows log events. Of the log events recorded, some describe events within a host/device (e.g., reboots), and some describe authentication events that may be between devices (e.g., login from device A to B). We’ll call the authentication events AuthEvents and the others we’ll call HostEvents. In this notebook we load only the Netflow data and HostEvents.

try:
  host_events = conn.get_edge_frame('lanl__HostEvents')
except xgt.XgtNameError:
  host_events = conn.create_edge_frame(
      name='lanl__HostEvents',
      schema=[['epoch_time', xgt.INT],
              ['event_id', xgt.INT],
              ['log_host', xgt.TEXT],
              ['user_name', xgt.TEXT],
              ['domain_name', xgt.TEXT],
              ['logon_id', xgt.INT],
              ['process_name', xgt.TEXT],
              ['process_id', xgt.INT],
              ['parent_process_name', xgt.TEXT],
              ['parent_process_id', xgt.INT]],
      source=devices,
      target=devices,
      source_key='log_host',
      target_key='log_host')

host_events
  <xgt.graph.EdgeFrame at 0x108d34b80>
# Utility to print the data sizes currently in xGT
def print_data_summary():
  print('Devices (vertices): {:,}'.format(devices.num_vertices))
  print('Netflow (edges): {:,}'.format(netflow.num_edges))
  print('Host event (edges): {:,}'.format(host_events.num_edges))

print_data_summary()
  Devices (vertices): 933,714
  Netflow (edges): 17,882,795,024
  Host event (edges): 1,468,936,024

6.2.2. Load Data

We show how to load data only if the current data frames are empty.

%%time
if host_events.num_edges == 0:
    urls = ["https://datasets.trovares.com/LANL/xgt/wls_day-02_1v.csv"]
    host_events.load(urls)
  CPU times: user 1.46 ms, sys: 2 ms, total: 3.46 ms
  Wall time: 94.7 ms
%%time
if netflow.num_edges == 0:
    urls = ["https://datasets.trovares.com/LANL/xgt/nf_day-02.csv"]
    netflow.load(urls)
  CPU times: user 1.54 ms, sys: 1.91 ms, total: 3.45 ms
  Wall time: 94.4 ms
print_data_summary()
  Devices (vertices): 933,714
  Netflow (edges): 17,882,795,024
  Host event (edges): 1,468,936,024

6.2.3. Preprocess Graph

The LANL dataset consists of netflow edges whose direction is established using an algorithm that provides some consistency, but it is not based on the originator of the session as source as the destination of the session as the target.

Since this is the case, we first build a new edge frame containing all command and control (C2) edges. We call the new frame C2Flow.

To construct the C2Flow edge frame we need to look through all Netflow edges for records where either:

  • dst_port = 3128, in which case we copy the edge to the C2Flow frame verbatim

  • src_port = 3128, in which case we “reverse” the edge (swap src_X and dst_X, for all of the directional fields) and insert that into the C2Flow frame

# Generate a new edge frame for holding only the C2 edges
conn.drop_frame('lanl__C2Flow')
c2flow = conn.create_edge_frame(
            name='lanl__C2Flow',
            schema=netflow.schema,
            source=devices,
            target=devices,
            source_key='src_device',
            target_key='dst_device')

c2flow
  <xgt.graph.EdgeFrame at 0x108d34dc0>

6.2.3.1. Extract Forward “Command-and-control” Edges

A “forward” edge is one where the dst_port = 3128. This edge is copied verbatim to the C2Flow edge frame.

import time
def run_query(query, table_name = "lanl__Answers", show_query=False, parameters = None):
    conn.drop_frame(table_name)
    if query[-1] != '\n':
        query += '\n'
    query += 'INTO {}'.format(table_name)
    if show_query:
        print("Query:\n" + query)
    start_time = time.time()
    conn.wait_for_metrics()
    wait_time = time.time() - start_time
    if wait_time > 30:
      print('Time to wait for metrics: {:3,.2f}'.format(wait_time))
    job = conn.schedule_job(query, parameters = parameters)
    print("Launched job {}".format(job.id))
    conn.wait_for_job(job)
    table = conn.get_table_frame(table_name)
    return table
%%time
q = """
MATCH (v0)-[edge:lanl__Netflow]->(v1)
WHERE edge.dst_port=3128
CREATE (v0)-[e:lanl__C2Flow {epoch_time : edge.epoch_time,
  duration : edge.duration, protocol : edge.protocol,
  src_port : edge.src_port, dst_port : edge.dst_port,
  src_packets : edge.src_packets, dst_packets : edge.dst_packets,
  src_bytes : edge.src_bytes, dst_bytes : edge.dst_bytes}]->(v1)
RETURN count(*)
"""
r = run_query(q)
print('Number of answers: ' + '{:,}'.format(r.get_data()[0][0]))
  Launched job 9
  Number of answers: 4,249
  CPU times: user 14.4 ms, sys: 17.9 ms, total: 32.3 ms
  Wall time: 28.7 s

6.2.3.2. Extract Reverse “Command-and-control” Edges

A “reverse” edge is one where the src_port = 3128. These edges are copied to the C2Flow frame but reversed in transit. The reversal process involves swapping the: src_device and dst_device; src_port and dst_port; src_packets and dst_packets; and src_bytes and dst_bytes.

%%time
q = """
MATCH (v0)-[edge:lanl__Netflow]->(v1)
WHERE edge.src_port=3128
CREATE (v1)-[e:lanl__C2Flow {epoch_time : edge.epoch_time,
  duration : edge.duration, protocol : edge.protocol,
  src_port : edge.dst_port, dst_port : edge.src_port,
  src_packets : edge.dst_packets, dst_packets : edge.src_packets,
  src_bytes : edge.dst_bytes, dst_bytes : edge.src_bytes}]->(v0)
RETURN count(*)
"""
r = run_query(q)
print('Number of answers: ' + '{:,}'.format(r.get_data()[0][0]))
  Launched job 12
  Number of answers: 261,834
  CPU times: user 11.4 ms, sys: 10.1 ms, total: 21.5 ms
  Wall time: 29.2 s

6.2.4. Querying Our Graph

We’ll be looking for a mock pattern, similar to one that might be used to detect bot-net behavior. The pattern reflects an infected host (a) which is connecting up to a bot-net command and control node (b) with an exfiltration connection to a collection node (c).

Zombie Reboot

Zombie Reboot

  • Some device A boots up and, within a short amount of time, starts up a program.

  • Shortly afterwards, device A sends a message to some other device B.

  • Device B has a long-standing connection to another device C, which has been open for at least an hour, started before A booted, and remained open after A sent a message to B.

6.2.4.1. Query 1: Boot, Program Start, and Connection

We begin the pattern description with the boot and program start events followed by a C2Flow edge from our preprocessing. We’ll restrict it such that all the pieces (boot, program start, and C2 flow) happen within 4 seconds.

Boot, Program Start, and Connection

Boot, Program Start, and Connection

%%time
q = """
MATCH (a)-[boot:lanl__HostEvents]->(a)-[program:lanl__HostEvents]->(a)
         -[c2:lanl__C2Flow]->(b)
WHERE a <> b
  AND boot.event_id = 4608
  AND program.event_id = 4688
  AND program.epoch_time >= boot.epoch_time
  AND c2.epoch_time >= program.epoch_time
  AND c2.epoch_time - boot.epoch_time < $max_time_window
RETURN count(*)
"""
# Note the overall time limit on the sequence of the three events

r = run_query(q, parameters = {'max_time_window':4})
print('Number of boot, programstart, & c2 events: ' + '{:,}'.format(r.get_data()[0][0]))
  Launched job 15
  Number of boot, programstart, & c2 events: 5,084
  CPU times: user 29 ms, sys: 35.2 ms, total: 64.2 ms
  Wall time: 2min 2s

6.2.4.2. Query 2: Full Zombie Reboot Pattern

Finally, we add in the last network connection and match the full pattern.

Zombie Reboot

Zombie Reboot

%%time
q = """
MATCH (a)-[boot:lanl__HostEvents]->(a)-[program:lanl__HostEvents]->(a)
         -[c2:lanl__C2Flow]->(b)-[nf2:lanl__Netflow]->(c)
WHERE a <> b AND b <> c AND a <> c
  AND boot.event_id = 4608
  AND program.event_id = 4688
  AND program.epoch_time >= boot.epoch_time
  AND c2.epoch_time >= program.epoch_time
  AND c2.epoch_time - boot.epoch_time < $max_time_window
  AND nf2.duration >= $min_session_duration
  AND nf2.epoch_time < boot.epoch_time
  AND nf2.epoch_time + nf2.duration >= c2.epoch_time
RETURN count(*)
"""

r = run_query(q, parameters = {'max_time_window':4, 'min_session_duration':3600})
print('Number of zombie reboot events: ' + '{:,}'.format(r.get_data()[0][0]))
  Launched job 18
  Number of zombie reboot events: 986,440
  CPU times: user 379 ms, sys: 593 ms, total: 973 ms
  Wall time: 35min 23s

Copyright © 2017-2021 Trovares Inc