6.4. Using Apache Arrow Flight with Trovares xGT
This notebook proceeds as follows:
1. Generate a table of edges with random data.
2. Convert the edge table into Arrow format.
3. Send the table to Trovares xGT over Arrow Flight.
4. Run a graph query on xGT.
5. Retrieve the results over Arrow Flight.
6. Visualize the results.
6.4.1. Generate edges with random data
First, set up the parameters for the edge table.
The table will have four columns: the first two represent the source and target of each edge, and the other two hold edge attributes.
The values of the first two columns are drawn uniformly at random from the integers 1 through `NUM_NODES` (inclusive).
# Size of the random edge table: NUM_ROWS edges over node ids 1..NUM_NODES.
NUM_ROWS = 100
NUM_NODES = 10

# Generate table
import pandas as pd
import random

# Reseed from OS randomness (or the system time), so every run produces new data.
random.seed()
alpha = 'abcdefghijklmnopqrstuvwxyz'

# One zero-argument generator per column, in column order. Each column is
# filled completely before the next one, so the sequence of random draws is
# source x NUM_ROWS, target x NUM_ROWS, attr1 x NUM_ROWS, attr2 x NUM_ROWS.
column_makers = {
    'source': lambda: random.randint(1, NUM_NODES),
    'target': lambda: random.randint(1, NUM_NODES),
    'attr1': lambda: random.choice(alpha),
    'attr2': lambda: random.choice(alpha),
}
data = {
    column: [make_value() for _ in range(NUM_ROWS)]
    for column, make_value in column_makers.items()
}
df = pd.DataFrame(data)
6.4.2. Bring edge table into Arrow format
import pyarrow as pa
import pyarrow.flight
# Convert the pandas edge table (df) into an in-memory Arrow table; the
# column dtypes become the Arrow schema shown in the cell output.
arrow_edges = pa.Table.from_pandas(df)
# Bare expression: the notebook displays the table's schema and a preview.
arrow_edges
pyarrow.Table
source: int64
target: int64
attr1: string
attr2: string
----
source: [[6,1,6,10,2,8,8,10,3,6,...,2,10,6,6,1,10,8,2,6,6]]
target: [[10,3,5,7,10,5,9,8,8,2,...,6,9,4,3,9,7,4,5,9,3]]
attr1: [["i","s","r","c","k","k","c","b","x","y",...,"a","s","c","u","c","w","f","e","n","c"]]
attr2: [["i","k","t","r","e","i","k","w","g","h",...,"m","f","k","a","q","m","z","s","v","p"]]
# List the Arrow table's column names, read from the fields of its schema.
[field.name for field in arrow_edges.schema]
['source', 'target', 'attr1', 'attr2']
6.4.3. Send Arrow Flight to Trovares xGT
First create frames on xGT to hold the graph data.
Then establish a PyArrow Flight connection to the xGT server and send the data.
import xgt
server = xgt.Connection()
# All frames in this notebook live in the 'arrow' namespace.
server.set_default_namespace('arrow')
# Establish vertex frame schema: reuse the 'Node' frame if it already exists
# (e.g. when the notebook is re-run); otherwise create it keyed on 'id'.
try:
    node = server.get_vertex_frame('Node')
except xgt.XgtNameError:
    # Only a missing frame should lead to creation. A bare `except:` would
    # also hide connection/server errors and even swallow KeyboardInterrupt.
    node = server.create_vertex_frame(name = "Node", schema = [['id', xgt.INT]], key='id')
def pyarrow_type_to_xgt_type(pyarrow_type):
    """Map a PyArrow data type to the xGT column type used in a frame schema.

    Integer types map to ``xgt.INT``, floating-point types to ``xgt.FLOAT``
    and booleans to ``xgt.BOOLEAN``; every other type falls back to
    ``xgt.TEXT``.
    """
    # Predicates are tried in order; the first match decides the xGT type.
    type_rules = (
        (pa.types.is_integer, xgt.INT),
        (pa.types.is_floating, xgt.FLOAT),
        (pa.types.is_boolean, xgt.BOOLEAN),
    )
    for matches, xgt_type in type_rules:
        if matches(pyarrow_type):
            return xgt_type
    return xgt.TEXT
# Establish edge frame schema: one xGT column per Arrow column, with types
# converted by pyarrow_type_to_xgt_type.
xgt_schema = [[c.name, pyarrow_type_to_xgt_type(c.type)] for c in arrow_edges.schema]
# Reuse an existing 'Edge' frame (e.g. when the notebook is re-run) instead of
# failing on a duplicate name, the same way the 'Node' frame is obtained.
# NOTE(review): edges sent on a re-run are presumably appended to the existing
# frame; drop the frame first if a clean table is wanted - confirm.
try:
    edge = server.get_edge_frame('Edge')
