[Improvement][Spark] Provide APIs for data transformation at the graph level #113

Merged: 6 commits, Mar 1, 2023
34 changes: 34 additions & 0 deletions docs/user-guide/spark-lib.rst
@@ -170,6 +170,38 @@ To utilize the GAR Spark reader, please refer to the following example code.

See `TestReader.scala`_ for the complete example.


Graph Transformer
``````````````````
The Graph Transformer is a helper object in the GraphAr Spark library, designed to assist with data transformation at the graph level. It takes two GraphInfo objects (or the paths of two yaml files) as inputs: one for the source graph and one for the destination graph. The transformer loads data from the existing GAR files of the source graph with the GraphAr Spark Reader, following the metadata defined in the source GraphInfo. After reorganizing the data according to the destination GraphInfo, it generates new GAR chunk files with the GraphAr Spark Writer.

.. code:: scala

   // transform graphs by yaml paths
   val spark = ... // the Spark session
   val source_path = ... // e.g., /tmp/source.graph.yml
   val dest_path = ... // e.g., /tmp/dest.graph.yml
   GraphTransformer.transform(source_path, dest_path, spark)

   // transform graphs by information objects
   val source_info = ...
   val dest_info = ...
   GraphTransformer.transform(source_info, dest_info, spark)

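The information objects in the second form are elided above; they can be loaded from the graph yaml files with ``GraphInfo.loadGraphInfo``, as in the following minimal sketch (the paths are placeholders):

.. code:: scala

   // a minimal sketch, assuming the placeholder paths point to valid graph info yaml files
   val source_info = GraphInfo.loadGraphInfo("/tmp/source.graph.yml", spark)
   val dest_info = GraphInfo.loadGraphInfo("/tmp/dest.graph.yml", spark)
   GraphTransformer.transform(source_info, dest_info, spark)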

We provide an example in `TestGraphTransformer.scala`_, which demonstrates how to conduct data transformation from the `source graph <https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/parquet/ldbc_sample.graph.yml>`_ to the `destination graph <https://github.com/GraphScope/gar-test/blob/main/transformer/ldbc_sample.graph.yml>`_.

The Graph Transformer can be used for various purposes, including transforming GAR data between different file types (e.g., from ORC to Parquet), transforming between different adjList types (e.g., from COO to CSR), selecting or regrouping properties, and setting a new chunk size.

.. note::
   There are certain limitations when using the Graph Transformer:

   - The vertices (or edges) of the source and destination graphs are aligned by labels: each vertex/edge label included in the destination graph must have an equivalent in the source graph, so that the related chunks can be loaded as the data source.
   - For each group of vertices/edges (i.e., each single label), each property included in the destination graph (defined in the relevant VertexInfo/EdgeInfo) must also be present in the source graph.

In addition, users can use the GraphAr Spark Reader/Writer to conduct data transformation more flexibly at the vertex/edge table level, rather than at the graph level. This allows for a more granular approach to transforming data, as `TransformExample.scala`_ and the sketch below illustrate.
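As a rough sketch of this table-level approach (the yaml paths and file names are illustrative assumptions; the reader/writer calls follow the APIs used elsewhere in this library), one could load the vertex table of a label with a VertexReader and rewrite it under a different VertexInfo with a VertexWriter:

.. code:: scala

   // a rough sketch of table-level transformation; the paths and info files are assumed
   val source_vertex_info = VertexInfo.loadVertexInfo("/tmp/source/person.vertex.yml", spark)
   val dest_vertex_info = VertexInfo.loadVertexInfo("/tmp/dest/person.vertex.yml", spark)

   // read all vertex property groups of the source graph into a DataFrame
   val reader = new VertexReader("/tmp/source/", source_vertex_info, spark)
   val df = reader.readAllVertexPropertyGroups(true)

   // rewrite the vertex table following the destination VertexInfo
   // (e.g., with a different chunk size or payload file type)
   val writer = new VertexWriter("/tmp/dest/", dest_vertex_info, df)
   writer.writeVertexProperties()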


More examples
``````````````````
For more information on usage, please refer to the examples:
@@ -186,6 +218,8 @@ For more information on usage, please refer to the examples:

.. _TestReader.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TestReader.scala

.. _TestGraphTransformer.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala

.. _ComputeExample.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala

.. _TransformExample.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala
194 changes: 194 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala
@@ -0,0 +1,194 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.graph

import com.alibaba.graphar.{GeneralParams, AdjListType, GraphInfo, VertexInfo, EdgeInfo}
import com.alibaba.graphar.reader.{VertexReader, EdgeReader}
import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

/** The helper object for transforming graphs through the definitions of their infos. */
object GraphTransformer {
/** Construct the map of (vertex label -> VertexInfo) for a graph. */
private def constructVertexInfoMap(prefix: String, graphInfo: GraphInfo, spark: SparkSession): Map[String, VertexInfo] = {
var vertex_infos_map: Map[String, VertexInfo] = Map()
val vertices_yaml = graphInfo.getVertices
val vertices_it = vertices_yaml.iterator
while (vertices_it.hasNext()) {
val file_name = vertices_it.next()
val path = prefix + file_name
val vertex_info = VertexInfo.loadVertexInfo(path, spark)
vertex_infos_map += (vertex_info.getLabel -> vertex_info)
}
return vertex_infos_map
}

/** Construct the map of (edge label -> EdgeInfo) for a graph. */
private def constructEdgeInfoMap(prefix: String, graphInfo: GraphInfo, spark: SparkSession): Map[String, EdgeInfo] = {
var edge_infos_map: Map[String, EdgeInfo] = Map()
val edges_yaml = graphInfo.getEdges
val edges_it = edges_yaml.iterator
while (edges_it.hasNext()) {
val file_name = edges_it.next()
val path = prefix + file_name
val edge_info = EdgeInfo.loadEdgeInfo(path, spark)
val key = edge_info.getSrc_label + GeneralParams.regularSeperator + edge_info.getEdge_label + GeneralParams.regularSeperator + edge_info.getDst_label
edge_infos_map += (key -> edge_info)
}
return edge_infos_map
}

/** Transform the vertex chunks following the metadata defined in graph info objects.
*
* @param sourceGraphInfo The info object for the source graph.
* @param destGraphInfo The info object for the destination graph.
* @param sourceVertexInfosMap The map of (vertex label -> VertexInfo) for the source graph.
* @param spark The Spark session for the transformer.
*/
private def transformAllVertices(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, sourceVertexInfosMap: Map[String, VertexInfo], spark: SparkSession): Unit = {
val source_prefix = sourceGraphInfo.getPrefix
val dest_prefix = destGraphInfo.getPrefix

// traverse vertex infos of the destination graph
val dest_vertices_it = destGraphInfo.getVertices.iterator
while (dest_vertices_it.hasNext()) {
// load dest vertex info
val path = dest_prefix + dest_vertices_it.next()
val dest_vertex_info = VertexInfo.loadVertexInfo(path, spark)
// load source vertex info
val label = dest_vertex_info.getLabel()
if (!sourceVertexInfosMap.contains(label)) {
throw new IllegalArgumentException
}
val source_vertex_info = sourceVertexInfosMap(label)
// read vertex chunks from the source graph
val reader = new VertexReader(source_prefix, source_vertex_info, spark)
val df = reader.readAllVertexPropertyGroups(true)
// write vertex chunks for the dest graph
val writer = new VertexWriter(dest_prefix, dest_vertex_info, df)
writer.writeVertexProperties()
}
}

/** Transform the edge chunks following the metadata defined in graph info objects.
*
* @param sourceGraphInfo The info object for the source graph.
* @param destGraphInfo The info object for the destination graph.
* @param sourceVertexInfosMap The map of (vertex label -> VertexInfo) for the source graph.
* @param sourceEdgeInfosMap The map of (edge label -> EdgeInfo) for the source graph.
* @param spark The Spark session for the transformer.
*/
private def transformAllEdges(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, sourceVertexInfosMap: Map[String, VertexInfo], sourceEdgeInfosMap: Map[String, EdgeInfo], spark: SparkSession): Unit = {
val source_prefix = sourceGraphInfo.getPrefix
val dest_prefix = destGraphInfo.getPrefix

// traverse edge infos of the destination graph
val dest_edges_it = destGraphInfo.getEdges.iterator
while (dest_edges_it.hasNext()) {
// load dest edge info
val path = dest_prefix + dest_edges_it.next()
val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark)
// load source edge info
val key = dest_edge_info.getSrc_label + GeneralParams.regularSeperator + dest_edge_info.getEdge_label + GeneralParams.regularSeperator + dest_edge_info.getDst_label
if (!sourceEdgeInfosMap.contains(key)) {
throw new IllegalArgumentException
}
val source_edge_info = sourceEdgeInfosMap(key)
var has_loaded = false
var df = spark.emptyDataFrame

// traverse all adjList types
val dest_adj_lists = dest_edge_info.getAdj_lists
val adj_list_it = dest_adj_lists.iterator
while (adj_list_it.hasNext()) {
val dest_adj_list_type = adj_list_it.next().getAdjList_type_in_gar

// load edge DataFrame from the source graph
if (!has_loaded) {
val source_adj_lists = source_edge_info.getAdj_lists
var source_adj_list_type = dest_adj_list_type
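// if the source graph does not provide the destination adjList type,
// fall back to the first adjList type available in the source edge info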
if (!source_edge_info.containAdjList(dest_adj_list_type))
if (source_adj_lists.size() > 0)
source_adj_list_type = source_adj_lists.get(0).getAdjList_type_in_gar
// read edge chunks from source graph
val reader = new EdgeReader(source_prefix, source_edge_info, source_adj_list_type, spark)
df = reader.readEdges(false)
has_loaded = true
}

// read vertices number
val vertex_label = {
if (dest_adj_list_type == AdjListType.ordered_by_source || dest_adj_list_type == AdjListType.unordered_by_source)
dest_edge_info.getSrc_label
else
dest_edge_info.getDst_label
}
if (!sourceVertexInfosMap.contains(vertex_label)) {
throw new IllegalArgumentException
}
val vertex_info = sourceVertexInfosMap(vertex_label)
val reader = new VertexReader(source_prefix, vertex_info, spark)
val vertex_num = reader.readVerticesNumber()

// write edge chunks for dest graph
val writer = new EdgeWriter(dest_prefix, dest_edge_info, dest_adj_list_type, vertex_num, df)
writer.writeEdges()
}
}
}

/** Transform the graphs following the metadata defined in graph info objects.
*
* @param sourceGraphInfo The info object for the source graph.
* @param destGraphInfo The info object for the destination graph.
* @param spark The Spark session for the transformer.
*/
def transform(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, spark: SparkSession): Unit = {
val source_prefix = sourceGraphInfo.getPrefix
val dest_prefix = destGraphInfo.getPrefix

// construct the (vertex label -> vertex info) map for the source graph
val source_vertex_infos_map = constructVertexInfoMap(source_prefix, sourceGraphInfo, spark)
// construct the (edge label -> edge info) map for the source graph
val source_edge_infos_map = constructEdgeInfoMap(source_prefix, sourceGraphInfo, spark)

// transform and generate vertex data chunks
transformAllVertices(sourceGraphInfo, destGraphInfo, source_vertex_infos_map, spark)

// transform and generate edge data chunks
transformAllEdges(sourceGraphInfo, destGraphInfo, source_vertex_infos_map, source_edge_infos_map, spark)
}

/** Transform the graphs following the metadata defined in info files.
*
* @param sourceGraphInfoPath The path of the graph info yaml file for the source graph.
* @param destGraphInfoPath The path of the graph info yaml file for the destination graph.
* @param spark The Spark session for the transformer.
*/
def transform(sourceGraphInfoPath: String, destGraphInfoPath: String, spark: SparkSession): Unit = {
// load source graph info
val source_graph_info = GraphInfo.loadGraphInfo(sourceGraphInfoPath, spark)

// load dest graph info
val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark)

// conduct transformation
transform(source_graph_info, dest_graph_info, spark)
}
}
92 changes: 92 additions & 0 deletions spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala
@@ -0,0 +1,92 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import com.alibaba.graphar.GraphInfo
import com.alibaba.graphar.graph.GraphTransformer

import java.io.{File, FileInputStream}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.hadoop.fs.{Path, FileSystem}
import org.scalatest.funsuite.AnyFunSuite

class TestGraphTransformerSuite extends AnyFunSuite {
val spark = SparkSession.builder()
.enableHiveSupport()
.master("local[*]")
.getOrCreate()

test("transform graphs by yaml paths") {
// conduct transformation
val source_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath
val dest_path = getClass.getClassLoader.getResource("gar-test/transformer/ldbc_sample.graph.yml").getPath
GraphTransformer.transform(source_path, dest_path, spark)

val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark)
val prefix = dest_graph_info.getPrefix
val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration)

// validate vertex chunks
val vertex_chunk_path = new Path(prefix + "vertex/person/" + "*/*")
val vertex_chunk_files = fs.globStatus(vertex_chunk_path)
assert(vertex_chunk_files.length == 38)
// validate edge chunks
val adj_list_chunk_path = new Path(prefix + "edge/person_knows_person/unordered_by_dest/adj_list/" + "*/*")
val adj_list_chunk_files = fs.globStatus(adj_list_chunk_path)
assert(adj_list_chunk_files.length == 20)
val edge_chunk_path = new Path(prefix + "edge/person_knows_person/ordered_by_source/creationDate/" + "*/*")
val edge_chunk_files = fs.globStatus(edge_chunk_path)
assert(edge_chunk_files.length == 20)

// clean generated files and close FileSystem instance
fs.delete(new Path(prefix + "vertex"))
fs.delete(new Path(prefix + "edge"))
fs.close()
}

test("transform graphs by graph infos") {
// load source graph info
val source_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath
val source_graph_info = GraphInfo.loadGraphInfo(source_path, spark)

// load dest graph info
val dest_path = getClass.getClassLoader.getResource("gar-test/transformer/ldbc_sample.graph.yml").getPath
val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark)

// conduct transformation
GraphTransformer.transform(source_graph_info, dest_graph_info, spark)

val prefix = dest_graph_info.getPrefix
val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration)

// validate vertex chunks
val vertex_chunk_path = new Path(prefix + "vertex/person/" + "*/*")
val vertex_chunk_files = fs.globStatus(vertex_chunk_path)
assert(vertex_chunk_files.length == 38)
// validate edge chunks
val adj_list_chunk_path = new Path(prefix + "edge/person_knows_person/unordered_by_dest/adj_list/" + "*/*")
val adj_list_chunk_files = fs.globStatus(adj_list_chunk_path)
assert(adj_list_chunk_files.length == 20)
val edge_chunk_path = new Path(prefix + "edge/person_knows_person/ordered_by_source/creationDate/" + "*/*")
val edge_chunk_files = fs.globStatus(edge_chunk_path)
assert(edge_chunk_files.length == 20)

// clean generated files and close FileSystem instance
fs.delete(new Path(prefix + "vertex"))
fs.delete(new Path(prefix + "edge"))
fs.close()
}
}
2 changes: 1 addition & 1 deletion test/gar-test