2.6. Job Management¶
Given the size and scope of data that xGT is designed for, many tasks could take a significant amount of time. Some of these tasks include long-running queries, as well as data ingest and egest to and from the server. To improve server utilization and avoid blocking on these long-running tasks, xGT uses asynchronous execution, in which several pending operations can be scheduled for later completion.
A job is the unit of asynchronous execution inside xGT. It encompasses a unit of work that will execute asynchronously with respect to its submission by the user. In many cases, the submission of a job to the xGT server will allow the Python client to continue execution while the server is processing the long-running task.
Currently, the xGT client presents a job submission interface to the user for TQL queries (The Trovares Query Language (TQL)).
Queries can be scheduled to run on the server via schedule_job()
in a deferred manner.
They can also be run in a synchronous manner via run_job()
for which the Python client waits until the query is finished:
job = server.run_job("MATCH (a)-[e:EdgeFrame]->(b) RETURN COUNT(*)")
The status of jobs in the system can be queried via get_jobs()
, which returns a job status object for each job requested in the call (or all jobs that have been executed if the request is empty).
Each running job encapsulates an atomic unit of execution called a transaction.
Asynchronous jobs can be canceled via cancel_job()
.
It is possible to cancel any job that has not reached the committing stage of its transaction.
Cancellation is allowed only during non-committing phases of the transaction to guarantee data integrity and atomicity.
The wait_for_job()
method blocks the client until the specified job has finished its asynchronous execution.
If the job had already completed execution (successfully or unsuccessfully) the method returns immediately.
2.6.1. Job Object¶
Several methods in the xGT Python interface return an xgt.Job
object.
An instance of the Job
class represents the current state of a running or completed job that was executed by the xGT server.
Job execution is used for several long-running functionalities in xGT including running TQL queries, as well as loading or saving data from and to the server.
The following functions return Job
instances that indicate the status of the execution:
2.6.2. Storing Query Results in Jobs¶
As indicated in the previous sections, it is possible to run a query on the xGT server that does not need to use a named results table frame to produce its results. The results are stored in the job instead of a frame. Short-running queries that produce just a few rows of data can be run in this way to avoid the overhead of creating and managing a named results table frame.
To run a query in this manner, simply omit the INTO
clause from the query.
For example:
MATCH (a:VertexFrame)-[e:EdgeFrame]->(b)
WHERE a.id = 1 AND e.property > 10.5
RETURN count(*)
This query will return a single row with the count of the paths in the graph that match the specified conditions.
The maximum number of rows that can be returned as a job result is 1,000 to limit the potential memory overhead of managing very large results outside of table frames. However, the total number of matches produced internally by the query is stored as a property in the job object. In this way, if the results are of interest, the query can be re-executed using a named results table frame.
Both the run_job()
and schedule_job()
methods can be used to execute a query that stores results in the job.
Note that for the schedule_job()
asynchronous method the results of the query will not be available right away.
wait_for_job()
must be called for the corresponding job to obtain the results of the query.
The following examples show how to run queries that store results in the job instead of creating an output table frame.
job1 = conn.schedule_job("MATCH (a)-[]->(b) RETURN count(*)")
conn.wait_for_job(job1)
count = job1.get_data()[0][0]
job2 = conn.run_job("MATCH (a)-[]->(b) RETURN count(*)")
count = job2.get_data()[0][0]
In addition to the resulting data rows and total number of matches produced by the query, the implied schema of the resulting rows is also returned as a property of the job object.
To retrieve the results, get_data()
, get_data_pandas()
, and get_data_arrow()
are provided as methods of the xgt.Job
class with a similar interface to the same functions on the frame classes (e.g., xgt.TableFrame.get_data()
).
If the query had generated row security labels on the resulting rows, those labels would not be included in the row data stored in the job.
Note that the jobs that store results are stored in the job history frame described in Section Job History. Up to 1,000 rows will be stored per job. Thus, if many jobs are executed storing results, the memory resources used to store them in the job history can grow to a non-trivial level.
2.6.3. Job History¶
The get_jobs()
method provides all the details of the jobs specified in an input parameter list or of all jobs that have been executed on the xGT server.
The number of executed jobs can be large if the xGT server has been running for quite a while or it is being used by many users.
The resulting list of Job
objects returned by get_jobs()
may include jobs that are not relevant.
xGT supports querying the jobs in the system by using TQL. The xGT server uses two special system frames to hold information for the jobs in the system:
xgt__Running_Jobs
which holds information on currently running jobs.xgt__Job_History
which holds information on jobs that have reached a final execution state (completed, failed, canceled or rolled back).
Both frames are queryable via TQL for users having the right security permissions to read them.
Since these are special system frames held by xGT, their schemas and other properties are not accessible via Python proxy frame objects (TableFrame
, VertexFrame
or EdgeFrame
).
However, both jobs frames have a name, a schema and rows storing relevant data for jobs.
The schema for the xgt__Running_Jobs
frame is as follows:
Field |
Type |
Description |
---|---|---|
job_id |
int |
A unique integer identifying a job. |
user |
string |
The name of the user who submitted this job. |
exception_what |
string |
A description of the exception that was thrown (many of which come from the C++ code in the xGT server). |
exception_trace |
string |
If available, this contains the text of a stack trace. |
is_user_job |
bool |
Indicates whether this job was user-submitted, or created as part of some requested operation. An example of a requested operation that may lead to a job is dropping a frame. |
current_status |
string |
See |
start_time |
datetime |
The time at which the job was launched to be run by the server. Note that, particularly when the server is busy, this may be later than the time when a user submitted the job. |
end_time |
datetime |
This is really just a placeholder for when a job reaches a completion state. It holds a value of datetime(epochSeconds = 0). |
Here is the schema for the xgt__Job_History
frame:
Field |
Type |
Description |
---|---|---|
job_id |
int |
A unique integer identifying a job. |
user |
string |
The name of the user who submitted this job. |
exception_what |
string |
A description of the exception that was thrown (many of which come from the C++ code in the xGT server). |
exception_trace |
string |
If available, this contains the text of a stack trace. |
start_time |
datetime |
The time at which the job was launched to be run by the server. Note that, particularly when the server is busy, this may be later than the time when a user submitted the job. |
end_time |
datetime |
The time at which the job reached its final state. If the transaction ended with a rollback, this is the time at which that decision was made. It may be the time that a user canceled the job, or the time it completed successfully. |
final_status |
string |
See |
error_code |
int |
If this is a positive number, it indicates the exception type inside the C++ code of the server that initiated the completion of the transaction. |
MATCH
and DELETE
TQL operations are supported on the xgt__Job_History
frame, while only MATCH
operations are supported on the xgt__Running_Jobs
frame.
Note that CREATE
and SET
TQL operations are NOT supported for either due to the special nature of creating a job.
The running jobs frame could be queried as follows:
MATCH (a:xgt__Running_Jobs)
WHERE a.user = "alice" AND a.start_time >= datetime("2020-07-27T00:00:00")
RETURN a.current_status
This query returns the current status of jobs started by the user alice, which have been executing since midnight on the specified date.
Similarly, the job history frame can be queried as follows:
MATCH (a:xgt__Job_History)
WHERE a.user = "alice" AND a.end_time <= datetime("2020-07-27T00:00:00")
RETURN a.job_id, a.final_status
This query returns the job ID and the final status of jobs ran by alice that completed before the specified date.
Deletions from the job history can be executed as follows, provided that the user has delete permissions on the job history frame:
MATCH (a:xgt__Job_History)
WHERE a.user = "alice" AND a.end_time <= datetime("2020-07-27T00:00:00")
DELETE a
This query deletes jobs from the job history that were run by the user alice and completed before the specified date. Deletion queries can be used to free up memory space held by those job entries as well as reduce the overall system job history size.