diff --git a/docs/applications/spark.rst b/docs/applications/spark.rst index dc8446eaa..9b983824c 100644 --- a/docs/applications/spark.rst +++ b/docs/applications/spark.rst @@ -170,7 +170,7 @@ The `putVertexDataIntoNeo4j` and `putEdgeDataIntoNeo4j` methods creates or updat val sourceDf = vertexData(sourceLabel) val targetDf = vertexData(targetLabel) // convert the source and target index column to the primary key column - val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey) // use the first dataframe of (adj_list_type_str, dataframe) map + val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey) // use the first DataFrame of (adj_list_type_str, DataFrame) map val properties = if (edgeLabel == "REVIEWED") "rating,summary" else "" diff --git a/docs/user-guide/spark-lib.rst b/docs/user-guide/spark-lib.rst index 4823e228c..5d463730a 100644 --- a/docs/user-guide/spark-lib.rst +++ b/docs/user-guide/spark-lib.rst @@ -132,7 +132,7 @@ To utilize the GAR Spark writer, please refer to the following example code. .. code:: scala - // generate the vertex index column for vertex dataframe + // generate the vertex index column for vertex DataFrame val vertex_df = ... val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df) // construct the vertex writer @@ -145,7 +145,7 @@ To utilize the GAR Spark writer, please refer to the following example code. // write all properties writer.writeVertexProperties() - // generate vertex index for edge dataframe + // generate vertex index for edge DataFrame val edge_df = ... val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst") // construct the edge writer diff --git a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala index 7138d19bc..33b6c70d5 100644 --- a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala +++ b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Nebula.scala @@ -97,7 +97,7 @@ object GraphAr2Nebula { targetDF, sourcePrimaryKey, targetPrimaryKey - ) // use the first dataframe of (adj_list_type_str, dataframe) map + ) // use the first DataFrame of (adj_list_type_str, DataFrame) map writeEdge(edgeType, "src", "dst", "_rank", df) } diff --git a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala index 7bade18e7..8e21fa6cc 100644 --- a/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala +++ b/spark/src/main/scala/com/alibaba/graphar/example/GraphAr2Neo4j.scala @@ -103,7 +103,7 @@ object GraphAr2Neo4j { targetDf, sourcePrimaryKey, targetPrimaryKey - ) // use the first dataframe of (adj_list_type_str, dataframe) map + ) // use the first DataFrame of (adj_list_type_str, DataFrame) map // FIXME: use properties message in edge info val properties = if (edgeLabel == "REVIEWED") "rating,summary" else "" diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala index 8702039a2..d10e1a181 100644 --- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala +++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala @@ -112,8 +112,8 @@ object GraphReader { * @param addVertexIndex * Whether to add index for the vertex DataFrames. 
   * @return
-   * Pair of vertex dataframes and edge dataframes, the vertex dataframes are
-   * stored as the map of (vertex_label -> DataFrame) the edge dataframes are
+   * Pair of vertex DataFrames and edge DataFrames; the vertex DataFrames are
+   * stored as a map of (vertex_label -> DataFrame), and the edge DataFrames are
    * stored as a map of ((srcLabel, edgeLabel, dstLabel) -> (adj_list_type_str
    * -> DataFrame))
    */
@@ -145,8 +145,8 @@ object GraphReader {
    * @param addVertexIndex
    *   Whether to add index for the vertex DataFrames.
    * @return
-   * Pair of vertex dataframes and edge dataframes, the vertex dataframes are
-   * stored as the map of (vertex_label -> DataFrame) the edge dataframes are
+   * Pair of vertex DataFrames and edge DataFrames; the vertex DataFrames are
+   * stored as a map of (vertex_label -> DataFrame), and the edge DataFrames are
    * stored as a map of (srcLabel_edgeLabel_dstLabel -> (adj_list_type_str ->
    * DataFrame))
    */
diff --git a/spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala b/spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
index 0f5a99661..39490c581 100644
--- a/spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
@@ -31,12 +31,12 @@ import java.io.{BufferedWriter, OutputStreamWriter}
 class GraphWriter() {

   /**
-   * Put the vertex dataframe into writer.
+   * Put the vertex DataFrame into the writer.
    *
    * @param label
    *   label of vertex.
    * @param df
-   *   dataframe of the vertex type.
+   *   DataFrame of the vertex type.
    * @param primaryKey
    *   primary key of the vertex type, default is empty, which take the first
    *   property column as primary key.
diff --git a/spark/src/main/scala/com/alibaba/graphar/util/DataFrameConcat.scala b/spark/src/main/scala/com/alibaba/graphar/util/DataFrameConcat.scala
index a093eff09..6f6953926 100644
--- a/spark/src/main/scala/com/alibaba/graphar/util/DataFrameConcat.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/util/DataFrameConcat.scala
@@ -32,7 +32,7 @@ object DataFrameConcat {
    * @param df2
    *   The second DataFrame.
    * @return
-   *   The result DataFrame that concats the two DataFrames.
+   *   The result DataFrame that concatenates the two DataFrames.
    */
   def concat(df1: DataFrame, df2: DataFrame): DataFrame = {
     val spark = df1.sparkSession
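The two ``@return`` notes above describe the nested map structure that ``GraphReader`` hands back. As a minimal consumption sketch (assuming a ``GraphReader.read(graphInfoPath, spark)`` entry point, an in-scope ``SparkSession`` named ``spark``, and the ldbc_sample labels used elsewhere in this diff; none of these are introduced by this change):

.. code:: scala

   // Destructure the pair documented in GraphReader's @return:
   // vertex DataFrames keyed by label, edge DataFrames keyed by
   // (srcLabel, edgeLabel, dstLabel) and then by adj_list_type_str.
   val (vertexDataFrames, edgeDataFrames) =
     GraphReader.read(graphInfoPath, spark)
   val personDf = vertexDataFrames("person")
   val knowsDf =
     edgeDataFrames(("person", "knows", "person"))("ordered_by_source")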
diff --git a/spark/src/main/scala/com/alibaba/graphar/util/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/util/FileSystem.scala
index 0575e104f..96afcfa01 100644
--- a/spark/src/main/scala/com/alibaba/graphar/util/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/util/FileSystem.scala
@@ -25,20 +25,24 @@ import org.apache.hadoop.conf.Configuration

 import com.alibaba.graphar.GeneralParams

-/** Helper object to write dataframe to chunk files */
+/** Helper object to write DataFrames to chunk files */
 object FileSystem {

   /**
-   * Write input dataframe to output path with certain file format.
+   * Write an input DataFrame to the output path with a certain file format.
    *
-   * @param dataframe
+   * @param dataFrame
    *   DataFrame to write out.
    * @param fileType
    *   output file format type, the value could be csv|parquet|orc.
    * @param outputPrefix
    *   output path prefix.
-   * @param startChunkIndex
-   *   the start index of chunk.
+   * @param offsetStartChunkIndex[Optional]
+   *   the start index of the offset chunk; if not empty, an offset
+   *   DataFrame is being written.
+   * @param aggNumListOfEdgeChunk[Optional]
+   *   the aggregated number list of the edge chunks; if not empty, an edge
+   *   DataFrame is being written.
    */
   def writeDataFrame(
       dataFrame: DataFrame,
       fileType: String,
       outputPrefix: String,
@@ -61,7 +65,7 @@ object FileSystem {
       }
       fs.close()
-    // write offset chunks dataframe
+    // write offset chunks DataFrame
     if (!offsetStartChunkIndex.isEmpty) {
       return dataFrame.write
         .mode("append")
@@ -74,7 +78,7 @@ object FileSystem {
         .format("com.alibaba.graphar.datasources.GarDataSource")
         .save(outputPrefix)
     }
-    // write edge chunks dataframe
+    // write edge chunks DataFrame
     if (!aggNumListOfEdgeChunk.isEmpty) {
       implicit val formats =
         DefaultFormats // initialize a default formats for json4s
@@ -89,7 +93,7 @@ object FileSystem {
         .format("com.alibaba.graphar.datasources.GarDataSource")
         .save(outputPrefix)
     }
-    // write vertex chunks dataframe
+    // write vertex chunks DataFrame
     dataFrame.write
       .mode("append")
       .option("header", "true")
@@ -98,6 +102,16 @@ object FileSystem {
       .save(outputPrefix)
   }

+  /**
+   * Write an input value to the output path.
+   *
+   * @param value
+   *   Value to write out.
+   * @param outputPath
+   *   output path.
+   * @param hadoopConfig
+   *   Hadoop configuration.
+   */
   def writeValue(
       value: Long,
       outputPath: String,
@@ -112,6 +126,16 @@ object FileSystem {
     fs.close()
   }

+  /**
+   * Read a value from the input path.
+   *
+   * @param inputPath
+   *   Input path.
+   * @param hadoopConfig
+   *   Hadoop configuration.
+   * @return
+   *   The value read from the input path.
+   */
   def readValue(inputPath: String, hadoopConfig: Configuration): Long = {
     val path = new Path(inputPath)
     val fs = path.getFileSystem(hadoopConfig)
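Since ``writeValue`` and ``readValue`` gain scaladoc here, a small round-trip sketch may help; the path below is purely illustrative, and the calls follow the signatures visible in the hunks above:

.. code:: scala

   import org.apache.hadoop.conf.Configuration
   import com.alibaba.graphar.util.FileSystem

   val hadoopConfig = new Configuration()
   // Illustrative output path; any Hadoop-compatible URI works the same way.
   val countPath = "/tmp/graphar/person/vertex_count"

   // Persist a single Long value (for example, a vertex count) ...
   FileSystem.writeValue(100L, countPath, hadoopConfig)
   // ... and read it back later.
   val count: Long = FileSystem.readValue(countPath, hadoopConfig)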
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index 941de1858..ac4a11f4d 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -126,13 +126,13 @@ object EdgeWriter {
       edgeChunkSize.toInt
     )

-    // repartition edge dataframe and sort within partitions
+    // repartition edge DataFrame and sort within partitions
     val partitionRDD =
       rddWithEid.repartitionAndSortWithinPartitions(partitioner).values
     val partitionEdgeDf = spark.createDataFrame(partitionRDD, edgeSchema)
     partitionEdgeDf.cache()

-    // generate offset dataframes
+    // generate offset DataFrames
     if (
       adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest
     ) {
@@ -190,10 +190,10 @@ object EdgeWriter {
 }

 /**
- * Writer for edge dataframe.
+ * Writer for edge DataFrame.
  *
  * @constructor
- *   create a new writer for edge dataframe with edge info.
+ *   create a new writer for edge DataFrame with edge info.
  * @param prefix
  *   the absolute prefix.
  * @param edgeInfo
@@ -288,7 +288,7 @@ class EdgeWriter(
     }
   }

-  /** Generate the chunks of AdjList from edge dataframe for this edge type. */
+  /** Generate the chunks of AdjList from edge DataFrame for this edge type. */
   def writeAdjList(): Unit = {
     val fileType = edgeInfo.getAdjListFileType(adjListType)
     val outputPrefix = prefix + edgeInfo.getAdjListPathPrefix(adjListType)
@@ -312,7 +312,7 @@ class EdgeWriter(
   }

   /**
-   * Generate the chunks of the property group from edge dataframe.
+   * Generate the chunks of the property group from edge DataFrame.
    *
    * @param propertyGroup
    *   property group
@@ -340,7 +340,7 @@ class EdgeWriter(
     )
   }

-  /** Generate the chunks of all property groups from edge dataframe. */
+  /** Generate the chunks of all property groups from edge DataFrame. */
   def writeEdgeProperties(): Unit = {
     val property_groups = edgeInfo.getPropertyGroups(adjListType)
     val it = property_groups.iterator
@@ -352,7 +352,7 @@ class EdgeWriter(

   /**
    * Generate the chunks for the AdjList and all property groups from edge
-   * dataframe.
+   * DataFrame.
    */
   def writeEdges(): Unit = {
     writeAdjList()
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
index a993433d2..81cb7b11a 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -78,7 +78,7 @@ class VertexWriter(
   )

   private def validate(): Unit = {
-    // check if vertex dataframe contains the index_filed
+    // check if vertex DataFrame contains the index_filed
     val index_filed = StructField(GeneralParams.vertexIndexCol, LongType)
     if (vertexDf.schema.contains(index_filed) == false) {
       throw new IllegalArgumentException
@@ -95,7 +95,7 @@ class VertexWriter(
   }

   /**
-   * Generate chunks of the property group for vertex dataframe.
+   * Generate chunks of the property group for vertex DataFrame.
    *
    * @param propertyGroup
    *   property group
@@ -124,7 +124,7 @@ class VertexWriter(
     )
   }

-  /** Generate chunks of all property groups for vertex dataframe. */
+  /** Generate chunks of all property groups for vertex DataFrame. */
   def writeVertexProperties(): Unit = {
     val property_groups = vertexInfo.getProperty_groups()
     val it = property_groups.iterator
diff --git a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
index 1deb67075..bd261029e 100644
--- a/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
@@ -30,7 +30,7 @@ class ComputeExampleSuite extends AnyFunSuite {
     .getOrCreate()

   test("run cc using graphx") {
-    // read vertex dataframe
+    // read vertex DataFrame
     val file_path = "gar-test/ldbc_sample/parquet/"
     val prefix = getClass.getClassLoader.getResource(file_path).getPath
     val vertex_yaml = getClass.getClassLoader
@@ -45,7 +45,7 @@ class ComputeExampleSuite extends AnyFunSuite {
     assert(vertex_df.columns.size == 5)
     assert(vertex_df.count() == vertices_num)

-    // read edge dataframe
+    // read edge DataFrame
     val edge_yaml = getClass.getClassLoader
       .getResource(file_path + "person_knows_person.edge.yml")
       .getPath
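The ``WriterSuite`` changes below exercise the index-then-write flow. A condensed sketch of that flow follows; the ``VertexWriter`` constructor shape and the in-scope ``vertex_df``, ``edge_df``, ``vertex_info``, and ``prefix`` values are assumed from the tests and the user-guide example, not spelled out by this diff:

.. code:: scala

   import com.alibaba.graphar.util.IndexGenerator
   import com.alibaba.graphar.writer.VertexWriter

   // Add the internal vertex index column, then write all property groups.
   val vertex_df_with_index =
     IndexGenerator.generateVertexIndexColumn(vertex_df)
   val writer = new VertexWriter(prefix, vertex_info, vertex_df_with_index)
   writer.writeVertexProperties()

   // For edges, translate the src column to vertex indices through an
   // explicit primary-key mapping, as WriterSuite does below.
   val vertex_mapping = IndexGenerator.constructVertexIndexMapping(
     vertex_df,
     vertex_info.getPrimaryKey()
   )
   val edge_df_with_src_index = IndexGenerator
     .generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)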
.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv") .getPath @@ -111,7 +111,7 @@ class WriterSuite extends AnyFunSuite { val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark) val adj_list_type = AdjListType.ordered_by_source - // generate vertex index for edge dataframe + // generate vertex index for edge DataFrame val srcDf = edge_df.select("src").withColumnRenamed("src", "vertex") val dstDf = edge_df.select("dst").withColumnRenamed("dst", "vertex") val vertex_num = srcDf.union(dstDf).distinct().count() @@ -189,7 +189,7 @@ class WriterSuite extends AnyFunSuite { assertThrows[IllegalArgumentException]( writer.writeEdgeProperties(invalid_property_group) ) - // throw exception if not generate src index and dst index for edge dataframe + // throw exception if not generate src index and dst index for edge DataFrame assertThrows[IllegalArgumentException]( new EdgeWriter( prefix, @@ -217,7 +217,7 @@ class WriterSuite extends AnyFunSuite { } test("test edge writer with vertex table and edge table") { - // read vertex dataframe + // read vertex DataFrame val vertex_file_path = getClass.getClassLoader .getResource("gar-test/ldbc_sample/person_0_0.csv") .getPath @@ -227,7 +227,7 @@ class WriterSuite extends AnyFunSuite { .csv(vertex_file_path) val vertex_num = vertex_df.count() - // read edge dataframe + // read edge DataFrame val file_path = getClass.getClassLoader .getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv") .getPath @@ -258,12 +258,12 @@ class WriterSuite extends AnyFunSuite { val vertex_chunk_num = (vertex_num + vertex_chunk_size - 1) / vertex_chunk_size - // construct person vertex mapping with dataframe + // construct person vertex mapping with DataFrame val vertex_mapping = util.IndexGenerator.constructVertexIndexMapping( vertex_df, vertex_info.getPrimaryKey() ) - // generate src index and dst index for edge datafram with vertex mapping + // generate src index and dst index for edge DataFrame with vertex mapping val edge_df_with_src_index = util.IndexGenerator .generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping) val edge_df_with_src_dst_index =