3.1. Wrappers to the Python API
The xgt client provides a minimal Python API to the xGT server.
It may be helpful for a user to implement site-specific Python functions that serve as their local interface to the xGT server.
We provide some examples.
3.1.1. Frame Drop
Dropping a frame may seem like it should always work, but there is a small chance that other transactions may cause a drop_frame() request to roll back.
Rather than catching exceptions on every drop_frame() request, one can call the following wrapper function:
import time

import xgt


def drop_frame(connection, frame, max_retry=None):
    """
    Try to drop a frame until one of these outcomes occurs:
      - The drop_frame succeeds.
      - A max_retry was provided by the caller and that many retries were attempted.
        In this situation, an exception is thrown back to the caller.
    """
    wait_time = 1
    retry = 0
    done = False
    while not done:
        try:
            connection.drop_frame(frame)
            done = True
        except xgt.XgtTransactionError:
            # Out of retries: re-raise the transaction error to the caller.
            if max_retry is not None and retry >= max_retry:
                raise
            time.sleep(wait_time)
            wait_time *= 2  # use exponential backoff on the wait time
            retry += 1
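The retry loop above is not specific to dropping frames. As a minimal sketch, the same exponential backoff can be factored into a generic helper. The name `retry_with_backoff` and its parameters (`action`, `retry_on`, `initial_wait`, `max_wait`, `sleep`) are hypothetical and not part of the xgt API. The cap on the wait time is an addition that the wrapper above does not have.

```python
import time


def retry_with_backoff(action, retry_on, max_retry=None,
                       initial_wait=1.0, max_wait=60.0, sleep=time.sleep):
    """
    Call action() until it succeeds, sleeping between failed attempts.

    Hypothetical helper, not part of xgt. retry_on is the exception type
    (or tuple of types) that signals a retryable failure.
    """
    wait_time = initial_wait
    retry = 0
    while True:
        try:
            return action()
        except retry_on:
            # Give up once the caller's retry budget is used; re-raise as-is.
            if max_retry is not None and retry >= max_retry:
                raise
            sleep(wait_time)
            # Double the wait each time, but never exceed max_wait.
            wait_time = min(wait_time * 2, max_wait)
            retry += 1
```

Under these assumptions, a frame drop could be written as `retry_with_backoff(lambda: connection.drop_frame(frame), xgt.XgtTransactionError, max_retry=5)`. Passing `sleep` as a parameter makes it possible to test the helper without actually waiting.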
There is an order dependency when dropping multiple frames. Before a vertex frame can be dropped, all of its incident edge frames must be dropped first. The next function drops a list of frames in an order that will succeed.
def drop_frames(connection, frame_list, max_retry=None):
    """
    Drop a list of frames in an order that will allow them all to be dropped.
    This means that all Edge frames will be dropped first, followed by the
    other frame types.
    """
    all_edge_frames = [f.name for f in connection.get_edge_frames()]
    # frame_list may hold frame objects or frame names; normalize to names.
    frame_names = [f.name if isinstance(f, xgt.TableFrame) else f for f in frame_list]
    edge_frames = [name for name in frame_names if name in all_edge_frames]
    non_edge_frames = [name for name in frame_names if name not in all_edge_frames]
    for frame in edge_frames:
        drop_frame(connection, frame, max_retry)
    for frame in non_edge_frames:
        drop_frame(connection, frame, max_retry)
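The ordering rule can be examined without a server connection. The following sketch isolates the edge-first partitioning as a pure function over plain strings. The helper name `order_for_drop` and the frame names are made up for illustration.

```python
def order_for_drop(frame_names, edge_frame_names):
    """
    Return frame_names reordered so that every edge frame comes first.

    Hypothetical helper: both arguments are plain strings, so no server
    connection is needed. Relative order within each group is preserved.
    """
    edge_set = set(edge_frame_names)  # set membership avoids rescanning a list
    edges = [name for name in frame_names if name in edge_set]
    others = [name for name in frame_names if name not in edge_set]
    return edges + others


# Made-up frame names for illustration only.
requested = ["People", "Knows", "Companies", "WorksAt"]
print(order_for_drop(requested, edge_frame_names=["Knows", "WorksAt", "Owns"]))
# prints ['Knows', 'WorksAt', 'People', 'Companies']
```

Edge frames that exist on the server but were not requested ("Owns" here) are not added to the result, which matches the behavior of the wrapper: only the frames the caller listed are dropped.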
3.1.2. Run Query
The Python API already has two methods of submitting a query for execution by the server: run_job() and schedule_job().
In both of these cases, a Job object is returned.
For queries that have no INTO table_name clause, the user may wish to simply get the data back.
This is particularly useful for queries that return only RETURN COUNT(*) as their result.
Here is an example of a run_query function to perform that task:
def run_query(connection, query, max_retry=5):
    """
    Run a query, waiting for it to complete, and return a tuple with these
    items: (data, job)
    If the query fails to complete (e.g., it rolls back), retry up to
    max_retry times.
    If the data is a single row and a single column, return the data as a scalar,
    otherwise return it as a list of lists (array of arrays).
    """
    retry = 0
    while True:
        try:
            job = connection.run_job(query)
            data = job.get_data()
            # A 1x1 result is unwrapped into a scalar of the matching type.
            if data is not None and len(data) == 1 and len(job.schema) == 1:
                datum = data[0][0]
                if job.schema[0][1] == xgt.INT:
                    return (int(datum), job)
                elif job.schema[0][1] == xgt.FLOAT:
                    return (float(datum), job)
                else:
                    return (str(datum), job)
            return (data, job)
        except xgt.XgtTransactionError:
            # Retry immediately until the retry budget is exhausted.
            if retry >= max_retry:
                raise
            retry += 1
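The scalar-unwrapping rule described in the docstring can be tried in isolation. The sketch below is a hypothetical stand-alone helper, `unwrap_result`, that applies the same rule to plain Python lists. The type labels "int" and "float" are stand-ins chosen for this example; the wrapper above compares against xgt.INT and xgt.FLOAT instead.

```python
def unwrap_result(rows, column_types):
    """
    Collapse a one-row, one-column result to a scalar; otherwise return
    rows unchanged.

    Hypothetical helper. column_types uses the stand-in labels "int" and
    "float" in place of the xgt type constants.
    """
    if rows is not None and len(rows) == 1 and len(column_types) == 1:
        datum = rows[0][0]
        if column_types[0] == "int":
            return int(datum)
        if column_types[0] == "float":
            return float(datum)
        return str(datum)  # any other type is returned as text
    return rows


print(unwrap_result([[42]], ["int"]))                     # prints 42
print(unwrap_result([[1, "a"], [2, "b"]], ["int", "text"]))  # prints [[1, 'a'], [2, 'b']]
```

One consequence of this design is that the caller gets back either a scalar or a list of lists depending on the shape of the result, so code that calls the wrapper with arbitrary queries should be prepared for both.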