6.4. Using Apache Arrow Flight with Trovares xGT

The outline of this notebook:

  1. Generate a table of edges with random data.

  2. Bring edge table into an Arrow format.

  3. Send Arrow Flight to Trovares xGT.

  4. Request to run a graph query on xGT.

  5. Bring results back as an Arrow Flight.

  6. Visualize results.

6.4.1. Generate edges with random data.

First, set up the parameters for the edge table.

The table will have four columns. The first two columns represent the source and target of each edge; the other two columns hold edge attributes.

The values of the first two columns are drawn uniformly at random from the range 1 to NUM_NODES (inclusive). Each value in the two attribute columns is a single random lowercase letter.

# Size of the random edge table: number of edges (rows) and number of
# distinct vertex ids the endpoints are drawn from.
NUM_ROWS = 100
NUM_NODES = 10
# Build the table column by column.
import pandas as pd
import random
random.seed()
alpha = 'abcdefghijklmnopqrstuvwxyz'

# Endpoint columns hold vertex ids in 1..NUM_NODES (inclusive); attribute
# columns hold one random lowercase letter per row.  The two groups are
# merged in order so the columns come out as source, target, attr1, attr2.
data = {
    **{
        endpoint: [random.randint(1, NUM_NODES) for _ in range(NUM_ROWS)]
        for endpoint in ('source', 'target')
    },
    **{
        attribute: [random.choice(alpha) for _ in range(NUM_ROWS)]
        for attribute in ('attr1', 'attr2')
    },
}
df = pd.DataFrame(data)

6.4.2. Bring edge table into Arrow format.

import pyarrow as pa
import pyarrow.flight

# Convert the pandas DataFrame of edges into a pyarrow Table.
arrow_edges = pa.Table.from_pandas(df)
# Bare expression: in a notebook this displays the table's schema and data.
arrow_edges
  pyarrow.Table
  source: int64
  target: int64
  attr1: string
  attr2: string
  ----
  source: [[6,1,6,10,2,8,8,10,3,6,...,2,10,6,6,1,10,8,2,6,6]]
  target: [[10,3,5,7,10,5,9,8,8,2,...,6,9,4,3,9,7,4,5,9,3]]
  attr1: [["i","s","r","c","k","k","c","b","x","y",...,"a","s","c","u","c","w","f","e","n","c"]]
  attr2: [["i","k","t","r","e","i","k","w","g","h",...,"m","f","k","a","q","m","z","s","v","p"]]
# Show the Arrow table's column names (displayed as the cell's output).
arrow_edges.column_names
  ['source', 'target', 'attr1', 'attr2']

6.4.3. Send Arrow Flight to Trovares xGT.

First create frames on xGT to hold the graph data.

Then establish a PyArrow Flight connection to the xGT server and send the data.

import xgt
# Connect to the xGT server with default settings and make 'arrow' the
# namespace that unqualified frame names resolve to.
server = xgt.Connection()
server.set_default_namespace('arrow')

# Establish vertex frame schema: reuse the 'Node' frame if it already exists,
# otherwise create it with a single integer key column 'id'.
try:
    node = server.get_vertex_frame('Node')
except xgt.XgtNameError:
    # Only a "no such frame" error should trigger creation; connection,
    # permission, or other failures must propagate instead of being hidden.
    node = server.create_vertex_frame(name="Node", schema=[['id', xgt.INT]], key='id')

def pyarrow_type_to_xgt_type(pyarrow_type):
    """Map a pyarrow data type to the xGT column type used for it.

    Integer types map to ``xgt.INT``, floating-point types to ``xgt.FLOAT``
    and booleans to ``xgt.BOOLEAN``.  Every other type falls back to
    ``xgt.TEXT``.
    """
    # Checked in order; the first predicate that accepts the type wins.
    type_checks = (
        (pa.types.is_integer, xgt.INT),
        (pa.types.is_floating, xgt.FLOAT),
        (pa.types.is_boolean, xgt.BOOLEAN),
    )
    for is_match, xgt_type in type_checks:
        if is_match(pyarrow_type):
            return xgt_type
    return xgt.TEXT

# Derive the xGT edge schema from the Arrow table: one [name, xgt type] pair
# per Arrow column, in column order.
xgt_schema = [
    [column_name, pyarrow_type_to_xgt_type(column_type)]
    for column_name, column_type in zip(arrow_edges.schema.names, arrow_edges.schema.types)
]
# Create the 'Edge' frame; both endpoints live in the Node vertex frame and
# are identified by the 'source' and 'target' columns respectively.
edge = server.create_edge_frame(
    name="Edge",
    schema=xgt_schema,
    source=node,
    source_key="source",
    target=node,
    target_key="target",
)
# Establish a PyArrow Flight connection to xGT server

