2.6. Job Management

Given the size and scope of data that xGT is designed for, many tasks could take a significant amount of time. Some of these tasks include long-running queries, as well as data ingest and egest to and from the server. To improve server utilization and avoid blocking on these long-running tasks, xGT uses asynchronous execution, in which several pending operations can be scheduled for later completion.

A job is the unit of asynchronous execution inside xGT. It encompasses a unit of work that will execute asynchronously with respect to its submission by the user. In many cases, the submission of a job to the xGT server will allow the Python client to continue execution while the server is processing the long-running task.

Currently, the xGT client presents a job submission interface to the user for TQL queries (see The Trovares Query Language (TQL)). Queries can be scheduled to run on the server in a deferred manner via schedule_job(). They can also be run synchronously via run_job(), in which case the Python client waits until the query is finished:

job = server.run_job("MATCH (a)-[e:EdgeFrame]->(b) RETURN COUNT(*)")

The status of jobs in the system can be queried via get_jobs(), which returns a job status object for each job requested in the call (or all jobs that have been executed if the request is empty).
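As a rough illustration of get_jobs(), the sketch below tallies jobs by their status. It assumes that conn is an already-open xGT connection and that each returned job exposes a status attribute (see xgt.Job.status). The helper name summarize_job_status is hypothetical and not part of xGT.

```python
from collections import Counter


def summarize_job_status(conn, job_ids=None):
    """Count jobs by status string.

    `conn` is assumed to be an open xGT connection. `job_ids` is an optional
    list of job IDs; when omitted, every job known to the server is counted.
    """
    # An empty request asks the server for all jobs that have been executed.
    jobs = conn.get_jobs() if job_ids is None else conn.get_jobs(job_ids)
    return Counter(job.status for job in jobs)
```

Calling summarize_job_status(conn) on a busy server gives a quick overview before deciding which individual jobs to inspect.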

Each running job encapsulates an atomic unit of execution called a transaction. Asynchronous jobs can be canceled via cancel_job(), but only while the job has not yet reached the committing stage of its transaction. This restriction guarantees data integrity and atomicity.
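A minimal sketch of cancellation follows. It assumes that conn is an open xGT connection, that a job object exposes its identifier as job.id, and that get_jobs() accepts a list of such identifiers; these details are assumptions and should be checked against the xgt.Job and connection reference. The helper name cancel_and_report is hypothetical.

```python
def cancel_and_report(conn, job):
    """Request cancellation of `job` and return the status the server reports.

    A job that has already reached the committing stage cannot be canceled,
    so the returned status is not guaranteed to indicate cancellation.
    """
    conn.cancel_job(job)
    # Re-read the job from the server instead of relying on the local object,
    # which may hold a stale snapshot. `job.id` is an assumed attribute name.
    refreshed = conn.get_jobs([job.id])
    return refreshed[0].status
```

Re-reading the status after the request is a defensive choice: it shows whether the cancellation actually took effect or the transaction had already moved on to committing.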

The wait_for_job() method blocks the client until the specified job has finished its asynchronous execution. If the job has already completed execution (successfully or unsuccessfully), the method returns immediately.

2.6.1. Job Object

Several methods in the xGT Python interface return an xgt.Job object. An instance of the Job class represents the current state of a running or completed job that was executed by the xGT server.

Job execution is used for several long-running functionalities in xGT, including running TQL queries as well as loading or saving data from and to the server. Functions that start such work, for example run_job() and schedule_job(), return Job instances that indicate the status of the execution.

2.6.2. Storing Query Results in Jobs

As indicated in the previous sections, it is possible to run a query on the xGT server that does not need to use a named results table frame to produce its results. The results are stored in the job instead of a frame. Short-running queries that produce just a few rows of data can be run in this way to avoid the overhead of creating and managing a named results table frame.

To run a query in this manner, simply omit the INTO clause from the query. For example:

MATCH (a:VertexFrame)-[e:EdgeFrame]->(b)
WHERE a.id = 1 AND e.property > 10.5
RETURN count(*)

This query will return a single row with the count of the paths in the graph that match the specified conditions.

The maximum number of rows that can be returned as a job result is 1,000, which limits the potential memory overhead of managing very large results outside of table frames. However, the total number of matches produced internally by the query is stored as a property of the job object. In this way, if the complete results are of interest, the query can be re-executed using a named results table frame.
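The check described above could be sketched as follows. The property name total_rows used here for the total number of matches is an assumption, as is the helper name fetch_rows_and_flag; consult the xgt.Job reference for the actual property.

```python
def fetch_rows_and_flag(job):
    """Return (rows, truncated) for a job that stores its own results.

    `truncated` is True when the query matched more rows than the job holds,
    which signals that a rerun with an INTO clause is needed for full results.
    """
    rows = job.get_data()
    total = job.total_rows  # assumed property name for the total match count
    return rows, total > len(rows)
```

When the flag is True, the rows in hand are only a prefix of the results, and rerunning the query into a named results table frame retrieves the rest.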

Both the run_job() and schedule_job() methods can be used to execute a query that stores results in the job. Note that with the asynchronous schedule_job() method, the results of the query are not available right away: wait_for_job() must be called on the corresponding job before the results can be retrieved.

The following examples show how to run queries that store results in the job instead of creating an output table frame.

job1 = conn.schedule_job("MATCH (a)-[]->(b) RETURN count(*)")
conn.wait_for_job(job1)
count = job1.get_data()[0][0]

job2 = conn.run_job("MATCH (a)-[]->(b) RETURN count(*)")
count = job2.get_data()[0][0]

In addition to the resulting data rows and total number of matches produced by the query, the implied schema of the resulting rows is also returned as a property of the job object.
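As an illustration of combining the schema with the row data, the sketch below labels each value with its column name. It assumes the schema is exposed as job.schema and is a list of [name, type] pairs, mirroring how frame schemas are described; both the property name and its shape are assumptions. The helper name rows_as_dicts is hypothetical.

```python
def rows_as_dicts(job):
    """Pair each result row with the column names from the job's schema."""
    # Assumed shape: job.schema == [[column_name, column_type], ...]
    names = [column[0] for column in job.schema]
    return [dict(zip(names, row)) for row in job.get_data()]
```

This keeps small results self-describing without creating a results table frame.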

To retrieve the results, get_data(), get_data_pandas(), and get_data_arrow() are provided as methods of the xgt.Job class with a similar interface to the same functions on the frame classes (e.g., xgt.TableFrame.get_data()).

If the query generates row security labels on the resulting rows, those labels are not included in the row data stored in the job.

Note that jobs that store results are kept in the job history frame described in Section 2.6.3 (Job History). Up to 1,000 rows are stored per job. Thus, if many result-storing jobs are executed, the memory used to hold them in the job history can become significant.

2.6.3. Job History

The get_jobs() method provides all the details of the jobs specified in an input parameter list or of all jobs that have been executed on the xGT server. The number of executed jobs can be large if the xGT server has been running for a long time or is used by many users. As a result, the list of Job objects returned by get_jobs() may include many jobs that are not relevant to the caller.

xGT supports querying the jobs in the system by using TQL. The xGT server uses two special system frames to hold information for the jobs in the system:

  • xgt__Running_Jobs which holds information on currently running jobs.

  • xgt__Job_History which holds information on jobs that have reached a final execution state (completed, failed, canceled or rolled back).

Both frames are queryable via TQL for users having the right security permissions to read them. Since these are special system frames held by xGT, their schemas and other properties are not accessible via Python proxy frame objects (TableFrame, VertexFrame or EdgeFrame). However, both jobs frames have a name, a schema and rows storing relevant data for jobs.

The schema for the xgt__Running_Jobs frame is as follows:

Field            Type      Description
---------------  --------  -----------
job_id           int       A unique integer identifying a job.
user             string    The name of the user who submitted this job.
exception_what   string    A description of the exception that was thrown (many of which come from the C++ code in the xGT server).
exception_trace  string    If available, this contains the text of a stack trace.
is_user_job      bool      Indicates whether this job was user-submitted, or created as part of some requested operation. An example of a requested operation that may lead to a job is dropping a frame.
current_status   string    See xgt.Job.status.
start_time       datetime  The time at which the job was launched to be run by the server. Note that, particularly when the server is busy, this may be later than the time when a user submitted the job.
end_time         datetime  This is really just a placeholder for when a job reaches a completion state. It holds a value of datetime(epochSeconds = 0).

Here is the schema for the xgt__Job_History frame:

Field            Type      Description
---------------  --------  -----------
job_id           int       A unique integer identifying a job.
user             string    The name of the user who submitted this job.
exception_what   string    A description of the exception that was thrown (many of which come from the C++ code in the xGT server).
exception_trace  string    If available, this contains the text of a stack trace.
start_time       datetime  The time at which the job was launched to be run by the server. Note that, particularly when the server is busy, this may be later than the time when a user submitted the job.
end_time         datetime  The time at which the job reached its final state. If the transaction ended with a rollback, this is the time at which that decision was made. It may be the time that a user canceled the job, or the time it completed successfully.
final_status     string    See xgt.Job.status.
error_code       int       If this is a positive number, it indicates the exception type inside the C++ code of the server that initiated the completion of the transaction.

MATCH and DELETE TQL operations are supported on the xgt__Job_History frame, while only MATCH operations are supported on the xgt__Running_Jobs frame. Note that CREATE and SET TQL operations are NOT supported for either due to the special nature of creating a job.

The running jobs frame could be queried as follows:

MATCH (a:xgt__Running_Jobs)
WHERE a.user = "alice" AND a.start_time >= datetime("2020-07-27T00:00:00")
RETURN a.current_status

This query returns the current status of the jobs submitted by the user alice that started at or after midnight on the specified date.

Similarly, the job history frame can be queried as follows:

MATCH (a:xgt__Job_History)
WHERE a.user = "alice" AND a.end_time <= datetime("2020-07-27T00:00:00")
RETURN a.job_id, a.final_status

This query returns the job ID and the final status of jobs run by alice that reached a final state at or before the specified time.
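From the Python client, the same history query can be submitted like any other TQL query. The sketch below assumes that conn is an open xGT connection and that the caller has read permission on the job history frame. The helper name final_status_by_job is hypothetical.

```python
def final_status_by_job(conn, user, before):
    """Map job_id -> final_status for `user`'s jobs that ended by `before`.

    `before` is an ISO-8601 string such as "2020-07-27T00:00:00". Both
    arguments are interpolated directly into the query text, so they must
    come from trusted input.
    """
    query = (
        "MATCH (a:xgt__Job_History) "
        f'WHERE a.user = "{user}" AND a.end_time <= datetime("{before}") '
        "RETURN a.job_id, a.final_status"
    )
    job = conn.run_job(query)  # no INTO clause, so results live in the job
    return {job_id: status for job_id, status in job.get_data()}
```

Because the query has no INTO clause, at most 1,000 rows come back; a narrower time window keeps the result within that limit.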

Deletions from the job history can be executed as follows, provided that the user has delete permissions on the job history frame:

MATCH (a:xgt__Job_History)
WHERE a.user = "alice" AND a.end_time <= datetime("2020-07-27T00:00:00")
DELETE a

This query deletes jobs from the job history that were run by the user alice and reached a final state at or before the specified time. Deletion queries can be used to free the memory held by those job entries and to reduce the overall size of the system job history.
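A retention policy built on this kind of deletion might look like the following sketch. It assumes that conn is an open xGT connection with delete permission on the job history frame, and that a naive local timestamp is comparable with the server's end_time values, which may not hold if the server records times in another time zone. The names build_prune_query and prune_history are hypothetical.

```python
from datetime import datetime, timedelta


def build_prune_query(user, cutoff):
    """Build a TQL deletion for `user`'s history entries ending by `cutoff`."""
    # Drop sub-second precision so the literal matches the documented format.
    stamp = cutoff.replace(microsecond=0).isoformat()
    return (
        "MATCH (a:xgt__Job_History) "
        f'WHERE a.user = "{user}" AND a.end_time <= datetime("{stamp}") '
        "DELETE a"
    )


def prune_history(conn, user, keep_days=30, now=None):
    """Delete `user`'s job history entries older than `keep_days` days."""
    now = now or datetime.now()
    return conn.run_job(build_prune_query(user, now - timedelta(days=keep_days)))
```

Separating query construction from execution makes it possible to log or review the generated TQL before anything is deleted.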