diff --git a/examples/construct_info_example.cc b/examples/construct_info_example.cc
index b3d3a880b..f24b591a8 100644
--- a/examples/construct_info_example.cc
+++ b/examples/construct_info_example.cc
@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
assert(vertex_info.GetPropertyType(id.name).value() == id.type);
assert(vertex_info.GetFilePath(group1, 0).value() ==
- "vertex/person/id/part0/chunk0");
+ "vertex/person/id/chunk0");
// extend property groups & validate
auto result = vertex_info.Extend(group2);
diff --git a/include/gar/graph_info.h b/include/gar/graph_info.h
index d3812e243..2b2f1dfde 100644
--- a/include/gar/graph_info.h
+++ b/include/gar/graph_info.h
@@ -254,8 +254,8 @@ class VertexInfo {
return Status::KeyError(
"Vertex info does not contain the property group.");
}
- return prefix_ + property_group.GetPrefix() + "part" +
- std::to_string(chunk_index) + "/" + "chunk0";
+ return prefix_ + property_group.GetPrefix() + "chunk" +
+ std::to_string(chunk_index);
}
/// Get the chunk files directory path of property group
@@ -562,7 +562,7 @@ class EdgeInfo {
return Status::KeyError("The adj list type is not found in edge info.");
}
return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/part" +
- std::to_string(vertex_chunk_index) + "/" + "chunk0";
+ std::to_string(vertex_chunk_index) + "/chunk0";
}
/// Get the adj list offset chunk file directory path of adj list type
diff --git a/spark/pom.xml b/spark/pom.xml
index 375d4250a..8d69d3a15 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -15,10 +15,11 @@
<scala.binary.version>2.12</scala.binary.version>
<PermGen>512m</PermGen>
<MaxPermGen>1024m</MaxPermGen>
- <spark.version>3.1.1</spark.version>
+ <spark.version>3.2.0</spark.version>
<maven.compiler.release>8</maven.compiler.release>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
+ <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
@@ -68,6 +69,34 @@
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
+ <dependency>
+   <groupId>com.aliyun.odps</groupId>
+   <artifactId>hadoop-fs-oss</artifactId>
+   <version>${cupid.sdk.version}</version>
+   <exclusions>
+     <exclusion>
+       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-common</artifactId>
+     </exclusion>
+   </exclusions>
+ </dependency>
+ <dependency>
+   <groupId>com.aliyun.odps</groupId>
+   <artifactId>odps-spark-datasource_2.11</artifactId>
+   <version>${cupid.sdk.version}</version>
+   <exclusions>
+     <exclusion>
+       <groupId>net.jpountz.lz4</groupId>
+       <artifactId>lz4</artifactId>
+     </exclusion>
+   </exclusions>
+ </dependency>
+ <dependency>
+   <groupId>com.aliyun.odps</groupId>
+   <artifactId>cupid-sdk</artifactId>
+   <version>${cupid.sdk.version}</version>
+   <scope>provided</scope>
+ </dependency>
@@ -119,6 +148,47 @@
+ <plugin>
+   <groupId>org.apache.maven.plugins</groupId>
+   <artifactId>maven-shade-plugin</artifactId>
+   <version>2.1</version>
+   <executions>
+     <execution>
+       <phase>package</phase>
+       <goals>
+         <goal>shade</goal>
+       </goals>
+       <configuration>
+         <minimizeJar>false</minimizeJar>
+         <shadedArtifactAttached>true</shadedArtifactAttached>
+         <artifactSet>
+           <includes>
+             <include>*:*</include>
+           </includes>
+         </artifactSet>
+         <filters>
+           <filter>
+             <artifact>*:*</artifact>
+             <excludes>
+               <exclude>META-INF/*.SF</exclude>
+               <exclude>META-INF/*.DSA</exclude>
+               <exclude>META-INF/*.RSA</exclude>
+               <exclude>**/log4j.properties</exclude>
+             </excludes>
+           </filter>
+         </filters>
+         <transformers>
+           <transformer
+               implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+             <resource>reference.conf</resource>
+           </transformer>
+         </transformers>
+       </configuration>
+     </execution>
+   </executions>
+ </plugin>
jar
diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
index 8cfb5069b..6c8868199 100644
--- a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
+++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar;
public class GeneralParams {
diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
index 9f408b3e6..bddd4ee09 100644
--- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
@@ -210,8 +225,8 @@ class EdgeInfo() {
def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
if (containAdjList(adj_list_type) == false)
throw new IllegalArgumentException
- val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/part" +
- chunk_index.toString() + "/chunk0"
+ val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/chunk" +
+ chunk_index.toString()
return str
}
@@ -256,6 +271,25 @@ class EdgeInfo() {
return str
}
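+ // get the chunk files path prefix of the property group for the given vertex chunk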
+ def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long) : String = {
+ if (containPropertyGroup(property_group, adj_list_type) == false)
+ throw new IllegalArgumentException
+ var str: String = property_group.getPrefix
+ if (str == "") {
+ val properties = property_group.getProperties
+ val num = properties.size
+ for ( j <- 0 to num - 1 ) {
+ if (j > 0)
+ str += GeneralParams.regularSeperator
+ str += properties.get(j).getName;
+ }
+ str += "/"
+ }
+ str = prefix + getAdjListPrefix(adj_list_type) + str + "part" +
+ vertex_chunk_index.toString() + "/"
+ return str
+ }
+
def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
if (containPropertyGroup(property_group, adj_list_type) == false)
throw new IllegalArgumentException
diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
index a60813080..52ad38c66 100644
--- a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
index 5cab17601..225f4999e 100644
--- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
@@ -133,7 +148,7 @@ class VertexInfo() {
} else {
str = property_group.getPrefix
}
- return prefix + str + "part" + chunk_index.toString() + "/chunk0"
+ return prefix + str + "chunk" + chunk_index.toString()
}
def getDirPath(property_group: PropertyGroup): String = {
@@ -148,6 +163,7 @@ class VertexInfo() {
str += GeneralParams.regularSeperator
str += properties.get(j).getName;
}
+ str += "/"
} else {
str = property_group.getPrefix
}
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
new file mode 100644
index 000000000..2ce28a7d2
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -0,0 +1,44 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import java.net.URI
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.DataFrame
+import org.apache.hadoop.fs
+
+object FileSystem {
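+ // rename the part-* files written by Spark under filePrefix to chunk0, chunk1, ... following the GAR chunk naming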
+ private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+ val sc = spark.sparkContext
+ val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new fs.Path(filePrefix + "part*")
+ val files = file_system.globStatus(path_pattern)
+ for (i <- 0 until files.length) {
+ val file_name = files(i).getPath.getName
+ val new_file_name = "chunk" + i.toString
+ file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+ }
+ }
+
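+ // write the dataframe as one chunk directory: suppress _SUCCESS and summary files, save in the given format, then rename the part files to chunks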
+ def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+ val spark = dataFrame.sparkSession
+ spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
+ spark.conf.set("parquet.enable.summary-metadata", "false")
+ // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
+ dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
+ renameSparkGeneratedFiles(spark, outputPrefix)
+ }
+}
\ No newline at end of file
diff --git a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
similarity index 87%
rename from spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
rename to spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
index 15bd02d3e..8d7018cf8 100644
--- a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
@@ -1,4 +1,21 @@
-package com.alibaba.graphar
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import com.alibaba.graphar.GeneralParams
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
@@ -78,11 +95,11 @@ object IndexGenerator {
def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
dstIndexMapping.createOrReplaceTempView("dst_vertex")
- edgeDf.createOrReplaceTempView("edge")
+ edgeDf.createOrReplaceTempView("edges")
val dstCol = GeneralParams.dstIndexCol;
val indexCol = GeneralParams.vertexIndexCol;
val dstPrimaryKey = GeneralParams.primaryCol;
- val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
+ val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
// drop the old dst id col
trans_df.drop(dstColumnName)
}
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala
new file mode 100644
index 000000000..7d68a7ef0
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala
@@ -0,0 +1,42 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import org.apache.spark.sql.types._
+import org.apache.spark.Partitioner
+
+
+class ChunkPartitioner(partitions: Int, chunk_size: Long) extends Partitioner {
+ require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+ def numPartitions: Int = partitions
+
+ def chunkSize: Long = chunk_size
+
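+ // records with key in [i * chunk_size, (i + 1) * chunk_size) are assigned to partition i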
+ def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => (key.asInstanceOf[Long] / chunk_size).toInt
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case h: ChunkPartitioner =>
+ h.numPartitions == numPartitions
+ case _ =>
+ false
+ }
+
+ override def hashCode: Int = numPartitions
+}
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
new file mode 100644
index 000000000..f3c49bb6a
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -0,0 +1,213 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.writer
+
+import com.alibaba.graphar.utils.{FileSystem, ChunkPartitioner}
+import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.util.Utils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions._
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ArrayBuffer
+
+object EdgeWriter {
+ // split the whole edge dataframe into chunk dataframes by vertex chunk size.
+ private def split(edgeDf: DataFrame, keyColumnName: String, vertexChunkSize: Long): Seq[DataFrame] = {
+ // split the dataframe into multiple dataframes by vertex chunk
+ edgeDf.cache()
+ val spark = edgeDf.sparkSession
+ import spark.implicits._
+ val df_schema = edgeDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val vertex_chunk_num = math.floor(edgeDf.agg(max(keyColumnName)).head().getLong(0) / vertexChunkSize.toDouble).toInt
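+ // (0 to vertex_chunk_num) is inclusive, so this yields vertex_chunk_num + 1 per-vertex-chunk dataframes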
+ val chunks: Seq[DataFrame] = (0 to vertex_chunk_num).map {i => edgeDf.where(edgeDf(keyColumnName) >= (i * vertexChunkSize) and edgeDf(keyColumnName) < ((i + 1) * vertexChunkSize))}
+ return chunks
+ }
+
+ // repartition the chunk dataframe by edge chunk size (this is for COO)
+ private def repartition(chunkDf: DataFrame, keyColumnName: String, edgeChunkSize: Long): DataFrame = {
+ // repartition the dataframe by edge chunk size
+ val spark = chunkDf.sparkSession
+ import spark.implicits._
+ val df_schema = chunkDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val df_rdd = chunkDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+
+ // generate global edge id for each record of dataframe
+ val parition_counts = df_rdd
+ .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+ .collectAsMap()
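+ // prefix-sum the per-partition counts to get the starting global edge id of each partition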
+ val aggregatedPartitionCounts = SortedMap(parition_counts.toSeq: _*)
+ .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+ (total + c, map + (i -> total))
+ }
+ ._2
+ val broadcastedPartitionCounts = spark.sparkContext.broadcast(aggregatedPartitionCounts)
+ val rdd_with_eid = df_rdd.mapPartitionsWithIndex((i, ps) => {
+ val start = broadcastedPartitionCounts.value(i)
+ for { ((k, row), j) <- ps.zipWithIndex } yield (start + j, row)
+ })
+ val partition_num = Math.ceil(chunkDf.count() / edgeChunkSize.toDouble).toInt
+ val partitioner = new ChunkPartitioner(partition_num, edgeChunkSize)
+ val chunks = rdd_with_eid.partitionBy(partitioner).values
+ spark.createDataFrame(chunks, df_schema)
+ }
+
+ // repartition and sort the chunk dataframe by edge chunk size (this is for CSR/CSC)
+ private def sortAndRepartition(chunkDf: DataFrame, keyColumnName: String, edgeChunkSize: Long): DataFrame = {
+ // repartition the dataframe by edge chunk size
+ val spark = chunkDf.sparkSession
+ import spark.implicits._
+ val df_schema = chunkDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val rdd_ordered = chunkDf.rdd.map(row => (row(index).asInstanceOf[Long], row)).sortByKey()
+
+ // generate global edge id for each record of dataframe
+ val parition_counts = rdd_ordered
+ .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+ .collectAsMap()
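+ // prefix-sum the per-partition counts to get the starting global edge id of each partition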
+ val aggregatedPartitionCounts = SortedMap(parition_counts.toSeq: _*)
+ .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+ (total + c, map + (i -> total))
+ }
+ ._2
+ val broadcastedPartitionCounts = spark.sparkContext.broadcast(aggregatedPartitionCounts)
+ val rdd_with_eid = rdd_ordered.mapPartitionsWithIndex((i, ps) => {
+ val start = broadcastedPartitionCounts.value(i)
+ for { ((k, row), j) <- ps.zipWithIndex } yield (start + j, row)
+ })
+ val partition_num = Math.ceil(chunkDf.count() / edgeChunkSize.toDouble).toInt
+ val partitioner = new ChunkPartitioner(partition_num, edgeChunkSize)
+ val chunks = rdd_with_eid.repartitionAndSortWithinPartitions(partitioner).values
+ spark.createDataFrame(chunks, df_schema)
+ }
+}
+
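+// writes the adj list, offset and property chunks of one adj list type from an edge dataframe that already contains the src and dst index columns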
+class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, edgeDf: DataFrame) {
+ private var chunks: Seq[DataFrame] = preprocess()
+
+ // convert the edge dataframe to chunk dataframes
+ private def preprocess(): Seq[DataFrame] = {
+ // check that the edge info contains the adj list type
+ if (edgeInfo.containAdjList(adjListType) == false) {
+ throw new IllegalArgumentException
+ }
+
+ // check that the src index and dst index columns exist
+ val src_filed = StructField(GeneralParams.srcIndexCol, LongType, false)
+ val dst_filed = StructField(GeneralParams.dstIndexCol, LongType, false)
+ val schema = edgeDf.schema
+ if (schema.contains(src_filed) == false || schema.contains(dst_filed) == false) {
+ throw new IllegalArgumentException
+ }
+ var vertex_chunk_size: Long = 0
+ var primaryColName: String = ""
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source) {
+ vertex_chunk_size = edgeInfo.getSrc_chunk_size()
+ primaryColName = GeneralParams.srcIndexCol
+ } else {
+ vertex_chunk_size = edgeInfo.getDst_chunk_size()
+ primaryColName = GeneralParams.dstIndexCol
+ }
+ val edges_of_vertex_chunks = EdgeWriter.split(edgeDf, primaryColName, vertex_chunk_size)
+ val vertex_chunk_num = edges_of_vertex_chunks.length
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest) {
+ val processed_chunks: Seq[DataFrame] = (0 until vertex_chunk_num).map {i => EdgeWriter.sortAndRepartition(edges_of_vertex_chunks(i), primaryColName, edgeInfo.getChunk_size())}
+ return processed_chunks
+ } else {
+ val processed_chunks: Seq[DataFrame] = (0 until vertex_chunk_num).map {i => EdgeWriter.repartition(edges_of_vertex_chunks(i), primaryColName, edgeInfo.getChunk_size())}
+ return processed_chunks
+ }
+ }
+
+ // generate the offset chunk files from the edge dataframe for this edge type
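+ // note: each offset chunk stores the per-vertex edge counts within the corresponding vertex chunk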
+ private def writeOffset(): Unit = {
+ val file_type = edgeInfo.getAdjListFileType(adjListType)
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
+ if (adjListType == AdjListType.ordered_by_source) {
+ val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
+ FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ } else {
+ val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
+ FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ }
+ chunk_index = chunk_index + 1
+ }
+ }
+
+ // generate the chunks of AdjList from edge dataframe for this edge type
+ def writeAdjList(): Unit = {
+ val file_type = edgeInfo.getAdjListFileType(adjListType)
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getAdjListFilePath(chunk_index, adjListType)
+ val adj_list_chunk = chunk.select(GeneralParams.srcIndexCol, GeneralParams.dstIndexCol)
+ FileSystem.writeDataFrame(adj_list_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ chunk_index = chunk_index + 1
+ }
+
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest) {
+ writeOffset()
+ }
+ }
+
+ // generate the chunks of the property group from edge dataframe
+ def writeEdgeProperties(propertyGroup: PropertyGroup): Unit = {
+ if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) {
+ throw new IllegalArgumentException
+ }
+
+ val property_list = ArrayBuffer[String]()
+ val p_it = propertyGroup.getProperties().iterator
+ while (p_it.hasNext()) {
+ val property = p_it.next()
+ property_list += property.getName()
+ }
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, chunk_index)
+ val property_group_chunk = chunk.select(property_list.map(col): _*)
+ FileSystem.writeDataFrame(property_group_chunk, propertyGroup.getFile_type(), output_prefix)
+ chunk_index = chunk_index + 1
+ }
+ }
+
+ // generate the chunks of all property groups from edge dataframe
+ def writeEdgeProperties(): Unit = {
+ val property_groups = edgeInfo.getPropertyGroups(adjListType)
+ val it = property_groups.iterator
+ while (it.hasNext()) {
+ val property_group = it.next()
+ writeEdgeProperties(property_group)
+ }
+ }
+
+ // generate the chunks for the AdjList and all property groups from edge dataframe
+ def writeEdges(): Unit = {
+ writeAdjList()
+ writeEdgeProperties()
+ }
+}
+
+
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
new file mode 100644
index 000000000..41780114c
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -0,0 +1,90 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.writer
+
+import com.alibaba.graphar.utils.{FileSystem, ChunkPartitioner, IndexGenerator}
+import com.alibaba.graphar.{GeneralParams, VertexInfo, FileType, AdjListType, PropertyGroup}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
+import org.apache.spark.sql.types.{LongType, StructField}
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ArrayBuffer
+
+
+object VertexWriter {
+ private def repartitionAndSort(vertexDf: DataFrame, chunkSize: Long): DataFrame = {
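+ // repartition the vertex dataframe into chunk-size partitions and sort each partition by the vertex index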
+ val vertex_df_schema = vertexDf.schema
+ val index = vertex_df_schema.fieldIndex(GeneralParams.vertexIndexCol)
+ val partition_num = Math.ceil(vertexDf.count / chunkSize.toDouble).toInt
+ val rdd = vertexDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+
+ // repartition
+ val partitioner = new ChunkPartitioner(partition_num, chunkSize)
+ val chunks_rdd = rdd.repartitionAndSortWithinPartitions(partitioner).values
+ vertexDf.sparkSession.createDataFrame(chunks_rdd, vertex_df_schema)
+ }
+}
+
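+// writes the property group chunks of a vertex dataframe that already contains the vertex index column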
+class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame) {
+ private var chunks:DataFrame = preprocess()
+ private val spark = vertexDf.sparkSession
+
+ private def preprocess() : DataFrame = {
+ // check that the vertex dataframe contains the index column
+ val index_filed = StructField(GeneralParams.vertexIndexCol, LongType)
+ if (vertexDf.schema.contains(index_filed) == false) {
+ throw new IllegalArgumentException
+ }
+ return VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
+ }
+
+ // generate chunks of the property group for vertex dataframe
+ def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
+ // check if contains the property group
+ if (vertexInfo.containPropertyGroup(propertyGroup) == false) {
+ throw new IllegalArgumentException
+ }
+
+ // write out the chunks
+ val output_prefix = prefix + vertexInfo.getDirPath(propertyGroup)
+ val property_list = ArrayBuffer[String]()
+ val it = propertyGroup.getProperties().iterator
+ while (it.hasNext()) {
+ val property = it.next()
+ property_list += property.getName()
+ }
+ val pg_df = chunks.select(property_list.map(col): _*)
+ FileSystem.writeDataFrame(pg_df, propertyGroup.getFile_type(), output_prefix)
+ }
+
+ // generate chunks of all property groups for vertex dataframe
+ def writeVertexProperties(): Unit = {
+ val property_groups = vertexInfo.getProperty_groups()
+ val it = property_groups.iterator
+ while (it.hasNext()) {
+ val property_group = it.next()
+ writeVertexProperties(property_group)
+ }
+ }
+}
+
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
index d1e4db582..39f125d9c 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -1,17 +1,40 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
-import java.io.{File, FileInputStream}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
import org.scalatest.funsuite.AnyFunSuite
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.sql.SparkSession
class GraphInfoSuite extends AnyFunSuite {
+ val spark = SparkSession.builder()
+ .enableHiveSupport()
+ .master("local[*]")
+ .getOrCreate()
test("load graph info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
- val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
- val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
+ // read graph yaml
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
+ val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
+ val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]
assert(graph_info.getName == "ldbc_sample")
assert(graph_info.getPrefix == "" )
@@ -27,7 +50,9 @@ class GraphInfoSuite extends AnyFunSuite {
}
test("load vertex info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]
@@ -51,8 +76,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group))
assert(vertex_info.getPropertyType("id") == GarType.INT64)
assert(vertex_info.isPrimaryKey("id"))
- assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
- assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+ assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/chunk0")
+ assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/chunk4")
assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")
assert(vertex_info.containProperty("firstName"))
@@ -63,8 +88,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group_2))
assert(vertex_info.getPropertyType("firstName") == GarType.STRING)
assert(vertex_info.isPrimaryKey("firstName") == false)
- assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
- assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+ assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/chunk0")
+ assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/chunk4")
assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")
assert(vertex_info.containProperty("not_exist") == false)
@@ -74,7 +99,9 @@ class GraphInfoSuite extends AnyFunSuite {
}
test("load edge info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]
@@ -100,8 +127,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
- assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
- assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
@@ -124,8 +151,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/")
- assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0")
- assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/")
val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0)
assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest))
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
index a2dcea274..5f71ed0b8 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
@@ -1,5 +1,22 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
+import com.alibaba.graphar.utils.IndexGenerator
+
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.funsuite.AnyFunSuite
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
new file mode 100644
index 000000000..f5f3f6a96
--- /dev/null
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -0,0 +1,177 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar
+
+import com.alibaba.graphar.utils.IndexGenerator
+import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter}
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.funsuite.AnyFunSuite
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+class WriterSuite extends AnyFunSuite {
+ val spark = SparkSession.builder()
+ .enableHiveSupport()
+ .master("local[*]")
+ .getOrCreate()
+
+ test("test vertex writer with only vertex table") {
+ // read vertex dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+ val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+ val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+
+ // read vertex yaml
+ val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
+ val vertex_input = fs.open(vertex_yaml_path)
+ val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+ val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+ // generate vertex index column for vertex dataframe
+ val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
+
+ // create writer object for person and generate the properties with GAR format
+ val prefix : String = "/tmp/"
+ val writer = new VertexWriter(prefix, vertex_info, vertex_df_with_index)
+
+ // write certain property group
+ val property_group = vertex_info.getPropertyGroup("id")
+ writer.writeVertexProperties(property_group)
+ val id_chunk_path = new Path(prefix + vertex_info.getDirPath(property_group) + "chunk*")
+ val id_chunk_files = fs.globStatus(id_chunk_path)
+ assert(id_chunk_files.length == 10)
+ writer.writeVertexProperties()
+ val chunk_path = new Path(prefix + vertex_info.getPrefix() + "*/*")
+ val chunk_files = fs.globStatus(chunk_path)
+ assert(chunk_files.length == 20)
+
+ assertThrows[IllegalArgumentException](new VertexWriter(prefix, vertex_info, vertex_df))
+ val invalid_property_group= new PropertyGroup()
+ assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
+
+ // close FileSystem instance
+ fs.close()
+ }
+
+ test("test edge writer with only edge table") {
+ // read edge dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+ val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+ val prefix : String = "/tmp/"
+ val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+
+ // read edge yaml
+ val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val edge_input = fs.open(edge_yaml_path)
+ val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+ val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+ val adj_list_type = AdjListType.ordered_by_source
+
+ // generate vertex index for edge dataframe
+ val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
+
+ // create writer object for person_knows_person and generate the adj list and properties with GAR format
+ val writer = new EdgeWriter(prefix, edge_info, adj_list_type, edge_df_with_index)
+
+ // test write adj list
+ writer.writeAdjList()
+ val adj_list_path_pattern = new Path(prefix + edge_info.getAdjListDirPath(adj_list_type) + "*/*")
+ val adj_list_chunk_files = fs.globStatus(adj_list_path_pattern)
+ assert(adj_list_chunk_files.length == 9)
+ val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
+ val offset_chunk_files = fs.globStatus(offset_path_pattern)
+ assert(offset_chunk_files.length == 7)
+
+ // test write property group
+ val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
+ writer.writeEdgeProperties(property_group)
+ val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
+ val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
+ assert(property_group_chunk_files.length == 9)
+
+ // test write edges
+ writer.writeEdges()
+
+ val invalid_property_group = new PropertyGroup()
+
+ assertThrows[IllegalArgumentException](writer.writeEdgeProperties(invalid_property_group))
+ // throw exception if the src index and dst index are not generated for the edge dataframe
+ assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.ordered_by_source, edge_df))
+ // throw exception if the passed adj list type is not contained in the edge info
+ assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
+
+ // close FileSystem instance
+ fs.close()
+ }
+
+ test("test edge writer with vertex table and edge table") {
+ // read vertex dataframe
+ val vertex_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+ val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_file_path)
+
+ // read edge dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+ val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+
+ val prefix : String = "/tmp/test2/"
+ val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration)
+ val adj_list_type = AdjListType.ordered_by_source
+
+ // read vertex yaml
+ val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+ val vertex_input = fs.open(vertex_yaml_path)
+ val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+ val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+ // read edge yaml
+ val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val edge_input = fs.open(edge_yaml_path)
+ val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+ val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+
+ // construct person vertex mapping with dataframe
+ val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey())
+ // generate src index and dst index for the edge dataframe with the vertex mapping
+ val edge_df_with_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
+ val edge_df_with_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_with_src_index, "dst", vertex_mapping)
+
+ // create writer object for person_knows_person and generate the adj list and properties with GAR format
+ val writer = new EdgeWriter(prefix, edge_info, adj_list_type, edge_df_with_src_dst_index)
+
+ // test write adj list
+ writer.writeAdjList()
+ val adj_list_path_pattern = new Path(prefix + edge_info.getAdjListDirPath(adj_list_type) + "*/*")
+ val adj_list_chunk_files = fs.globStatus(adj_list_path_pattern)
+ assert(adj_list_chunk_files.length == 11)
+ val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
+ val offset_chunk_files = fs.globStatus(offset_path_pattern)
+ assert(offset_chunk_files.length == 10)
+
+ // test write property group
+ val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
+ writer.writeEdgeProperties(property_group)
+ val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
+ val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
+ assert(property_group_chunk_files.length == 11)
+
+ writer.writeEdges()
+
+ // close FileSystem instance
+ fs.close()
+ }
+}
diff --git a/test/gar-test b/test/gar-test
index 19f1e57b9..57e6011f0 160000
--- a/test/gar-test
+++ b/test/gar-test
@@ -1 +1 @@
-Subproject commit 19f1e57b9c137ad5447667c4bf71bbc2a1e4d371
+Subproject commit 57e6011f0c624771275ec9f005dd29e15cd3a302
diff --git a/test/test_arrow_chunk_writer.cc b/test/test_arrow_chunk_writer.cc
index d070dcec3..9e0295ff4 100644
--- a/test/test_arrow_chunk_writer.cc
+++ b/test/test_arrow_chunk_writer.cc
@@ -79,9 +79,9 @@ TEST_CASE("test_orc_and_parquet_reader") {
arrow::Status st;
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::string path1 = TEST_DATA_DIR + "/ldbc_sample/orc" +
- "/vertex/person/firstName_lastName_gender/part1/chunk0";
+ "/vertex/person/firstName_lastName_gender/chunk1";
std::string path2 = TEST_DATA_DIR + "/ldbc_sample/parquet" +
- "/vertex/person/firstName_lastName_gender/part1/chunk0";
+ "/vertex/person/firstName_lastName_gender/chunk1";
arrow::io::IOContext io_context = arrow::io::default_io_context();
// Open ORC file reader
diff --git a/test/test_chunk_info_reader.cc b/test/test_chunk_info_reader.cc
index 389e5c33e..aae200da5 100644
--- a/test/test_chunk_info_reader.cc
+++ b/test/test_chunk_info_reader.cc
@@ -47,24 +47,24 @@ TEST_CASE("test_vertex_property_chunk_info_reader") {
REQUIRE(maybe_chunk_path.status().ok());
std::string chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part0/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk0");
REQUIRE(reader.seek(520).ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part5/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk5");
REQUIRE(reader.next_chunk().ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part6/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk6");
REQUIRE(reader.seek(900).ok());
maybe_chunk_path = reader.GetChunk();
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part9/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk9");
// now is end of the chunks
REQUIRE(reader.next_chunk().IsOutOfRange());
diff --git a/test/test_info.cc b/test/test_info.cc
index f47e6a064..671b8da86 100644
--- a/test/test_info.cc
+++ b/test/test_info.cc
@@ -130,7 +130,7 @@ TEST_CASE("test_vertex_info") {
// test get file path
auto maybe_path = v_info.GetFilePath(pg, 0);
REQUIRE(!maybe_path.has_error());
- REQUIRE(maybe_path.value() == expected_dir_path + "part0/chunk0");
+ REQUIRE(maybe_path.value() == expected_dir_path + "chunk0");
// property group not exist
REQUIRE(v_info.GetFilePath(pg2, 0).status().IsKeyError());