import pyarrow.flight
class BasicClientAuthHandler(pyarrow.flight.ClientAuthHandler):
    """Flight client auth handler that performs a basic-auth handshake.

    The handler serializes a ``pyarrow.flight.BasicAuth`` payload, writes it
    to the server during ``authenticate``, and stores whatever the server
    sends back as the token returned by ``get_token``.

    Args:
        username: Optional user name placed in the basic-auth payload.
        password: Optional password placed in the basic-auth payload.

    Both arguments default to ``None`` so that ``BasicClientAuthHandler()``
    keeps working.  The original class defined ``__init__`` twice; Python
    keeps only the last definition, so the credential-taking constructor was
    unreachable and passing credentials raised ``TypeError``.
    """

    def __init__(self, username=None, password=None):
        super().__init__()
        self.basic_auth = pyarrow.flight.BasicAuth(username, password)
        self.token = None

    def authenticate(self, outgoing, incoming):
        """Send the serialized credentials and keep the server's reply as the token."""
        auth = self.basic_auth.serialize()
        outgoing.write(auth)
        self.token = incoming.read()

    def get_token(self):
        """Return the token read during ``authenticate`` (``None`` before then)."""
        return self.token

# Open a Flight client to the xGT server on localhost:4367 and authenticate
# with an empty (default) basic-auth payload.
arrow_conn = pyarrow.flight.FlightClient(("localhost", 4367))
arrow_conn.authenticate(BasicClientAuthHandler())

# Send the pyarrow edge table to the xGT server as a pyarrow.flight stream.
# The descriptor path is (namespace, frame name).
writer, _ = arrow_conn.do_put(
    pyarrow.flight.FlightDescriptor.for_path('arrow', 'Edge'), arrow_edges.schema)
try:
    writer.write_table(arrow_edges)
finally:
    # Always close the upload stream, even if writing the table fails.
    writer.close()

6.4.4. Request to run a graph query on xGT.

# Graph query: match two-hop paths a -> b -> c over Edge where the three
# vertices are pairwise distinct and both edges carry the same attr1 value.
# The distinct (a.id, c.id, attr1) rows are written INTO the answer_set frame.
query = """
MATCH (a)-[e1:Edge]->(b)-[e2:Edge]->(c)
WHERE a <> b and b <> c and a <> c
  AND e1.attr1 = e2.attr1
RETURN DISTINCT a.id AS a, c.id AS c, e1.attr1 AS attr1
INTO answer_set
"""

# Run the query on the xGT server and fetch the job's data via pandas.
# NOTE: this rebinds `data`, which earlier held the dict of random columns.
job = server.run_job(query)
data = job.get_data_pandas()
# Bare expression: displayed as the cell's output in a notebook.
data

6.4.5. Bring results back as an Arrow Flight.

# Request the answer_set frame from the server over Flight.  The ticket
# joins namespace and frame name with a double underscore.
ticket = pyarrow.flight.Ticket(b"arrow__answer_set")
reader = arrow_conn.do_get(ticket)
# Drain the whole stream into a single pyarrow Table.
answers = reader.read_all()
answers
  pyarrow.Table
  a: int64
  c: int64
  attr1: string
  ----
  a: [[9,10,2,10,1,1,9,1,2,6,...,9,4,2,10,6,9,4,8,5,2]]
  c: [[2,3,5,4,5,9,2,6,1,9,...,1,8,9,2,10,3,7,6,9,4]]
  attr1: [["c","c","x","c","k","h","l","h","e","y",...,"l","j","x","c","y","c","m","c","y","t"]]

6.4.6. Visualize results.

# Render the query results as a directed graph: one node per distinct vertex
# id appearing in column 'a' or 'c', and one labeled edge per result row.
# If the optional imports are unavailable, print install instructions and
# leave `gr` as None.
try:
    # Only the imports are guarded, so an ImportError raised while drawing
    # is not mistaken for a missing package.
    import graphviz
    from IPython.display import Image  # imported but not used in this cell
except ImportError:
    print("You must install graphviz in order to vizualize the results as a graph")
    print("\n!python3 -m pip install graphviz")
    gr = None
else:
    gr = graphviz.Digraph()
    # Draw nodes for the union of all source and target nodes in the two paths.
    # The loop variable is deliberately not called `node`: that name holds the
    # xGT vertex frame handle and must not be overwritten here.
    node_ids = set(answers.column('a').to_pylist()) | set(answers.column('c').to_pylist())
    for node_id in node_ids:
        gr.node(str(node_id))
    # Draw one edge per returned row, labeled with the quoted attr1 value.
    for batch in answers.to_batches():
        d = batch.to_pydict()
        for a, c, attr1 in zip(d['a'], d['c'], d['attr1']):
            gr.edge(str(a), str(c), label=f'"{attr1}"')

# Bare expression: displays the rendered graph (or nothing when gr is None).
gr
../../_images/arrow_flight_15_0.svg