GraphX: Graph Processing in a Distributed Dataflow Framework
02 Nov 2015
Takeaways from GraphX: Graph Processing in a Distributed Dataflow Framework, Gonzalez et al., OSDI, 2014.
This paper explores the design of a graph processing system (GraphX) built on top of a general-purpose distributed dataflow system (Spark).
Existing Approach
- Existing graph processing systems evolved separately from distributed dataflow frameworks (such as MapReduce) because:
- Dataflow frameworks before Spark (such as MapReduce) emphasized single-stage, on-disk computation, which is poorly suited to iterative graph algorithms.
- They did not offer fine-grained control over data partitioning, which hindered the application of graph-partitioning techniques.
- Existing specialized graph processing systems have the following limitations:
- Graph computations occur as part of larger analytics tasks that also involve unstructured data. Inflexibility in reusing data across these tasks leads to more communication and data replication.
- They typically represent a graph as a single property graph of vertices and edges, which complicates analytics tasks that span multiple graphs and subgraphs.
- They typically express graph computations as vertex programs that perform iterative local transformations. This is poorly suited to computations in which disconnected vertices interact.
GraphX Details.
- Graph Expression.
- Property graphs are expressed as a pair of collections. The vertex collection contains the vertex properties, keyed by a unique 64-bit integer id. The edge collection contains the edge properties, keyed by the source and destination vertex ids.
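To make the collection view concrete, here is a minimal sketch in plain Python rather than Spark. The dictionaries, the sample data, and the `subgraph` helper are illustrative assumptions, not GraphX's API. The point is only that once a graph is a pair of keyed collections, a derived graph can be produced with ordinary collection operations.

```python
# Vertex collection: unique integer id -> vertex property.
vertices = {1: "alice", 2: "bob", 3: "carol"}

# Edge collection: (source id, destination id) -> edge property.
edges = {(1, 2): "follows", (2, 3): "follows", (3, 1): "blocks"}


def subgraph(vertices, edges, keep_vertex):
    """Derive a new graph by filtering both collections (hypothetical helper).

    An edge survives only if both of its endpoints survive.
    """
    kept_vertices = {
        vid: prop for vid, prop in vertices.items() if keep_vertex(vid, prop)
    }
    kept_edges = {
        (src, dst): prop
        for (src, dst), prop in edges.items()
        if src in kept_vertices and dst in kept_vertices
    }
    return kept_vertices, kept_edges


# Drop one vertex; edges touching it disappear with it.
sub_vertices, sub_edges = subgraph(vertices, edges, lambda vid, prop: prop != "carol")
```

The original collections are left untouched, so several derived graphs can coexist alongside the source graph.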
- Graph Computations.
- Dataflow operators are used to express the Gather-Apply-Scatter (GAS) decomposition typically encountered in graph processing. Scatter is done by a join that produces triplets of source vertex, edge, and destination vertex. Apply is done by data-parallel map stages. Gather is done by the groupBy operator.
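One round of this join, map, group-by pattern can be sketched in plain Python. This is an illustration under assumptions, not the paper's implementation: the function names, the integer vertex values, and the "add the weighted sum of incoming values" update rule are all made up for the example.

```python
from collections import defaultdict

vertices = {1: 10, 2: 20, 3: 30}           # id -> value
edges = {(1, 2): 1, (2, 3): 1, (1, 3): 1}  # (src, dst) -> weight


def triplets(vertices, edges):
    """Join stage: attach both endpoint properties to every edge."""
    return [
        (src, vertices[src], weight, dst, vertices[dst])
        for (src, dst), weight in edges.items()
    ]


def send_messages(triplet_view):
    """Map stage: each triplet emits (destination id, message)."""
    return [
        (dst, src_val * weight)
        for (_src, src_val, weight, dst, _dst_val) in triplet_view
    ]


def gather(messages):
    """Group-by stage: sum the messages addressed to each vertex."""
    sums = defaultdict(int)
    for dst, msg in messages:
        sums[dst] += msg
    return dict(sums)


def apply_updates(vertices, gathered):
    """Map stage: fold each gathered sum into the vertex value."""
    return {vid: val + gathered.get(vid, 0) for vid, val in vertices.items()}


gathered = gather(send_messages(triplets(vertices, edges)))
new_vertices = apply_updates(vertices, gathered)
```

Running the last two lines in a loop, with `new_vertices` fed back in as `vertices`, gives the iterative form that vertex programs usually take.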
- Optimizations.
- Two-level hash partitioning of vertex ids.
- Edge collection partitioning for efficient lookup of edges.
- Index reuse across derived vertex and edge collections.
- Multicast join via a routing table, sending vertex properties only to relevant join partitions of the edge collection.
- Partial materialization of changed vertex properties across join sites.
- Incremental view maintenance of triplets.
- Filtered index scanning, which restricts computation to the subgraph of active vertices.
- Automatic join elimination, which uses JVM bytecode analysis to determine which vertex and edge properties are actually used so that unnecessary joins can be skipped.
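The routing-table idea from the list above can be sketched in plain Python. Everything here is a simplifying assumption for illustration: edge partitions are lists of (src, dst) pairs, the routing table is a dictionary from vertex id to partition indices, and the optional `changed` argument loosely mimics shipping only updated vertex properties.

```python
from collections import defaultdict


def build_routing_table(edge_partitions):
    """Map each vertex id to the edge partitions that reference it."""
    table = defaultdict(set)
    for pid, partition in enumerate(edge_partitions):
        for src, dst in partition:
            table[src].add(pid)
            table[dst].add(pid)
    return dict(table)


def ship_vertices(vertices, routing_table, num_partitions, changed=None):
    """Send each vertex property only to the partitions that need it.

    If `changed` is given, only those vertex ids are shipped.
    """
    shipped = [{} for _ in range(num_partitions)]
    for vid, prop in vertices.items():
        if changed is not None and vid not in changed:
            continue
        for pid in sorted(routing_table.get(vid, ())):
            shipped[pid][vid] = prop
    return shipped


edge_partitions = [[(1, 2), (1, 3)], [(3, 4)]]
vertices = {1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}

routing_table = build_routing_table(edge_partitions)
full = ship_vertices(vertices, routing_table, len(edge_partitions))
delta = ship_vertices(vertices, routing_table, len(edge_partitions), changed={3})
```

In this toy example a broadcast would send all five properties to both partitions (ten copies), while the routed version sends five copies in total, and vertex 5, which has no edges, is not sent anywhere.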