except xgt.XgtNameError:
    # Both endpoints reference the Node vertex frame; the 'source' and
    # 'target' columns hold the vertex keys.
    edge = server.create_edge_frame(
        name = "Edge",
        schema = xgt_schema,
        source = node, source_key = "source",
        target = node, target_key = "target",
    )
# Establish a PyArrow Flight connection to xGT server
import pyarrow.flight
class BasicClientAuthHandler(pyarrow.flight.ClientAuthHandler):
    """Authenticate a Flight client with basic (username/password) credentials.

    During the handshake the serialized ``BasicAuth`` message is written to
    the server once, and the server's reply is kept as the session token
    returned by ``get_token``.

    Args:
        username: User name to send. ``None`` (default) leaves it unset,
            the same as ``pyarrow.flight.BasicAuth()``.
        password: Password to send. ``None`` (default) leaves it unset.
    """

    def __init__(self, username=None, password=None):
        # One constructor with optional credentials. This lets both
        # BasicClientAuthHandler() and BasicClientAuthHandler(user, pw) work.
        # A second __init__ definition would silently replace this one.
        super().__init__()
        self.basic_auth = pyarrow.flight.BasicAuth(username, password)
        # Filled in by authenticate() with the server's reply.
        self.token = None

    def authenticate(self, outgoing, incoming):
        """Send the serialized credentials and store the server's token."""
        outgoing.write(self.basic_auth.serialize())
        self.token = incoming.read()

    def get_token(self):
        """Return the token received in authenticate(), or None before it."""
        return self.token
# Connect to the xGT Arrow Flight endpoint and run the basic-auth handshake.
arrow_conn = pyarrow.flight.FlightClient(("localhost", 4367))
arrow_conn.authenticate(BasicClientAuthHandler())
# send pyarrow edge table to xGT server as a pyarrow.flight
# The descriptor path ('arrow', 'Edge') names the namespace and frame that
# receive the data. The metadata reader returned by do_put is not needed.
writer, _ = arrow_conn.do_put(
    pyarrow.flight.FlightDescriptor.for_path('arrow', 'Edge'), arrow_edges.schema)
# Using the writer as a context manager closes the stream even if
# write_table raises, so the upload is never left open.
with writer:
    writer.write_table(arrow_edges)
6.4.4. Request to run a graph query on xGT
# Find two-hop paths a -> b -> c over Edge where a, b and c are pairwise
# distinct and both edges carry the same attr1 value. Each distinct
# (a, c, attr1) triple is returned and written into the frame answer_set.
query = """
MATCH (a)-[e1:Edge]->(b)-[e2:Edge]->(c)
WHERE a <> b and b <> c and a <> c
AND e1.attr1 = e2.attr1
RETURN DISTINCT a.id AS a, c.id AS c, e1.attr1 AS attr1
INTO answer_set
"""
job = server.run_job(query)
# Also pull the results as a pandas DataFrame for a quick look.
# NOTE: this rebinds `data`, which earlier held the dict of random columns.
data = job.get_data_pandas()
data
6.4.5. Bring results back as an Arrow Flight
# Fetch the query results over Arrow Flight. The ticket presumably has the
# form <namespace>__<frame> ('arrow' namespace, 'answer_set' frame).
answer_ticket = pyarrow.flight.Ticket(b"arrow__answer_set")
answer_stream = arrow_conn.do_get(answer_ticket)
answers = answer_stream.read_all()
answers
pyarrow.Table
a: int64
c: int64
attr1: string
----
a: [[9,10,2,10,1,1,9,1,2,6,...,9,4,2,10,6,9,4,8,5,2]]
c: [[2,3,5,4,5,9,2,6,1,9,...,1,8,9,2,10,3,7,6,9,4]]
attr1: [["c","c","x","c","k","h","l","h","e","y",...,"l","j","x","c","y","c","m","c","y","t"]]
6.4.6. Visualize results
# Render the answers as a directed graph (a -> c, labelled with attr1) when
# the graphviz package is available; otherwise print install instructions.
try:
    import graphviz
except ImportError:
    print("You must install graphviz in order to vizualize the results as a graph")
    print("\n!python3 -m pip install graphviz")
    gr = None
else:
    gr = graphviz.Digraph()
    # Convert the Arrow results to plain Python lists once: {column: values}.
    columns = answers.to_pydict()
    # Draw one vertex per distinct id appearing as a path start (a) or end (c).
    # The loop name must not be `node`, because that global holds the xGT
    # vertex frame. Clobbering it would break re-running the frame cells.
    for vertex_id in set(columns['a']) | set(columns['c']):
        gr.node(str(vertex_id))
    # Draw one edge a -> c per returned row, labelled with its attr1 value.
    for a, c, attr1 in zip(columns['a'], columns['c'], columns['attr1']):
        gr.edge(str(a), str(c), label=f'"{attr1}"')
gr