diff --git a/spark/pom.xml b/spark/pom.xml
index 375d4250a..b4eb28b6d 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -15,10 +15,11 @@
   <properties>
     <scala.binary.version>2.12</scala.binary.version>
     <PermGen>512m</PermGen>
     <MaxPermGen>1024m</MaxPermGen>
-    <spark.version>3.1.1</spark.version>
+    <spark.version>3.2.0</spark.version>
     <maven.compiler.release>8</maven.compiler.release>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
+    <cupid.sdk.version>3.3.14-SNAPSHOT</cupid.sdk.version>
   </properties>
@@ -68,6 +69,34 @@
       <artifactId>snakeyaml</artifactId>
       <version>1.26</version>
     </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>hadoop-fs-oss</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>cupid-sdk</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
@@ -119,6 +148,47 @@
          </execution>
        </executions>
      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <minimizeJar>false</minimizeJar>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                    <exclude>**/log4j.properties</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
   <packaging>jar</packaging>
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
index 842d126a5..2edcb0faf 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -1,5 +1,6 @@
 package com.alibaba.graphar.utils
 
+import java.net.URI
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.DataFrame
 import org.apache.hadoop.fs
@@ -7,7 +8,7 @@ import org.apache.hadoop.fs
 object FileSystem {
   private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
     val sc = spark.sparkContext
-    val file_system = fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
     val path_pattern = new fs.Path(filePrefix + "part*")
     val files = file_system.globStatus(path_pattern)
     for (i <- 0 until files.length) {
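Note on the FileSystem.scala change above: fs.FileSystem.get(conf) always returns the filesystem for the configured default scheme (fs.defaultFS), so renaming files under a prefix that lives on a different store, such as OSS, would target the wrong filesystem. Passing the prefix URI lets Hadoop resolve the implementation from the URI scheme. A minimal sketch of that lookup, assuming only hadoop-common on the classpath (the file:// prefix is purely illustrative):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object FsLookupSketch {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        // The URI scheme (file://, hdfs://, oss://, ...) selects the concrete
        // FileSystem implementation, independent of fs.defaultFS.
        val fs = FileSystem.get(new URI("file:///tmp/graphar/"), conf)
        println(fs.getClass.getName) // org.apache.hadoop.fs.LocalFileSystem
        fs.close()
      }
    }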
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
index cbb0c574a..bc669ecfc 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
@@ -80,11 +80,11 @@ object IndexGenerator {
   def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
     val spark = edgeDf.sparkSession
     dstIndexMapping.createOrReplaceTempView("dst_vertex")
-    edgeDf.createOrReplaceTempView("edge")
+    edgeDf.createOrReplaceTempView("edges")
     val dstCol = GeneralParams.dstIndexCol;
     val indexCol = GeneralParams.vertexIndexCol;
     val dstPrimaryKey = GeneralParams.primaryCol;
-    val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
+    val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
     // drop the old dst id col
     trans_df.drop(dstColumnName)
   }
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
index d27907ed1..6d9dc4852 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -35,8 +35,7 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame)
 
   def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
     if (chunks.isEmpty) {
-      val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertexDf)
-      chunks = VertexWriter.repartitionAndSort(vertex_df_with_index, vertexInfo.getChunk_size())
+      chunks = VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
     }
 
     // write out the chunks
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
index d1e4db582..fd2352a51 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -1,17 +1,25 @@
 package com.alibaba.graphar
 
-import java.io.{File, FileInputStream}
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
 import scala.beans.BeanProperty
 import org.scalatest.funsuite.AnyFunSuite
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.sql.SparkSession
 
 class GraphInfoSuite extends AnyFunSuite {
+  val spark = SparkSession.builder()
+    .enableHiveSupport()
+    .master("local[*]")
+    .getOrCreate()
 
   test("load graph info") {
-    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
-    val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
-    val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
+    // read graph yaml
+    val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val input = fs.open(yaml_path)
+    val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
+    val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]
 
     assert(graph_info.getName == "ldbc_sample")
     assert(graph_info.getPrefix == "" )
@@ -27,7 +35,9 @@
   }
 
   test("load vertex info") {
-    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
+    val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val input = fs.open(yaml_path)
     val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
     val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]
 
@@ -74,7 +84,9 @@
   }
 
   test("load edge info") {
-    val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
+    val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val input = fs.open(yaml_path)
     val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
     val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]
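Note on the VertexWriter change above: the writer no longer generates the vertex index column itself, so callers are now expected to index the dataframe before constructing the writer, as the updated tests below do. Sketched with the names used in those tests (a caller-side outline, not new API):

    import com.alibaba.graphar.utils.IndexGenerator
    import com.alibaba.graphar.writer.VertexWriter

    // index the raw vertex dataframe before handing it to the writer
    val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
    val writer = new VertexWriter(graph_info.getPrefix(), vertex_info, vertex_df_with_index)
    writer.writeVertexProperties()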
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index 0b0d69443..bd3fd525c 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -7,6 +7,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.scalatest.funsuite.AnyFunSuite
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
+import org.apache.hadoop.fs.{Path, FileSystem}
 
 class WriterSuite extends AnyFunSuite {
   val spark = SparkSession.builder()
@@ -14,37 +15,104 @@
     .master("local[*]")
     .getOrCreate()
 
-  test("test vertex writer") {
+  test("test vertex writer with only vertex table") {
+    // read vertex dataframe
     val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
     val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
 
-    val graph_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
+    // read graph yaml
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val graph_input = fs.open(graph_yaml_path)
     val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
     val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
 
-    val vertex_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
+    // read vertex yaml
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_input = fs.open(vertex_yaml_path)
     val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
     val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
 
-    val writer = new VertexWriter(graph_info.getPrefix(), vertex_info, vertex_df)
+    // generate vertex index column for vertex dataframe
+    val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
+
+    // create writer object for person and generate the properties with GAR format
+    val writer = new VertexWriter(graph_info.getPrefix(), vertex_info, vertex_df_with_index)
     writer.writeVertexProperties()
+
+    // close FileSystem instance
+    fs.close()
   }
 
-  test("test edge writer") {
+  test("test edge writer with only edge table") {
+    // read edge dataframe
     val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
     val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
 
-    val graph_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
+    // read graph yaml
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val graph_input = fs.open(graph_yaml_path)
     val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
     val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
 
-    val edge_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
+    // read edge yaml
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_input = fs.open(edge_yaml_path)
     val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
     val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
 
+    // generate vertex index for edge dataframe
     val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
+
+    // create writer object for person_knows_person and generate the adj list and properties with GAR format
     val writer = new EdgeWriter(graph_info.getPrefix(), edge_info, AdjListType.ordered_by_source, edge_df_with_index)
     writer.writeEdges()
+
+    // close FileSystem instance
+    fs.close()
+  }
+
+  test("test edge writer with vertex table and edge table") {
+    // read vertex dataframe
+    val vertex_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+    val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_file_path)
+
+    // read edge dataframe
+    val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+    val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+
+    // read graph yaml
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+    val graph_input = fs.open(graph_yaml_path)
+    val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
+    val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
+
+    // read vertex yaml
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_input = fs.open(vertex_yaml_path)
+    val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+    val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+    // read edge yaml
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_input = fs.open(edge_yaml_path)
+    val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+    val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+
+    // construct person vertex mapping with dataframe
+    val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey())
+    // generate src index and dst index for edge dataframe with vertex mapping
+    val edge_df_with_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
+    val edge_df_with_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_with_src_index, "dst", vertex_mapping)
+
+    // create writer object for person_knows_person and generate the adj list and properties with GAR format
+    val writer = new EdgeWriter(graph_info.getPrefix(), edge_info, AdjListType.ordered_by_source, edge_df_with_src_dst_index)
+    writer.writeEdges()
+
+    // close FileSystem instance
+    fs.close()
   }
 }
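The last test above exercises the mapping-based indexing path of IndexGenerator: instead of deriving src/dst indices from the edge table alone (generateSrcAndDstIndexUnitedlyForEdges), it builds an index mapping from the vertex table's primary key, so the edge indices agree with the indices assigned when the vertex chunks are written. Condensed, using only calls that appear in this diff (input dataframes as in the test):

    import com.alibaba.graphar.utils.IndexGenerator

    // primary-key -> index mapping derived from the vertex dataframe
    val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey())
    // join the mapping onto the edge dataframe, once per endpoint column
    val edge_with_src = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
    val edge_with_src_dst = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_with_src, "dst", vertex_mapping)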