diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
index 45d46961b..726cda7f0 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -22,15 +22,20 @@ import org.apache.hadoop.fs
 
 /** Helper object to write dataframe to chunk files */
 object FileSystem {
-  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String, startChunkIndex: Int): Unit = {
     val sc = spark.sparkContext
     val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
     val path_pattern = new fs.Path(filePrefix + "part*")
     val files = file_system.globStatus(path_pattern)
     for (i <- 0 until files.length) {
       val file_name = files(i).getPath.getName
-      val new_file_name = "chunk" + i.toString
-      file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+      val new_file_name = "chunk" + (i + startChunkIndex).toString
+      val destPath = new fs.Path(filePrefix + new_file_name)
+      if (file_system.isFile(destPath)) {
+        // if the chunk file already exists, overwrite it
+        file_system.delete(destPath)
+      }
+      file_system.rename(new fs.Path(filePrefix + file_name), destPath)
     }
   }
 
@@ -39,13 +44,14 @@ object FileSystem {
    * @param dataframe DataFrame to write out.
    * @param fileType output file format type, the value could be csv|parquet|orc.
    * @param outputPrefix output path prefix.
+   * @param startChunkIndex the start index of the output chunks.
+   *
    */
-  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = {
     val spark = dataFrame.sparkSession
     spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     spark.conf.set("parquet.enable.summary-metadata", "false")
-    // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
-    dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
-    renameSparkGeneratedFiles(spark, outputPrefix)
+    dataFrame.write.mode("append").format(fileType).save(outputPrefix)
+    renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex)
   }
-}
\ No newline at end of file
+}
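
Note on the FileSystem change above: because writeDataFrame now saves with mode("append") and forwards startChunkIndex to the renaming step, one output prefix can receive chunk files from several calls, each call numbering its chunks from its own start index and replacing any chunk that already exists at that index. A minimal sketch of that call pattern, assuming a local SparkSession, a made-up column name and a throwaway output path (none of which are part of this patch):

    import org.apache.spark.sql.SparkSession
    import com.alibaba.graphar.utils.FileSystem

    // illustrative setup only
    val spark = SparkSession.builder().master("local[*]").appName("chunk-append-sketch").getOrCreate()
    import spark.implicits._
    val batch0 = Seq(0L, 3L, 7L).toDF("offset").coalesce(1)
    val batch1 = Seq(0L, 2L, 9L).toDF("offset").coalesce(1)

    // the part-* file produced by the first call is renamed to chunk0
    FileSystem.writeDataFrame(batch0, "csv", "/tmp/offset_sketch/", startChunkIndex = 0)
    // the second call appends a new part-* file and renames it to chunk1;
    // an existing chunk1 would be deleted first, so a rerun overwrites rather than duplicates the chunk
    FileSystem.writeDataFrame(batch1, "csv", "/tmp/offset_sketch/", startChunkIndex = 1)
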
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index b14f2f20c..ff6b233af 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -21,7 +21,7 @@ import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, Prop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType, StructField}
 import org.apache.spark.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
@@ -148,19 +148,36 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
     }
   }
 
-  // generate the Offset chunks files from edge dataframe for this edge type
+  // generate the offset chunk files from the edge dataframe for this edge type
   private def writeOffset(): Unit = {
+    val spark = edgeDf.sparkSession
     val file_type = edgeInfo.getAdjListFileType(adjListType)
-    var chunk_index: Long = 0
+    var chunk_index: Int = 0
+    val offset_schema = StructType(Seq(StructField(GeneralParams.offsetCol, LongType)))
+    val vertex_chunk_size = if (adjListType == AdjListType.ordered_by_source) edgeInfo.getSrc_chunk_size() else edgeInfo.getDst_chunk_size()
+    val index_column = if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol
+    val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
     for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
-      if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      } else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      }
+      val edge_count_df = chunk.select(index_column).groupBy(index_column).count()
+      // init an edge count dataframe over the vertex range [begin, end] to include isolated vertices
+      val begin_index: Long = chunk_index * vertex_chunk_size
+      val end_index: Long = (chunk_index + 1) * vertex_chunk_size
+      val init_count_rdd = spark.sparkContext.parallelize(begin_index to end_index).map(key => Row(key, 0L))
+      val init_count_df = spark.createDataFrame(init_count_rdd, edge_count_df.schema)
+      // union the edge count dataframe with the initialized count dataframe
+      val union_count_chunk = edge_count_df.unionByName(init_count_df).groupBy(index_column).agg(sum("count")).coalesce(1).orderBy(index_column).select("sum(count)")
+      // calculate the offset rdd from the count chunk
+      val offset_rdd = union_count_chunk.rdd.mapPartitionsWithIndex((i, ps) => {
+        var sum = 0L
+        var pre_sum = 0L
+        for (row <- ps) yield {
+          pre_sum = sum
+          sum = sum + row.getLong(0)
+          Row(pre_sum)
+        }
+      })
+      val offset_df = spark.createDataFrame(offset_rdd, offset_schema)
+      FileSystem.writeDataFrame(offset_df, FileType.FileTypeToString(file_type), output_prefix, chunk_index)
       chunk_index = chunk_index + 1
     }
   }
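
Note on the writeOffset rewrite above: for each vertex chunk it zero-fills the per-vertex edge counts over the whole vertex range (so isolated vertices still get a row), converts the counts to offsets with an exclusive prefix sum inside mapPartitionsWithIndex, and hands the single-partition result to FileSystem.writeDataFrame with the chunk index as the start index. The count-to-offset step on plain Scala collections, as a sketch with made-up counts:

    // per-vertex edge counts for one vertex chunk, isolated vertices already zero-filled (made-up values)
    val counts: Seq[Long] = Seq(3L, 0L, 2L, 4L)
    // exclusive prefix sum, matching the pre_sum/sum loop in the patch:
    // offsets(j) is the number of edges of all vertices in the chunk that come before vertex j
    val offsets: Seq[Long] = counts.scanLeft(0L)(_ + _).dropRight(1)  // Seq(0, 3, 3, 5)

Because the zero-filled range runs from begin to end inclusive, each offset chunk carries one more entry than the vertex chunk size, and consecutive entries delimit the edge range of one vertex.
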
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index f5f3f6a96..490c78a21 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -23,6 +23,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.io.Source.fromFile
+
 
 class WriterSuite extends AnyFunSuite {
   val spark = SparkSession.builder()
@@ -64,7 +66,8 @@
     val invalid_property_group= new PropertyGroup()
     assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "vertex"))
     fs.close()
   }
 
@@ -115,7 +118,8 @@
     // throw exception if pass the adj list type not contain in edge info
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 
@@ -161,6 +165,12 @@
     val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
     val offset_chunk_files = fs.globStatus(offset_path_pattern)
     assert(offset_chunk_files.length == 10)
+    // compare with the expected offset chunk values
+    val offset_file_path = prefix + edge_info.getAdjListOffsetFilePath(0, adj_list_type)
+    val correct_offset_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0").getPath
+    val generated_offset_array = fromFile(offset_file_path).getLines.toArray
+    val expected_offset_array = fromFile(correct_offset_file_path).getLines.toArray
+    assert(generated_offset_array.sameElements(expected_offset_array))
 
     // test write property group
     val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
@@ -171,7 +181,8 @@
 
     writer.writeEdges()
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }
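
For reference, the flow the updated test exercises, condensed into a sketch; the fixture names (prefix, edge_info, adj_list_type, edge_df_with_index) come from the surrounding test, and the file comparison mirrors the new assertion added above:

    // write the adjacency list, offset chunks and property chunks for this edge type
    val writer = new EdgeWriter(prefix, edge_info, adj_list_type, edge_df_with_index)
    writer.writeEdges()
    // read the first generated offset chunk back and compare it with the bundled reference chunk
    val generated = fromFile(prefix + edge_info.getAdjListOffsetFilePath(0, adj_list_type)).getLines.toArray
    val expected = fromFile(getClass.getClassLoader.getResource(
      "gar-test/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0").getPath).getLines.toArray
    assert(generated.sameElements(expected))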