From 8629b9a48615a0df01e7b030e2db4f483b30c842 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Mon, 27 Feb 2023 16:42:49 +0800 Subject: [PATCH 1/6] Implement GraphTransformer --- .../graphar/graph/GraphTransformer.scala | 165 ++++++++++++++++++ .../graphar/TestGraphTransformer.scala | 95 ++++++++++ 2 files changed, 260 insertions(+) create mode 100644 spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala create mode 100644 spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala new file mode 100644 index 000000000..54762a0a3 --- /dev/null +++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala @@ -0,0 +1,165 @@ +/** Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.graphar.graph + +import com.alibaba.graphar.{GeneralParams, AdjListType, GraphInfo, VertexInfo, EdgeInfo} +import com.alibaba.graphar.reader.{VertexReader, EdgeReader} +import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter} +import com.alibaba.graphar.datasources._ + +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.types._ +import org.apache.spark.sql.functions._ +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor + +/** The helper object for transforming graphs through the definitions of their infos. */ +object GraphTransformer { + /** Construct the map of (vertex label -> VertexInfo) for a graph. */ + private def constructVertexInfoMap(prefix: String, graphInfo:GraphInfo, spark: SparkSession): Map[String, VertexInfo] = { + var vertex_infos_map: Map[String, VertexInfo] = Map() + val vertices_yaml = graphInfo.getVertices + val vertices_it = vertices_yaml.iterator + while (vertices_it.hasNext()) { + val file_name = vertices_it.next() + val path = prefix + file_name + val vertex_info = VertexInfo.loadVertexInfo(path, spark) + vertex_infos_map += (vertex_info.getLabel -> vertex_info) + } + return vertex_infos_map + } + + /** Construct the map of (edge label -> EdgeInfo) for a graph. */ + private def constructEdgeInfoMap(prefix: String, graphInfo:GraphInfo, spark: SparkSession): Map[String, EdgeInfo] = { + var edge_infos_map: Map[String, EdgeInfo] = Map() + val edges_yaml = graphInfo.getEdges + val edges_it = edges_yaml.iterator + while (edges_it.hasNext()) { + val file_name = edges_it.next() + val path = prefix + file_name + val edge_info = EdgeInfo.loadEdgeInfo(path, spark) + val key = edge_info.getSrc_label + GeneralParams.regularSeperator + edge_info.getEdge_label + GeneralParams.regularSeperator + edge_info.getDst_label + edge_infos_map += (key -> edge_info) + } + return edge_infos_map + } + + /** Transform the graphs following the meta defined in info files. 
+ * + * @param sourceGraphInfoPath The path of the graph info yaml file for the source graph. + * @param destGraphInfoPath The path of the graph info yaml file for the destination graph. + * @param spark The Spark session for the transformer. + */ + def transform(sourceGraphInfoPath: String, destGraphInfoPath: String, spark: SparkSession): Unit = { + // load source graph info + val source_graph_info = GraphInfo.loadGraphInfo(sourceGraphInfoPath, spark) + + // load dest graph info + val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark) + + // conduct transforming + transform(source_graph_info, dest_graph_info, spark) + } + + /** Transform the graphs following the meta defined in graph info objects. + * + * @param sourceGraphInfo The info object for the source graph. + * @param destGraphInfo The info object for the destination graph. + * @param spark The Spark session for the transformer. + */ + def transform(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, spark: SparkSession): Unit = { + val source_prefix = sourceGraphInfo.getPrefix + val dest_prefix = destGraphInfo.getPrefix + + // construct the (vertex label -> vertex info) map for the source graph + val source_vertex_infos_map = constructVertexInfoMap(source_prefix, sourceGraphInfo, spark) + // construct the (edge label -> edge info) map for the source graph + val source_edge_infos_map = constructEdgeInfoMap(source_prefix, sourceGraphInfo, spark) + + // transform and generate vertex data chunks + // traverse vertex infos of the destination graph + val dest_vertices_it = destGraphInfo.getVertices.iterator + while (dest_vertices_it.hasNext()) { + // get dest edge info + val path = dest_prefix + dest_vertices_it.next() + val dest_vertex_info = VertexInfo.loadVertexInfo(path, spark) + // get source vertex info + val label = dest_vertex_info.getLabel() + if (!source_vertex_infos_map.contains(label)) { + throw new IllegalArgumentException + } + val source_vertex_info = source_vertex_infos_map(label) + // read vertex chunks from source graph + val reader = new VertexReader(source_prefix, source_vertex_info, spark) + val df = reader.readAllVertexPropertyGroups(true) + // write vertex chunks for dest graph + val writer = new VertexWriter(dest_prefix, dest_vertex_info, df) + writer.writeVertexProperties() + } + + // transform and generate edge data chunks + // traverse edge infos of the destination graph + val dest_edges_it = destGraphInfo.getEdges.iterator + while (dest_edges_it.hasNext()) { + // get dest edge info + val path = dest_prefix + dest_edges_it.next() + val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark) + // get source edge info + val key = dest_edge_info.getSrc_label + GeneralParams.regularSeperator + dest_edge_info.getEdge_label + GeneralParams.regularSeperator + dest_edge_info.getDst_label + if (!source_edge_infos_map.contains(key)) { + throw new IllegalArgumentException + } + val source_edge_info = source_edge_infos_map(key) + var has_loaded = false + var df = spark.emptyDataFrame + + // traverse all adjList types + val dest_adj_lists = dest_edge_info.getAdj_lists + val adj_list_it = dest_adj_lists.iterator + while (adj_list_it.hasNext()) { + val dest_adj_list_type = adj_list_it.next().getAdjList_type_in_gar + + // load edge DataFrame + if (!has_loaded) { + val source_adj_lists = source_edge_info.getAdj_lists + var source_adj_list_type = dest_adj_list_type + if (!source_edge_info.containAdjList(dest_adj_list_type) && source_adj_lists.size() > 0) + source_adj_list_type = 
source_adj_lists.get(0).getAdjList_type_in_gar + // read edge chunks from source graph + val reader = new EdgeReader(source_prefix, source_edge_info, source_adj_list_type, spark) + df = reader.readEdges(false) + has_loaded = true + } + + // read vertices number + val vertex_label = if (dest_adj_list_type == AdjListType.ordered_by_source || dest_adj_list_type == AdjListType.unordered_by_source) dest_edge_info.getSrc_label else dest_edge_info.getDst_label + if (!source_vertex_infos_map.contains(vertex_label)) { + throw new IllegalArgumentException + } + val vertex_info = source_vertex_infos_map(vertex_label) + val reader = new VertexReader(source_prefix, vertex_info, spark) + val vertex_num = reader.readVerticesNumber() + println(vertex_label) + println(vertex_num) + + // write edge chunks for dest graph + val writer = new EdgeWriter(dest_prefix, dest_edge_info, dest_adj_list_type, vertex_num, df) + writer.writeEdges() + } + } + } +} diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala new file mode 100644 index 000000000..617bc20b6 --- /dev/null +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala @@ -0,0 +1,95 @@ +/** Copyright 2022 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.graphar + +import com.alibaba.graphar.graph.GraphTransformer +import com.alibaba.graphar.GraphInfo + +import java.io.{File, FileInputStream} +import scala.beans.BeanProperty +import org.yaml.snakeyaml.Yaml +import org.yaml.snakeyaml.constructor.Constructor +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.hadoop.fs.{Path, FileSystem} +import org.scalatest.funsuite.AnyFunSuite + +class TestGraphTransformerSuite extends AnyFunSuite { + val spark = SparkSession.builder() + .enableHiveSupport() + .master("local[*]") + .getOrCreate() + + test("transform graphs by yaml paths") { + // conduct transformation + val source_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath + val dest_path = getClass.getClassLoader.getResource("gar-test/transform/ldbc_sample.graph.yml").getPath + GraphTransformer.transform(source_path, dest_path, spark) + + val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark) + val prefix = dest_graph_info.getPrefix + val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration) + + // validate vertex chunks + val vertex_chunk_path = new Path(prefix + "vertex/person/" + "*/*") + val vertex_chunk_files = fs.globStatus(vertex_chunk_path) + assert(vertex_chunk_files.length == 38) + // validate edge chunks + val adj_list_chunk_path = new Path(prefix + "edge/person_knows_person/unordered_by_dest/adj_list/" + "*/*") + val adj_list_chunk_files = fs.globStatus(adj_list_chunk_path) + assert(adj_list_chunk_files.length == 20) + val edge_chunk_path = new Path(prefix + "edge/person_knows_person/ordered_by_source/creationDate/" + "*/*") + val edge_chunk_files = fs.globStatus(edge_chunk_path) + assert(edge_chunk_files.length == 20) + + // clean generated files and close FileSystem instance + fs.delete(new Path(prefix + "vertex")) + fs.delete(new Path(prefix + "edge")) + fs.close() + } + + test("transform graphs by graph infos") { + // load source graph info + val source_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath + val source_graph_info = GraphInfo.loadGraphInfo(source_path, spark) + + // load dest graph info + val dest_path = getClass.getClassLoader.getResource("gar-test/transform/ldbc_sample.graph.yml").getPath + val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark) + + // conduct transforming + GraphTransformer.transform(source_graph_info, dest_graph_info, spark) + + val prefix = dest_graph_info.getPrefix + val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration) + + // validate vertex chunks + val vertex_chunk_path = new Path(prefix + "vertex/person/" + "*/*") + val vertex_chunk_files = fs.globStatus(vertex_chunk_path) + assert(vertex_chunk_files.length == 38) + // validate edge chunks + val adj_list_chunk_path = new Path(prefix + "edge/person_knows_person/unordered_by_dest/adj_list/" + "*/*") + val adj_list_chunk_files = fs.globStatus(adj_list_chunk_path) + assert(adj_list_chunk_files.length == 20) + val edge_chunk_path = new Path(prefix + "edge/person_knows_person/ordered_by_source/creationDate/" + "*/*") + val edge_chunk_files = fs.globStatus(edge_chunk_path) + assert(edge_chunk_files.length == 20) + + // clean generated files and close FileSystem instance + fs.delete(new Path(prefix + "vertex")) + fs.delete(new Path(prefix + "edge")) + fs.close() + } +} From a31b3abbded6a72d32c0df9b7a66059a2517a429 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Tue, 28 Feb 2023 10:51:37 +0800 Subject: [PATCH 2/6] Update --- .../graphar/graph/GraphTransformer.scala | 42 +++++++++---------- .../graphar/TestGraphTransformer.scala | 7 +--- 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala index 54762a0a3..5d07737f1 100644 --- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala +++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala @@ -18,19 +18,15 @@ package com.alibaba.graphar.graph import com.alibaba.graphar.{GeneralParams, AdjListType, GraphInfo, VertexInfo, EdgeInfo} import com.alibaba.graphar.reader.{VertexReader, EdgeReader} import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter} -import com.alibaba.graphar.datasources._ -import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ -import org.yaml.snakeyaml.Yaml -import org.yaml.snakeyaml.constructor.Constructor /** The helper object for transforming graphs through the definitions of their infos. */ object GraphTransformer { /** Construct the map of (vertex label -> VertexInfo) for a graph. */ - private def constructVertexInfoMap(prefix: String, graphInfo:GraphInfo, spark: SparkSession): Map[String, VertexInfo] = { + private def constructVertexInfoMap(prefix: String, graphInfo: GraphInfo, spark: SparkSession): Map[String, VertexInfo] = { var vertex_infos_map: Map[String, VertexInfo] = Map() val vertices_yaml = graphInfo.getVertices val vertices_it = vertices_yaml.iterator @@ -44,7 +40,7 @@ object GraphTransformer { } /** Construct the map of (edge label -> EdgeInfo) for a graph. */ - private def constructEdgeInfoMap(prefix: String, graphInfo:GraphInfo, spark: SparkSession): Map[String, EdgeInfo] = { + private def constructEdgeInfoMap(prefix: String, graphInfo: GraphInfo, spark: SparkSession): Map[String, EdgeInfo] = { var edge_infos_map: Map[String, EdgeInfo] = Map() val edges_yaml = graphInfo.getEdges val edges_it = edges_yaml.iterator @@ -58,7 +54,7 @@ object GraphTransformer { return edge_infos_map } - /** Transform the graphs following the meta defined in info files. + /** Transform the graphs following the meta data defined in info files. * * @param sourceGraphInfoPath The path of the graph info yaml file for the source graph. * @param destGraphInfoPath The path of the graph info yaml file for the destination graph. @@ -71,11 +67,11 @@ object GraphTransformer { // load dest graph info val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark) - // conduct transforming + // conduct transformation transform(source_graph_info, dest_graph_info, spark) } - /** Transform the graphs following the meta defined in graph info objects. + /** Transform the graphs following the meta data defined in graph info objects. * * @param sourceGraphInfo The info object for the source graph. * @param destGraphInfo The info object for the destination graph. 
@@ -94,19 +90,19 @@ object GraphTransformer { // traverse vertex infos of the destination graph val dest_vertices_it = destGraphInfo.getVertices.iterator while (dest_vertices_it.hasNext()) { - // get dest edge info + // load dest edge info val path = dest_prefix + dest_vertices_it.next() val dest_vertex_info = VertexInfo.loadVertexInfo(path, spark) - // get source vertex info + // load source vertex info val label = dest_vertex_info.getLabel() if (!source_vertex_infos_map.contains(label)) { throw new IllegalArgumentException } val source_vertex_info = source_vertex_infos_map(label) - // read vertex chunks from source graph + // read vertex chunks from the source graph val reader = new VertexReader(source_prefix, source_vertex_info, spark) val df = reader.readAllVertexPropertyGroups(true) - // write vertex chunks for dest graph + // write vertex chunks for the dest graph val writer = new VertexWriter(dest_prefix, dest_vertex_info, df) writer.writeVertexProperties() } @@ -115,10 +111,10 @@ object GraphTransformer { // traverse edge infos of the destination graph val dest_edges_it = destGraphInfo.getEdges.iterator while (dest_edges_it.hasNext()) { - // get dest edge info + // load dest edge info val path = dest_prefix + dest_edges_it.next() val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark) - // get source edge info + // load source edge info val key = dest_edge_info.getSrc_label + GeneralParams.regularSeperator + dest_edge_info.getEdge_label + GeneralParams.regularSeperator + dest_edge_info.getDst_label if (!source_edge_infos_map.contains(key)) { throw new IllegalArgumentException @@ -133,12 +129,13 @@ object GraphTransformer { while (adj_list_it.hasNext()) { val dest_adj_list_type = adj_list_it.next().getAdjList_type_in_gar - // load edge DataFrame + // load edge DataFrame from the source graph if (!has_loaded) { val source_adj_lists = source_edge_info.getAdj_lists var source_adj_list_type = dest_adj_list_type - if (!source_edge_info.containAdjList(dest_adj_list_type) && source_adj_lists.size() > 0) - source_adj_list_type = source_adj_lists.get(0).getAdjList_type_in_gar + if (!source_edge_info.containAdjList(dest_adj_list_type)) + if (source_adj_lists.size() > 0) + source_adj_list_type = source_adj_lists.get(0).getAdjList_type_in_gar // read edge chunks from source graph val reader = new EdgeReader(source_prefix, source_edge_info, source_adj_list_type, spark) df = reader.readEdges(false) @@ -146,15 +143,18 @@ object GraphTransformer { } // read vertices number - val vertex_label = if (dest_adj_list_type == AdjListType.ordered_by_source || dest_adj_list_type == AdjListType.unordered_by_source) dest_edge_info.getSrc_label else dest_edge_info.getDst_label + val vertex_label = { + if (dest_adj_list_type == AdjListType.ordered_by_source || dest_adj_list_type == AdjListType.unordered_by_source) + dest_edge_info.getSrc_label + else + dest_edge_info.getDst_label + } if (!source_vertex_infos_map.contains(vertex_label)) { throw new IllegalArgumentException } val vertex_info = source_vertex_infos_map(vertex_label) val reader = new VertexReader(source_prefix, vertex_info, spark) val vertex_num = reader.readVerticesNumber() - println(vertex_label) - println(vertex_num) // write edge chunks for dest graph val writer = new EdgeWriter(dest_prefix, dest_edge_info, dest_adj_list_type, vertex_num, df) diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala index 617bc20b6..a89a2344e 100644 --- 
a/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala @@ -15,13 +15,10 @@ package com.alibaba.graphar -import com.alibaba.graphar.graph.GraphTransformer import com.alibaba.graphar.GraphInfo +import com.alibaba.graphar.graph.GraphTransformer import java.io.{File, FileInputStream} -import scala.beans.BeanProperty -import org.yaml.snakeyaml.Yaml -import org.yaml.snakeyaml.constructor.Constructor import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.hadoop.fs.{Path, FileSystem} import org.scalatest.funsuite.AnyFunSuite @@ -69,7 +66,7 @@ class TestGraphTransformerSuite extends AnyFunSuite { val dest_path = getClass.getClassLoader.getResource("gar-test/transform/ldbc_sample.graph.yml").getPath val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark) - // conduct transforming + // conduct transformation GraphTransformer.transform(source_graph_info, dest_graph_info, spark) val prefix = dest_graph_info.getPrefix From 998e0aeef3aa7faa2fd8ffa79843ecbef558fe85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Tue, 28 Feb 2023 11:10:51 +0800 Subject: [PATCH 3/6] Update --- .../graphar/graph/GraphTransformer.scala | 93 ++++++++++++------- 1 file changed, 61 insertions(+), 32 deletions(-) diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala index 5d07737f1..bb2345ed2 100644 --- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala +++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphTransformer.scala @@ -54,39 +54,17 @@ object GraphTransformer { return edge_infos_map } - /** Transform the graphs following the meta data defined in info files. - * - * @param sourceGraphInfoPath The path of the graph info yaml file for the source graph. - * @param destGraphInfoPath The path of the graph info yaml file for the destination graph. - * @param spark The Spark session for the transformer. - */ - def transform(sourceGraphInfoPath: String, destGraphInfoPath: String, spark: SparkSession): Unit = { - // load source graph info - val source_graph_info = GraphInfo.loadGraphInfo(sourceGraphInfoPath, spark) - - // load dest graph info - val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark) - - // conduct transformation - transform(source_graph_info, dest_graph_info, spark) - } - - /** Transform the graphs following the meta data defined in graph info objects. + /** Transform the vertex chunks following the meta data defined in graph info objects. * * @param sourceGraphInfo The info object for the source graph. * @param destGraphInfo The info object for the destination graph. + * @param sourceVertexInfosMap The map of (vertex label -> VertexInfo) for the source graph. * @param spark The Spark session for the transformer. 
*/ - def transform(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, spark: SparkSession): Unit = { + private def transformAllVertices(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, sourceVertexInfosMap: Map[String, VertexInfo], spark: SparkSession): Unit = { val source_prefix = sourceGraphInfo.getPrefix val dest_prefix = destGraphInfo.getPrefix - // construct the (vertex label -> vertex info) map for the source graph - val source_vertex_infos_map = constructVertexInfoMap(source_prefix, sourceGraphInfo, spark) - // construct the (edge label -> edge info) map for the source graph - val source_edge_infos_map = constructEdgeInfoMap(source_prefix, sourceGraphInfo, spark) - - // transform and generate vertex data chunks // traverse vertex infos of the destination graph val dest_vertices_it = destGraphInfo.getVertices.iterator while (dest_vertices_it.hasNext()) { @@ -95,10 +73,10 @@ object GraphTransformer { val dest_vertex_info = VertexInfo.loadVertexInfo(path, spark) // load source vertex info val label = dest_vertex_info.getLabel() - if (!source_vertex_infos_map.contains(label)) { + if (!sourceVertexInfosMap.contains(label)) { throw new IllegalArgumentException } - val source_vertex_info = source_vertex_infos_map(label) + val source_vertex_info = sourceVertexInfosMap(label) // read vertex chunks from the source graph val reader = new VertexReader(source_prefix, source_vertex_info, spark) val df = reader.readAllVertexPropertyGroups(true) @@ -106,8 +84,20 @@ object GraphTransformer { val writer = new VertexWriter(dest_prefix, dest_vertex_info, df) writer.writeVertexProperties() } + } + + /** Transform the edge chunks following the meta data defined in graph info objects. + * + * @param sourceGraphInfo The info object for the source graph. + * @param destGraphInfo The info object for the destination graph. + * @param sourceVertexInfosMap The map of (vertex label -> VertexInfo) for the source graph. + * @param sourceEdgeInfosMap The map of (edge label -> EdgeInfo) for the source graph. + * @param spark The Spark session for the transformer. 
+ */ + private def transformAllEdges(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, sourceVertexInfosMap: Map[String, VertexInfo], sourceEdgeInfosMap: Map[String, EdgeInfo], spark: SparkSession): Unit = { + val source_prefix = sourceGraphInfo.getPrefix + val dest_prefix = destGraphInfo.getPrefix - // transform and generate edge data chunks // traverse edge infos of the destination graph val dest_edges_it = destGraphInfo.getEdges.iterator while (dest_edges_it.hasNext()) { @@ -116,10 +106,10 @@ object GraphTransformer { val dest_edge_info = EdgeInfo.loadEdgeInfo(path, spark) // load source edge info val key = dest_edge_info.getSrc_label + GeneralParams.regularSeperator + dest_edge_info.getEdge_label + GeneralParams.regularSeperator + dest_edge_info.getDst_label - if (!source_edge_infos_map.contains(key)) { + if (!sourceEdgeInfosMap.contains(key)) { throw new IllegalArgumentException } - val source_edge_info = source_edge_infos_map(key) + val source_edge_info = sourceEdgeInfosMap(key) var has_loaded = false var df = spark.emptyDataFrame @@ -149,10 +139,10 @@ object GraphTransformer { else dest_edge_info.getDst_label } - if (!source_vertex_infos_map.contains(vertex_label)) { + if (!sourceVertexInfosMap.contains(vertex_label)) { throw new IllegalArgumentException } - val vertex_info = source_vertex_infos_map(vertex_label) + val vertex_info = sourceVertexInfosMap(vertex_label) val reader = new VertexReader(source_prefix, vertex_info, spark) val vertex_num = reader.readVerticesNumber() @@ -162,4 +152,43 @@ object GraphTransformer { } } } + + /** Transform the graphs following the meta data defined in graph info objects. + * + * @param sourceGraphInfo The info object for the source graph. + * @param destGraphInfo The info object for the destination graph. + * @param spark The Spark session for the transformer. + */ + def transform(sourceGraphInfo: GraphInfo, destGraphInfo: GraphInfo, spark: SparkSession): Unit = { + val source_prefix = sourceGraphInfo.getPrefix + val dest_prefix = destGraphInfo.getPrefix + + // construct the (vertex label -> vertex info) map for the source graph + val source_vertex_infos_map = constructVertexInfoMap(source_prefix, sourceGraphInfo, spark) + // construct the (edge label -> edge info) map for the source graph + val source_edge_infos_map = constructEdgeInfoMap(source_prefix, sourceGraphInfo, spark) + + // transform and generate vertex data chunks + transformAllVertices(sourceGraphInfo, destGraphInfo, source_vertex_infos_map, spark) + + // transform and generate edge data chunks + transformAllEdges(sourceGraphInfo, destGraphInfo, source_vertex_infos_map, source_edge_infos_map, spark) + } + + /** Transform the graphs following the meta data defined in info files. + * + * @param sourceGraphInfoPath The path of the graph info yaml file for the source graph. + * @param destGraphInfoPath The path of the graph info yaml file for the destination graph. + * @param spark The Spark session for the transformer. 
+ */ + def transform(sourceGraphInfoPath: String, destGraphInfoPath: String, spark: SparkSession): Unit = { + // load source graph info + val source_graph_info = GraphInfo.loadGraphInfo(sourceGraphInfoPath, spark) + + // load dest graph info + val dest_graph_info = GraphInfo.loadGraphInfo(destGraphInfoPath, spark) + + // conduct transformation + transform(source_graph_info, dest_graph_info, spark) + } } From b5e0b9ad9347246f582b5284c6043d756f5c2dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Tue, 28 Feb 2023 11:30:39 +0800 Subject: [PATCH 4/6] Update --- .../test/scala/com/alibaba/graphar/TestGraphTransformer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala index a89a2344e..5a32d1ccb 100644 --- a/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala +++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala @@ -32,7 +32,7 @@ class TestGraphTransformerSuite extends AnyFunSuite { test("transform graphs by yaml paths") { // conduct transformation val source_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath - val dest_path = getClass.getClassLoader.getResource("gar-test/transform/ldbc_sample.graph.yml").getPath + val dest_path = getClass.getClassLoader.getResource("gar-test/transformer/ldbc_sample.graph.yml").getPath GraphTransformer.transform(source_path, dest_path, spark) val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark) @@ -63,7 +63,7 @@ class TestGraphTransformerSuite extends AnyFunSuite { val source_graph_info = GraphInfo.loadGraphInfo(source_path, spark) // load dest graph info - val dest_path = getClass.getClassLoader.getResource("gar-test/transform/ldbc_sample.graph.yml").getPath + val dest_path = getClass.getClassLoader.getResource("gar-test/transformer/ldbc_sample.graph.yml").getPath val dest_graph_info = GraphInfo.loadGraphInfo(dest_path, spark) // conduct transformation From 1a12dedc2c70f0932962bab2486b42c153fb6cec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Tue, 28 Feb 2023 11:47:53 +0800 Subject: [PATCH 5/6] Update submodule --- test/gar-test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/gar-test b/test/gar-test index e035d7b70..ec57df504 160000 --- a/test/gar-test +++ b/test/gar-test @@ -1 +1 @@ -Subproject commit e035d7b70cc5b2a00d3b0a90ffc0d4317a91de73 +Subproject commit ec57df504cbed19e98dc1ffbae056cc0bb36475a From 5ee8447c1bd66eac63a4b17c31fa6f2622c1f8e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=89=E7=90=86?= Date: Tue, 28 Feb 2023 16:24:18 +0800 Subject: [PATCH 6/6] Update documentation --- docs/user-guide/spark-lib.rst | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/docs/user-guide/spark-lib.rst b/docs/user-guide/spark-lib.rst index c5dd15441..0703dda40 100644 --- a/docs/user-guide/spark-lib.rst +++ b/docs/user-guide/spark-lib.rst @@ -170,6 +170,38 @@ To utilize the GAR Spark reader, please refer to the following example code. See `TestReader.scala`_ for the complete example. + +Graph Transformer +`````````````````` +The Graph Transformer is a helper object in the GraphAr Spark library, designed to assist with data transformation at the graph level. It takes two GraphInfo objects (or paths of two yaml files) as inputs: one for the source graph, and one for the destination graph. 
The transformer will then load data from existing GAR files for the source graph, utilizing the GraphAr Spark Reader and the meta data defined in the source GraphInfo. After reorganizing the data according to the destination GraphInfo, it generates new GAR chunk files with the GraphAr Spark Writer. + +.. code:: scala + + // transform graphs by yaml paths + val spark = ... // the Spark session + val source_path = ... // e.g., /tmp/source.graph.yml + val dest_path = ... // e.g., /tmp/dest.graph.yml + GraphTransformer.transform(source_path, dest_path, spark) + + // transform graphs by information objects + val source_info = ... + val dest_info = ... + GraphTransformer.transform(source_info, dest_info, spark) + + +We provide an example in `TestGraphTransformer.scala`_, which demonstrates how to conduct data transformation from the `source graph `_ to the `destination graph `_. + +The Graph Transformer can be used for various purposes, including transforming GAR data between different file types (e.g. from ORC to Parquet), transforming between different adjList types (e.g. from COO to CSR), selecting properties or regrouping them, and setting a new chunk size. + +.. note:: + There are certain limitations while using the Graph Transformer: + + - The vertices (or edges) of the source and destination graphs are aligned by labels, meaning each vertex/edge label included in the destination graph must have an equivalent in the source graph, in order for the related chunks to be loaded as the data source. + - For each group of vertices/edges (i.e., each single label), each property included in the destination graph (defined in the relevant VertexInfo/EdgeInfo) must also be present in the source graph. + + In addition, users can use the GraphAr Spark Reader/Writer to conduct data transformation more flexibly at the vertex/edge table level, as opposed to the graph level. This allows for a more granular approach to transforming data, as `TransformExample.scala`_ shows. + + More examples `````````````````` For more information on usage, please refer to the examples: @@ -186,6 +218,8 @@ For more information on usage, please refer to the examples: .. _TestReader.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TestReader.scala +.. _TestGraphTransformer.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TestGraphTransformer.scala + .. _ComputeExample.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala .. _TransformExample.scala: https://github.com/alibaba/GraphAr/blob/main/spark/src/test/scala/com/alibaba/graphar/TransformExample.scala
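For reference, the table-level approach mentioned in the note above can be sketched with the same reader/writer API that the transformer itself uses. The snippet below is a minimal, illustrative sketch rather than part of the library: the yaml paths, the prefixes, and the local Spark session are hypothetical placeholders, and the boolean flag passed to ``readAllVertexPropertyGroups`` simply follows the call made inside ``GraphTransformer`` in this patch series.

.. code:: scala

   import com.alibaba.graphar.VertexInfo
   import com.alibaba.graphar.reader.VertexReader
   import com.alibaba.graphar.writer.VertexWriter
   import org.apache.spark.sql.SparkSession

   // a local Spark session for the sketch (placeholder configuration)
   val spark = SparkSession.builder().master("local[*]").getOrCreate()

   // load the vertex meta data of the source and destination graphs
   // (both yaml paths are hypothetical)
   val source_vertex_info = VertexInfo.loadVertexInfo("/tmp/source/person.vertex.yml", spark)
   val dest_vertex_info = VertexInfo.loadVertexInfo("/tmp/dest/person.vertex.yml", spark)

   // read all vertex property groups from the source graph,
   // mirroring what GraphTransformer does for each vertex label
   val reader = new VertexReader("/tmp/source/", source_vertex_info, spark)
   val df = reader.readAllVertexPropertyGroups(true)

   // write the same vertex table following the destination meta data,
   // e.g., with a different file type or chunk size
   val writer = new VertexWriter("/tmp/dest/", dest_vertex_info, df)
   writer.writeVertexProperties()

Transforming a single edge table works analogously with ``EdgeReader``/``EdgeWriter``, as the ``transformAllEdges`` method introduced in this patch series demonstrates.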