[BugFix][Spark] Fix offset chunk output path and offset value of spark writer #63

Merged
merged 4 commits into from
Jan 5, 2023
Changes from all commits
22 changes: 14 additions & 8 deletions spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -22,15 +22,20 @@ import org.apache.hadoop.fs
 
 /** Helper object to write dataframe to chunk files */
 object FileSystem {
-  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String, startChunkIndex: Int): Unit = {
     val sc = spark.sparkContext
     val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
     val path_pattern = new fs.Path(filePrefix + "part*")
     val files = file_system.globStatus(path_pattern)
     for (i <- 0 until files.length) {
       val file_name = files(i).getPath.getName
-      val new_file_name = "chunk" + i.toString
-      file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+      val new_file_name = "chunk" + (i + startChunkIndex).toString
+      val destPath = new fs.Path(filePrefix + new_file_name)
+      if (file_system.isFile(destPath)) {
+        // if chunk file already exists, overwrite it
+        file_system.delete(destPath)
+      }
+      file_system.rename(new fs.Path(filePrefix + file_name), destPath)
     }
   }
 
@@ -39,13 +44,14 @@ object FileSystem {
    * @param dataframe DataFrame to write out.
    * @param fileType output file format type, the value could be csv|parquet|orc.
    * @param outputPrefix output path prefix.
+   * @param startChunkIndex the start index of chunk.
    *
    */
-  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String, startChunkIndex: Int = 0): Unit = {
     val spark = dataFrame.sparkSession
     spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     spark.conf.set("parquet.enable.summary-metadata", "false")
     // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
-    dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
-    renameSparkGeneratedFiles(spark, outputPrefix)
+    dataFrame.write.mode("append").format(fileType).save(outputPrefix)
+    renameSparkGeneratedFiles(spark, outputPrefix, startChunkIndex)
   }
 }
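
Note on how the two FileSystem.scala changes compose: writeDataFrame now appends Spark's part-* output instead of overwriting the directory, and renameSparkGeneratedFiles shifts the chunk numbering by startChunkIndex, so successive calls against the same prefix yield a contiguous chunk0, chunk1, ... sequence. A minimal usage sketch follows; the output path and data are hypothetical and only the writeDataFrame signature comes from this PR.

// Hypothetical usage sketch: write two chunks into the same directory.
import org.apache.spark.sql.SparkSession
import com.alibaba.graphar.utils.FileSystem

val spark = SparkSession.builder().appName("chunk-writer-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val outputPrefix = "/tmp/graphar/vertex/person/offset/"        // hypothetical prefix
val firstChunk  = Seq(0L, 2L, 5L).toDF("offset").coalesce(1)   // made-up data
val secondChunk = Seq(7L, 7L, 9L).toDF("offset").coalesce(1)

// The first call writes chunk0 (startChunkIndex defaults to 0); the second call
// appends its part file and renames it to chunk1 instead of clobbering chunk0.
FileSystem.writeDataFrame(firstChunk, "csv", outputPrefix)
FileSystem.writeDataFrame(secondChunk, "csv", outputPrefix, startChunkIndex = 1)
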
39 changes: 28 additions & 11 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -21,7 +21,7 @@ import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, Prop
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructType, StructField}
 import org.apache.spark.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
@@ -148,19 +148,36 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
     }
   }
 
-  // generate the Offset chunks files from edge dataframe for this edge type
+  // generate the offset chunk files from the edge dataframe for this edge type
   private def writeOffset(): Unit = {
     val spark = edgeDf.sparkSession
     val file_type = edgeInfo.getAdjListFileType(adjListType)
-    var chunk_index: Long = 0
+    var chunk_index: Int = 0
+    val offset_schema = StructType(Seq(StructField(GeneralParams.offsetCol, LongType)))
+    val vertex_chunk_size = if (adjListType == AdjListType.ordered_by_source) edgeInfo.getSrc_chunk_size() else edgeInfo.getDst_chunk_size()
+    val index_column = if (adjListType == AdjListType.ordered_by_source) GeneralParams.srcIndexCol else GeneralParams.dstIndexCol
+    val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
     for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
-      if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      } else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
-        FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
-      }
+      val edge_count_df = chunk.select(index_column).groupBy(index_column).count()
+      // init an edge count dataframe over the vertex range [begin, end] to include isolated vertices
+      val begin_index: Long = chunk_index * vertex_chunk_size
+      val end_index: Long = (chunk_index + 1) * vertex_chunk_size
+      val init_count_rdd = spark.sparkContext.parallelize(begin_index to end_index).map(key => Row(key, 0L))
+      val init_count_df = spark.createDataFrame(init_count_rdd, edge_count_df.schema)
+      // union the edge count dataframe and the initialized count dataframe
+      val union_count_chunk = edge_count_df.unionByName(init_count_df).groupBy(index_column).agg(sum("count")).coalesce(1).orderBy(index_column).select("sum(count)")
+      // calculate the offset rdd from the count chunk
+      val offset_rdd = union_count_chunk.rdd.mapPartitionsWithIndex((i, ps) => {
+        var sum = 0L
+        var pre_sum = 0L
+        for (row <- ps) yield {
+          pre_sum = sum
+          sum = sum + row.getLong(0)
+          Row(pre_sum)
+        }
+      })
+      val offset_df = spark.createDataFrame(offset_rdd, offset_schema)
+      FileSystem.writeDataFrame(offset_df, FileType.FileTypeToString(file_type), output_prefix, chunk_index)
       chunk_index = chunk_index + 1
     }
   }
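
For readers of the new writeOffset: each vertex chunk's offsets are built by counting edges per vertex index, unioning with an all-zero count table so isolated vertices are kept, and then taking an exclusive prefix sum over a single sorted partition. Below is a small Spark-free sketch of that prefix sum with made-up counts; only the loop structure mirrors the code above.

// Hypothetical per-vertex edge counts for one vertex chunk, zeros included
// for isolated vertices (the union with init_count_df guarantees this).
val counts = Seq(2L, 0L, 3L, 1L)

// Exclusive prefix sum, mirroring the mapPartitionsWithIndex loop above:
// each vertex is assigned the number of edges that precede it.
var running = 0L
val offsets = counts.map { c =>
  val pre = running
  running += c
  pre
}
// offsets == Seq(0L, 2L, 2L, 5L); for i < counts.length - 1,
// offsets(i + 1) - offsets(i) recovers counts(i). The real code adds one extra
// row for the chunk's end index, so the last vertex's range is covered as well.
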
17 changes: 14 additions & 3 deletions spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -23,6 +23,8 @@ import org.scalatest.funsuite.AnyFunSuite
 import org.yaml.snakeyaml.Yaml
 import org.yaml.snakeyaml.constructor.Constructor
 import org.apache.hadoop.fs.{Path, FileSystem}
+import scala.io.Source.fromFile
+
 
 class WriterSuite extends AnyFunSuite {
   val spark = SparkSession.builder()
@@ -64,7 +66,8 @@ class WriterSuite extends AnyFunSuite {
     val invalid_property_group= new PropertyGroup()
     assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "vertex"))
     fs.close()
   }

@@ -115,7 +118,8 @@ class WriterSuite extends AnyFunSuite {
     // throw exception if the passed adj list type is not contained in the edge info
     assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }

@@ -161,6 +165,12 @@
     val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
     val offset_chunk_files = fs.globStatus(offset_path_pattern)
     assert(offset_chunk_files.length == 10)
+    // compare with correct offset chunk value
+    val offset_file_path = prefix + edge_info.getAdjListOffsetFilePath(0, adj_list_type)
+    val correct_offset_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0").getPath
+    val generated_offset_array = fromFile(offset_file_path).getLines.toArray
+    val expected_offset_array = fromFile(correct_offset_file_path).getLines.toArray
+    assert(generated_offset_array.sameElements(expected_offset_array))
 
     // test write property group
     val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
@@ -171,7 +181,8 @@
 
     writer.writeEdges()
 
-    // close FileSystem instance
+    // clean generated files and close FileSystem instance
+    fs.delete(new Path(prefix + "edge"))
     fs.close()
   }
 }
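
As a reading-side counterpart to the offset comparison added in this test, an offset chunk is consumed by taking two consecutive values to get a vertex's adjacency range. A hedged sketch; the path is hypothetical and the one-value-per-line layout is assumed from the gar-test chunk0 fixture the test compares against.

import scala.io.Source.fromFile

// Hypothetical path to a generated offset chunk (one offset per line, as in
// the gar-test chunk0 fixture used by the test above).
val offsetChunkPath = "/tmp/graphar/edge/person_knows_person/ordered_by_source/offset/chunk0"
val offsets = fromFile(offsetChunkPath).getLines.map(_.trim.toLong).toArray

// Within this vertex chunk, the edges of vertex i occupy rows
// [offsets(i), offsets(i + 1)) of the corresponding adj list chunks.
def edgeRange(i: Int): (Long, Long) = (offsets(i), offsets(i + 1))
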