From 06ef57543a5e0ed7b419f3f6dbcdb05d24a2a364 Mon Sep 17 00:00:00 2001
From: acezen
Date: Thu, 5 Jan 2023 14:16:14 +0800
Subject: [PATCH 1/4] Fix offset generation bug and write-out path

---
 .../alibaba/graphar/utils/FileSystem.scala    | 21 +++++++----
 .../alibaba/graphar/writer/EdgeWriter.scala   | 37 ++++++++++++++-----
 .../com/alibaba/graphar/TestWriter.scala      |  8 ++++
 3 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
index 45d46961b..568940a9d 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -22,15 +22,20 @@ import org.apache.hadoop.fs
 
 /** Helper object to write dataframe to chunk files */
 object FileSystem {
-  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String, startChunkIndex: Int): Unit = {
     val sc = spark.sparkContext
     val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
     val path_pattern = new fs.Path(filePrefix + "part*")
     val files = file_system.globStatus(path_pattern)
     for (i <- 0 until files.length) {
       val file_name = files(i).getPath.getName
-      val new_file_name = "chunk" + i.toString
-      file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+      val new_file_name = "chunk" + (i + startChunkIndex).toString
+      val destPath = new fs.Path(filePrefix + new_file_name)
+      if (file_system.isFile(destPath)) {
+        // if the chunk file already exists, overwrite it
+        file_system.delete(destPath)
+      }
+      file_system.rename(new fs.Path(filePrefix + file_name), destPath)
     }
   }
 
@@ -39,13 +44,15 @@ object FileSystem {
    * @param dataframe DataFrame to write out.
    * @param fileType output file format type, the value could be csv|parquet|orc.
    * @param outputPrefix output path prefix.
+   * @param startChunkIndex the start index of the chunk.
+   *
+   * raise FileAlreadyExistsException if the chunk file already exists.
    */
-  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = {
     val spark = dataFrame.sparkSession
     spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     spark.conf.set("parquet.enable.summary-metadata", "false")
-    // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
-    dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
-    renameSparkGeneratedFiles(spark, outputPrefix)
+    dataFrame.write.mode("append").format(fileType).save(outputPrefix)
+    renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex)
   }
 }
\ No newline at end of file
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index b14f2f20c..c7711ba0b 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -21,7 +21,7 @@ import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, Prop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType, StructField}
 import org.apache.spark.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
@@ -148,19 +148,36 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
     }
   }
 
-  // generate the Offset chunks files from edge dataframe for this edge type
+  // generate the offset chunk files from the edge dataframe for this edge type
   private def writeOffset(): Unit = {
+    val spark = edgeDf.sparkSession
     val file_type = edgeInfo.getAdjListFileType(adjListType)
     var chunk_index: Long = 0
+    val offset_schema = StructType(Seq(StructField(GeneralParams.offsetCol, LongType)))
+    val vertex_chunk_size = if (adjListType == AdjListType.ordered_by_source) edgeInfo.getSrc_chunk_size() else edgeInfo.getDst_chunk_size()
+    val index_column = if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol
+    val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
     for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
-      if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      } else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      }
+      val edge_count_df = chunk.select(index_column).groupBy(index_column).count()
+      // initialize an edge count dataframe over the vertex range [begin, end] to include isolated vertices
+      val begin_index: Long = chunk_index * vertex_chunk_size
+      val end_index: Long = (chunk_index + 1) * vertex_chunk_size
+      val init_count_rdd = spark.sparkContext.parallelize(begin_index to end_index).map(key => Row(key, 0L))
+      val init_count_df = spark.createDataFrame(init_count_rdd, offset_chunk.schema)
+      // union edge count dataframe and initialized count dataframe
+      val union_count_chunk = edge_count_df.unionByName(init_count_df).groupBy(index_column).agg(sum("count")).coalesce(1).orderBy(index_column).select("sum(count)")
+      // calculate offset rdd from count chunk
+      val offset_rdd = union_count_chunk.rdd.mapPartitionsWithIndex((i, ps) => {
+        var sum = 0L
+        var pre_sum = 0L
+        for (row <- ps) yield {
+          pre_sum = sum
+          sum = sum + row.getLong(0)
+          Row(pre_sum)
+        }
+      })
+      val offset_df = spark.createDataFrame(offset_rdd, offset_schema)
+      FileSystem.writeDataFrame(offset_df, FileType.FileTypeToString(file_type), output_prefix, chunk_index)
       chunk_index = chunk_index + 1
     }
   }
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index f5f3f6a96..faa6e2361 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -55,6 +55,7 @@ class WriterSuite extends AnyFunSuite {
     val id_chunk_path = new Path(prefix + vertex_info.getDirPath(property_group) + "chunk*")
     val id_chunk_files = fs.globStatus(id_chunk_path)
     assert(id_chunk_files.length == 10)
+    fs.delete(new Path(prefix + "vertex"))
     writer.writeVertexProperties()
     val chunk_path = new Path(prefix + vertex_info.getPrefix() + "*/*")
     val chunk_files = fs.globStatus(chunk_path)
@@ -65,6 +66,7 @@ class WriterSuite extends AnyFunSuite {
     assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
 
     // close FileSystem instance
+    fs.delete(new Path(prefix + "vertex"))
     fs.close()
   }
 
@@ -103,6 +105,8 @@ class WriterSuite extends AnyFunSuite {
     val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
     val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
     assert(property_group_chunk_files.length == 9)
+    // clean generated files
+    // fs.delete(new Path(prefix + "edge"))
 
     // test write edges
     writer.writeEdges()
@@ -116,6 +120,7 @@ class WriterSuite extends AnyFunSuite {
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
     // close FileSystem instance
+    // fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 
@@ -168,10 +173,13 @@ class WriterSuite extends AnyFunSuite {
     val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
     val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
     assert(property_group_chunk_files.length == 11)
+    // clean generated files
+    fs.delete(new Path(prefix + "edge"))
 
     writer.writeEdges()
 
     // close FileSystem instance
+    // fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }
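A note on the behavioral change in PATCH 1 above: FileSystem.writeDataFrame now writes in "append" mode, and renameSparkGeneratedFiles maps Spark's part-* outputs to chunk files numbered from startChunkIndex, so successive calls against the same output prefix accumulate consecutively numbered chunks instead of clobbering each other. A minimal usage sketch under assumptions — the DataFrames, the local-mode session, and the /tmp/offset/ path are hypothetical, not part of the patch:

    import org.apache.spark.sql.SparkSession
    import com.alibaba.graphar.utils.FileSystem

    object WriteChunksSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        import spark.implicits._
        // Two hypothetical single-partition DataFrames standing in for two chunks.
        val df0 = Seq(0L, 2L, 5L).toDF("offset").coalesce(1)
        val df1 = Seq(0L, 1L, 4L).toDF("offset").coalesce(1)
        // The first call renames its part-* file to chunk0, the second to chunk1;
        // with the old "overwrite" mode the second call would have erased chunk0.
        FileSystem.writeDataFrame(df0, "csv", "/tmp/offset/")
        FileSystem.writeDataFrame(df1, "csv", "/tmp/offset/", startChunkIndex = 1)
        spark.stop()
      }
    }
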
From d8c32d3ecb357fc35182a45b8fa031ac71628b73 Mon Sep 17 00:00:00 2001
From: acezen
Date: Thu, 5 Jan 2023 14:54:13 +0800
Subject: [PATCH 2/4] Add test to check that the offset chunks are correct

---
 .../com/alibaba/graphar/writer/EdgeWriter.scala |  4 ++--
 .../scala/com/alibaba/graphar/TestWriter.scala  | 16 ++++++++++------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index c7711ba0b..ff6b233af 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -152,7 +152,7 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
   private def writeOffset(): Unit = {
     val spark = edgeDf.sparkSession
     val file_type = edgeInfo.getAdjListFileType(adjListType)
-    var chunk_index: Long = 0
+    var chunk_index: Int = 0
     val offset_schema = StructType(Seq(StructField(GeneralParams.offsetCol, LongType)))
     val vertex_chunk_size = if (adjListType == AdjListType.ordered_by_source) edgeInfo.getSrc_chunk_size() else edgeInfo.getDst_chunk_size()
     val index_column = if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol
@@ -163,7 +163,7 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
       val begin_index: Long = chunk_index * vertex_chunk_size
       val end_index: Long = (chunk_index + 1) * vertex_chunk_size
       val init_count_rdd = spark.sparkContext.parallelize(begin_index to end_index).map(key => Row(key, 0L))
-      val init_count_df = spark.createDataFrame(init_count_rdd, offset_chunk.schema)
+      val init_count_df = spark.createDataFrame(init_count_rdd, edge_count_df.schema)
       // union edge count dataframe and initialized count dataframe
       val union_count_chunk = edge_count_df.unionByName(init_count_df).groupBy(index_column).agg(sum("count")).coalesce(1).orderBy(index_column).select("sum(count)")
       // calculate offset rdd from count chunk
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index faa6e2361..95eabfda8 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -23,6 +23,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.io.Source.fromFile
+
 
 class WriterSuite extends AnyFunSuite {
   val spark = SparkSession.builder()
@@ -105,8 +107,6 @@ class WriterSuite extends AnyFunSuite {
     val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
     val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
     assert(property_group_chunk_files.length == 9)
-    // clean generated files
-    // fs.delete(new Path(prefix + "edge"))
 
     // test write edges
     writer.writeEdges()
@@ -120,7 +120,7 @@ class WriterSuite extends AnyFunSuite {
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
     // close FileSystem instance
-    // fs.delete(new Path(prefix + "edge"))
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 
@@ -166,6 +166,12 @@ class WriterSuite extends AnyFunSuite {
     val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
     val offset_chunk_files = fs.globStatus(offset_path_pattern)
     assert(offset_chunk_files.length == 10)
+    // compare with the expected offset chunk values
+    val offset_file_path = prefix + edge_info.getAdjListOffsetFilePath(0, adj_list_type)
+    val correct_offset_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0").getPath
+    val generated_offset_array = fromFile(offset_file_path).getLines.toArray
+    val expected_offset_array = fromFile(correct_offset_file_path).getLines.toArray
+    assert(generated_offset_array.sameElements(expected_offset_array))
 
     // test write property group
     val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
@@ -173,13 +179,11 @@ class WriterSuite extends AnyFunSuite {
     val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
     val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
     assert(property_group_chunk_files.length == 11)
-    // clean generated files
-    fs.delete(new Path(prefix + "edge"))
 
     writer.writeEdges()
 
     // close FileSystem instance
-    // fs.delete(new Path(prefix + "edge"))
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }

From a9f5437331bdf48ecb79d0fa1b0a786c04177721 Mon Sep 17 00:00:00 2001
From: acezen
Date: Thu, 5 Jan 2023 14:58:21 +0800
Subject: [PATCH 3/4] Minor fix

---
 spark/src/test/scala/com/alibaba/graphar/TestWriter.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index 95eabfda8..490c78a21 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -57,7 +57,6 @@ class WriterSuite extends AnyFunSuite {
     val id_chunk_path = new Path(prefix + vertex_info.getDirPath(property_group) + "chunk*")
     val id_chunk_files = fs.globStatus(id_chunk_path)
     assert(id_chunk_files.length == 10)
-    fs.delete(new Path(prefix + "vertex"))
     writer.writeVertexProperties()
     val chunk_path = new Path(prefix + vertex_info.getPrefix() + "*/*")
     val chunk_files = fs.globStatus(chunk_path)
@@ -67,7 +66,7 @@ class WriterSuite extends AnyFunSuite {
     val invalid_property_group = new PropertyGroup()
     assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
     fs.delete(new Path(prefix + "vertex"))
     fs.close()
   }
@@ -119,7 +118,7 @@ class WriterSuite extends AnyFunSuite {
     // throw exception if the passed adj list type is not contained in the edge info
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
     fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
@@ -182,7 +181,7 @@ class WriterSuite extends AnyFunSuite {
     writer.writeEdges()
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
     fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }

From 1156c103c2263e42cdd0c62f4923b37517041684 Mon Sep 17 00:00:00 2001
From: acezen
Date: Thu, 5 Jan 2023 15:01:21 +0800
Subject: [PATCH 4/4] Minor fix

---
 .../src/main/scala/com/alibaba/graphar/utils/FileSystem.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
index 568940a9d..726cda7f0 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -46,7 +46,6 @@ object FileSystem {
    * @param outputPrefix output path prefix.
    * @param startChunkIndex the start index of the chunk.
    *
-   * raise FileAlreadyExistsException if the chunk file already exists.
*/ def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = { val spark = dataFrame.sparkSession @@ -55,4 +54,4 @@ object FileSystem { dataFrame.write.mode("append").format(fileType).save(outputPrefix) renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex) } -} \ No newline at end of file +}
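A note on the offset computation that PATCH 1 introduces and PATCH 2 fixes up: each offset chunk is built by zero-padding the per-vertex edge counts (so isolated vertices still get a row) and then taking an exclusive prefix sum, which is what the pre_sum/sum loop in writeOffset computes. The arithmetic in isolation, as a plain-Scala sketch with hypothetical counts (no Spark needed; OffsetSketch and its values are illustrative only):

    object OffsetSketch {
      // Exclusive prefix sum: offsets(i) = counts(0) + ... + counts(i - 1),
      // mirroring the loop that yields the running total *before* adding
      // the current row's count.
      def countsToOffsets(counts: Seq[Long]): Seq[Long] =
        counts.scanLeft(0L)(_ + _).init

      def main(args: Array[String]): Unit = {
        // Hypothetical per-vertex edge counts for one vertex chunk; the 0 is
        // an isolated vertex kept by the zero-count union in writeOffset.
        println(countsToOffsets(Seq(2L, 0L, 3L, 1L)))  // List(0, 2, 2, 5)
      }
    }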