Data Loading and Export

Generally, users load data into a type that Metagraph is aware of and that has a translation path, via the installed plugins, to the desired graph type. Metagraph also provides special support for loading data into a distributed graph structure (see Distributed COO to CSR conversion).

Loading

For example, to load an edge list from a CSV file, a user might use pandas and build a DataFrame.

import pandas as pd
df = pd.read_csv(csv_file)  # csv_file is the path to a CSV containing the edge list

To be used by Metagraph, this needs to be a known data object. Pandas DataFrames are known to Metagraph, but they are interpreted as a generic DataFrame, not as a graph. If we want this edge list to be interpreted as an EdgeSet, we need to construct the wrapper ourselves.

import metagraph as mg

es = mg.wrappers.EdgeSet.PandasEdgeSet(df, src_label="Column1", dst_label="Column2")

Now es can be used within Metagraph and passed to algorithms which expect an EdgeSet. Translation and the rest of Metagraph's machinery will work as usual.
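Concretely, the wrapper only needs a DataFrame with one column per edge endpoint. A minimal edge list built in memory (the column names Column1/Column2 match the call above; the node ids and edges are made up for illustration) might look like:

```python
import pandas as pd

# Each row is one directed edge: (source node id, destination node id).
df = pd.DataFrame({
    "Column1": [0, 0, 1, 2],  # source node ids
    "Column2": [1, 2, 2, 0],  # destination node ids
})
# mg.wrappers.EdgeSet.PandasEdgeSet(df, src_label="Column1", dst_label="Column2")
# would then wrap this DataFrame as an EdgeSet.
```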

Exporting

When the final result has been computed and the user wants to retrieve it from the underlying data object, the process is again mostly manual.

The first step is to get the data in the right data format. Calling translate on the resolver is the easiest way to convert to the desired type.

If the data object is a Wrapper object, the actual data object will need to be retrieved. The typical convention is a .value attribute holding the actual data object. This is only a convention and may not be honored by all plugins. Additionally, some wrappers hold multiple data objects, and there is no convention for secondary data object attribute names. Looking at the wrapper’s docstring or inspecting the code is the preferred way to understand how the internal pieces are stored.
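The .value convention can be illustrated with a hypothetical wrapper. This is not a real Metagraph class; the class name and field are assumptions made purely for illustration:

```python
import numpy as np

class NumpyVectorWrapper:
    """Hypothetical wrapper following the '.value' convention."""
    def __init__(self, value):
        self.value = value  # the wrapped raw data object

w = NumpyVectorWrapper(np.array([1.0, 2.0, 3.0]))
raw = w.value  # retrieve the underlying raw data object (here, a numpy array)
```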

Mutating Objects

In general, wrappers in Metagraph assume that the input data object will not be mutated. If a wrapper is constructed from an object which is then mutated, the wrapper will almost certainly see the mutation. This is problematic because Metagraph caches properties which may no longer be valid after the mutation.

Even without using a wrapper, Metagraph will cache the properties of raw data objects used in Metagraph translations and algorithms. Mutating the raw data object may cause unexpected behavior because the reported properties may be invalid.

In short, do not mutate objects which are still in active use within Metagraph. There is a way to force the property cache to clear if mutation is unavoidable, but this is generally discouraged.
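A small sketch of why stale cached properties are dangerous. The cache below is an assumed, simplified one keyed by object identity, not Metagraph's actual mechanism:

```python
import numpy as np

cache = {}

def is_sorted(arr):
    """Compute the 'sorted' property once and cache it by object identity."""
    key = id(arr)
    if key not in cache:
        cache[key] = bool(np.all(arr[:-1] <= arr[1:]))
    return cache[key]

a = np.array([1, 2, 3])
assert is_sorted(a)   # property computed and cached: True
a[0] = 99             # in-place mutation invalidates the cached answer
assert is_sorted(a)   # stale cache still reports True, which is now wrong
```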

Distributed COO to CSR conversion

Metagraph has a special interface for loading COO graph data in a Dask Dataframe into a distributed CSR graph data structure. Metagraph does not provide any distributed graph data structures itself, but a Metagraph plugin may offer a CSR graph object that is distributed across the Dask worker systems and is therefore visible to all workers. In this situation, Metagraph offers support for parallel loading of data into the distributed CSR object.

metagraph.core.dask.loader.load_coo_to_csr(coo: dask.dataframe.core.DataFrame, shape: Tuple[int, int], loader: metagraph.core.dask.loader.CSRLoader, row='row', col='col', value='value', client=None)

Parallel conversion of COO graph in a Dask dataframe to a CSR graph.

The Dask dataframe coo will be interpreted as a graph in COO format where the row, column, and edge value column names are given by row, col, and value. The dimensions of the final CSR sparse adjacency matrix are given by shape.

Creation and management of the target CSR graph object is handled by the loader class, which must be a subclass of CSRLoader.

Note that the algorithm used by this function for parallel translation only makes sense for distributed CSR data structures that can be accessed directly by all Dask workers in the cluster. As an example, a loader for a CSR matrix stored in POSIX shared memory (SharedCSRLoader) is provided; it runs on single-system, multi-process Dask clusters.

The return value from this function is a Dask future for a CSR object of the type created by loader.
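To make the target layout concrete, here is the serial equivalent of the conversion written with NumPy. The variable names row/col/value mirror the parameters above; this is a reference sketch of the CSR layout, not the parallel algorithm itself:

```python
import numpy as np

# A small COO graph: edge k goes from row[k] to col[k] with weight val[k].
row = np.array([0, 0, 1, 2])
col = np.array([1, 2, 2, 0])
val = np.array([10.0, 20.0, 30.0, 40.0])
n_rows = 3

order = np.lexsort((col, row))   # sort edges by (row, col)
indices = col[order]             # CSR column indices
values = val[order]              # CSR edge values

# pointers[i]..pointers[i+1] spans row i's entries in indices/values.
pointers = np.zeros(n_rows + 1, dtype=np.int64)
np.add.at(pointers, row + 1, 1)  # count entries per row...
pointers = np.cumsum(pointers)   # ...then prefix-sum into row offsets
```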

Because of the variety of potential distributed CSR implementations, load_coo_to_csr requires a loader class that implements all of the following methods:

class metagraph.core.dask.loader.CSRLoader
static allocate(shape, nvalues, pointers_dtype, indices_dtype, values_dtype)

Return a new, empty distributed CSR object.

The 2D shape, number of values, and dtypes of the pointers, indices, and values arrays are needed.

static dask_incref(csr)

Perform whatever actions are required to incref the shared resources associated with this CSR object.

This method should communicate with the Dask scheduler plugin to indicate the Dask keys associated with cluster-wide shared resources.

classmethod finalize(csr, plan, chunks: List)

Perform any final CSR construction tasks based on the csr returned by allocate() and the chunk data returned by load_chunk().

This is run immediately by the loader and not inside a delayed() function.

Return the final CSR data structure to be used.

static load_chunk(csr, row_offset: int, pointers: numpy.ndarray, value_offset: int, indices: numpy.ndarray, values: numpy.ndarray)

Copy a chunk of CSR data into the distributed CSR object at the given offset.

Optionally return data that will be passed to finalize().

static register_dask_scheduler_plugin(client: distributed.client.Client)

Register any required scheduler plugins with the Dask client.

These plugins should cooperate with the dask_incref method to track the lifetime of any cluster-wide shared resources and free them when they have been forgotten by the Dask cluster.

Scheduler plugins receive a callback for every state transition in the DAG, so the plugin should free resources as appropriate when Dask keys are forgotten.

For details on how to create a new loader (especially how to manage shared resource lifetimes), see the implementation of the SharedCSRLoader class.
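To illustrate the division of labor between these methods, here is a schematic, single-process loader that mirrors the method shapes above using plain NumPy arrays and a dict as the "CSR object". It is not a real CSRLoader subclass, does nothing distributed, and omits dask_incref and the scheduler plugin; the names and dict layout are assumptions for illustration only:

```python
import numpy as np

class InMemoryCSRLoader:
    """Toy loader mirroring the allocate/load_chunk/finalize shapes."""

    @staticmethod
    def allocate(shape, nvalues, pointers_dtype, indices_dtype, values_dtype):
        # Empty CSR buffers sized up front, as allocate() requires.
        return {
            "shape": shape,
            "pointers": np.zeros(shape[0] + 1, dtype=pointers_dtype),
            "indices": np.empty(nvalues, dtype=indices_dtype),
            "values": np.empty(nvalues, dtype=values_dtype),
        }

    @staticmethod
    def load_chunk(csr, row_offset, pointers, value_offset, indices, values):
        # Copy one chunk's arrays into the shared buffers at the given
        # offsets; local row pointers are shifted by value_offset to
        # become global offsets.
        n = len(indices)
        csr["pointers"][row_offset:row_offset + len(pointers)] = pointers + value_offset
        csr["indices"][value_offset:value_offset + n] = indices
        csr["values"][value_offset:value_offset + n] = values

    @classmethod
    def finalize(cls, csr, plan, chunks):
        # Nothing extra to stitch together in this toy version.
        return csr

# Load the same 4-edge graph as two chunks (rows 0, then rows 1-2).
csr = InMemoryCSRLoader.allocate((3, 3), 4, np.int64, np.int64, np.float64)
InMemoryCSRLoader.load_chunk(csr, 0, np.array([0, 2]), 0,
                             np.array([1, 2]), np.array([10.0, 20.0]))
InMemoryCSRLoader.load_chunk(csr, 1, np.array([0, 1, 2]), 2,
                             np.array([2, 0]), np.array([30.0, 40.0]))
csr = InMemoryCSRLoader.finalize(csr, plan=None, chunks=[])
```

A real loader would additionally allocate the buffers in a cluster-visible location (e.g. shared memory), track their lifetime via dask_incref and the scheduler plugin, and run load_chunk on the workers in parallel.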