diff --git a/docs/api-reference.rst b/docs/api-reference.rst
index f4e3322cd..dbe83f947 100644
--- a/docs/api-reference.rst
+++ b/docs/api-reference.rst
@@ -174,7 +174,7 @@ Id Type
Data Type
~~~~~~~~~~~~~~~~~~~
-.. doxygenstruct:: GraphArchive::DataType
+.. doxygenclass:: GraphArchive::DataType
:members:
:undoc-members:
diff --git a/docs/images/edge_physical_table1.png b/docs/images/edge_physical_table1.png
index d224589c5..589867c71 100644
Binary files a/docs/images/edge_physical_table1.png and b/docs/images/edge_physical_table1.png differ
diff --git a/docs/images/edge_physical_table2.png b/docs/images/edge_physical_table2.png
index 728c0ecf6..047c01f7a 100644
Binary files a/docs/images/edge_physical_table2.png and b/docs/images/edge_physical_table2.png differ
diff --git a/docs/images/vertex_physical_table.png b/docs/images/vertex_physical_table.png
index f864c41e8..15ff249b3 100644
Binary files a/docs/images/vertex_physical_table.png and b/docs/images/vertex_physical_table.png differ
diff --git a/docs/user-guide/getting-started.rst b/docs/user-guide/getting-started.rst
index 83f1b61bf..34ddd6a6d 100644
--- a/docs/user-guide/getting-started.rst
+++ b/docs/user-guide/getting-started.rst
@@ -47,7 +47,7 @@ GAR Data Files
Property data
`````````````
The vertex properties are stored in vertex property chunks with the chunk size specified by the vertex information file. Different property groups correspond to individual groups of data files.
-In our example, the property group ("first name", "last name", "gender") for vertex chunk 0 of "person" vertices are stored in `./vertex/person/firstName_lastName_gender/part0`_.
+In our example, the property group ("first name", "last name", "gender") for vertex chunk 0 of "person" vertices is stored in `./vertex/person/firstName_lastName_gender/chunk0`_.
In practice of graph processing, it is common to only query a subset of columns of the properties. Thus, the column-oriented formats like Apache ORC and Apache Parquet are more efficient, which eliminate the need to read columns that are not relevant. We also provide data files in ORC and Parquet for the example graph in the `test data`_.
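+
+For instance, with Spark (a minimal sketch; it assumes an active ``SparkSession`` named ``spark`` and uses the Parquet version of the example graph, so the chunk path below is illustrative), only the required columns of a property chunk need to be read:
+
+.. code-block:: scala
+
+   // load one vertex property chunk and project a single column;
+   // a column-oriented format only materializes the selected columns
+   val df = spark.read.format("parquet").load("./vertex/person/firstName_lastName_gender/chunk0")
+   df.select("firstName").show()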
@@ -63,7 +63,7 @@ For example, the file `./edge/person_knows_person/ordered_by_source/adj_list/par
.. note::
- If the edges are ordered, there may be offset chunks to construct the index for accessing edges of a single vertex. It stores the start offset for each vertex's edges, see `./edge/person_knows_person/ordered_by_source/offset/part0`_ as an example.
+ If the edges are ordered, there may be offset chunks used to build the index for accessing the edges of a single vertex. These chunks store the start offset of each vertex's edges; see `./edge/person_knows_person/ordered_by_source/offset/chunk0`_ as an example.
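+ For instance (illustrative numbers), if an offset chunk contains the values ``0, 3, 5``, then the first vertex of the corresponding vertex chunk owns edges ``[0, 3)`` and the second vertex owns edges ``[3, 5)`` in the adjacency list chunks.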
How to Use GAR
@@ -184,7 +184,7 @@ Please refer to `more examples <../applications/out-of-core.html>`_ for learning
.. _person_knows_person.edge.yml: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/person_knows_person.edge.yml
-.. _./vertex/person/firstName_lastName_gender/part0: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/vertex/person/firstName_lastName_gender/part0
+.. _./vertex/person/firstName_lastName_gender/chunk0: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/vertex/person/firstName_lastName_gender/chunk0
.. _test data: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/
@@ -192,7 +192,7 @@ Please refer to `more examples <../applications/out-of-core.html>`_ for learning
.. _./edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/adj_list/part0/chunk0
-.. _./edge/person_knows_person/ordered_by_source/offset/part0: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/part0
+.. _./edge/person_knows_person/ordered_by_source/offset/chunk0: https://github.com/GraphScope/gar-test/blob/main/ldbc_sample/csv/edge/person_knows_person/ordered_by_source/offset/chunk0
.. _example program: https://github.com/alibaba/GraphAr/blob/main/examples/construct_info_example.cc
diff --git a/examples/construct_info_example.cc b/examples/construct_info_example.cc
index b3d3a880b..f24b591a8 100644
--- a/examples/construct_info_example.cc
+++ b/examples/construct_info_example.cc
@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
assert(vertex_info.GetPropertyType(id.name).value() == id.type);
assert(vertex_info.GetFilePath(group1, 0).value() ==
- "vertex/person/id/part0/chunk0");
+ "vertex/person/id/chunk0");
// extend property groups & validate
auto result = vertex_info.Extend(group2);
diff --git a/include/gar/graph_info.h b/include/gar/graph_info.h
index d3812e243..8683809c8 100644
--- a/include/gar/graph_info.h
+++ b/include/gar/graph_info.h
@@ -254,8 +254,8 @@ class VertexInfo {
return Status::KeyError(
"Vertex info does not contain the property group.");
}
- return prefix_ + property_group.GetPrefix() + "part" +
- std::to_string(chunk_index) + "/" + "chunk0";
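+ // chunk file path layout: <prefix><property group prefix>chunk<chunk_index>, e.g. "vertex/person/id/chunk0"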
+ return prefix_ + property_group.GetPrefix() + "chunk" +
+ std::to_string(chunk_index);
}
/// Get the chunk files directory path of property group
@@ -561,8 +561,8 @@ class EdgeInfo {
if (!ContainAdjList(adj_list_type)) {
return Status::KeyError("The adj list type is not found in edge info.");
}
- return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/part" +
- std::to_string(vertex_chunk_index) + "/" + "chunk0";
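+ // offset chunk path layout: <prefix><adj list prefix>offset/chunk<vertex_chunk_index>, e.g. "edge/person_knows_person/ordered_by_source/offset/chunk0"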
+ return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/chunk" +
+ std::to_string(vertex_chunk_index);
}
/// Get the adj list offset chunk file directory path of adj list type
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 4f1c05bf6..f5fdaa3b1 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -1,5 +1,5 @@
breathe
-docutils==0.16
+docutils
furo # sphinx theme
nbsphinx
sphinx>=3.0.2
diff --git a/spark/pom.xml b/spark/pom.xml
index 375d4250a..8d69d3a15 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -15,10 +15,11 @@
2.12
512m
1024m
- 3.1.1
+ 3.2.0
8
1.8
1.8
+ 3.3.8-public
@@ -68,6 +69,34 @@
snakeyaml
1.26
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>hadoop-fs-oss</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>odps-spark-datasource_2.11</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>cupid-sdk</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <scope>provided</scope>
+    </dependency>
@@ -119,6 +148,47 @@
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <version>2.1</version>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <minimizeJar>false</minimizeJar>
+              <shadedArtifactAttached>true</shadedArtifactAttached>
+              <artifactSet>
+                <includes>
+                  <include>*:*</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                    <exclude>**/log4j.properties</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <transformers>
+                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                  <resource>reference.conf</resource>
+                </transformer>
+              </transformers>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
jar
diff --git a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
index 8cfb5069b..6c8868199 100644
--- a/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
+++ b/spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar;
public class GeneralParams {
diff --git a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
index 9f408b3e6..bddd4ee09 100644
--- a/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
@@ -210,8 +225,8 @@ class EdgeInfo() {
def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
if (containAdjList(adj_list_type) == false)
throw new IllegalArgumentException
- val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/part" +
- chunk_index.toString() + "/chunk0"
+ val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/chunk" +
+ chunk_index.toString()
return str
}
@@ -256,6 +271,25 @@ class EdgeInfo() {
return str
}
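+ // returns the directory path of the property group chunks for the given vertex chunk of this adj list type, i.e. <prefix><adj list prefix><property group prefix>part<vertex_chunk_index>/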
+ def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long) : String = {
+ if (containPropertyGroup(property_group, adj_list_type) == false)
+ throw new IllegalArgumentException
+ var str: String = property_group.getPrefix
+ if (str == "") {
+ val properties = property_group.getProperties
+ val num = properties.size
+ for ( j <- 0 to num - 1 ) {
+ if (j > 0)
+ str += GeneralParams.regularSeperator
+ str += properties.get(j).getName;
+ }
+ str += "/"
+ }
+ str = prefix + getAdjListPrefix(adj_list_type) + str + "part" +
+ vertex_chunk_index.toString() + "/"
+ return str
+ }
+
def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
if (containPropertyGroup(property_group, adj_list_type) == false)
throw new IllegalArgumentException
diff --git a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
index a60813080..52ad38c66 100644
--- a/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
diff --git a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
index 5cab17601..225f4999e 100644
--- a/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -1,3 +1,18 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
import java.io.{File, FileInputStream}
@@ -133,7 +148,7 @@ class VertexInfo() {
} else {
str = property_group.getPrefix
}
- return prefix + str + "part" + chunk_index.toString() + "/chunk0"
+ return prefix + str + "chunk" + chunk_index.toString()
}
def getDirPath(property_group: PropertyGroup): String = {
@@ -148,6 +163,7 @@ class VertexInfo() {
str += GeneralParams.regularSeperator
str += properties.get(j).getName;
}
+ str += "/"
} else {
str = property_group.getPrefix
}
diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
new file mode 100644
index 000000000..4e3b2dcde
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
@@ -0,0 +1,216 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.reader
+
+import com.alibaba.graphar.utils.{IndexGenerator}
+import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+
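+/** Reader for the edge chunks of a single adjacency list type.
+ *
+ *  A minimal usage sketch (names and paths are illustrative):
+ *    val reader = new EdgeReader(prefix, edgeInfo, AdjListType.ordered_by_source, spark)
+ *    val edges = reader.readEdges()   // adj list joined with all edge property groups
+ */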
+class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, spark: SparkSession) {
+ // load a single offset chunk as a DataFrame
+ def readOffset(chunk_index: Long): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ if (adjListType != AdjListType.ordered_by_source && adjListType != AdjListType.ordered_by_dest)
+ throw new IllegalArgumentException
+ val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
+ val file_type = FileType.FileTypeToString(file_type_in_gar)
+ val file_path = prefix + "/" + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType)
+ val df = spark.read.format(file_type).load(file_path)
+ return df
+ }
+
+ // load a single AdjList chunk as a DataFrame
+ def readAdjListChunk(vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
+ val file_type = FileType.FileTypeToString(file_type_in_gar)
+ val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType)
+ val df = spark.read.format(file_type).load(file_path)
+ return df
+ }
+
+ // load all AdjList chunks for a vertex chunk as a DataFrame
+ def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, adjListType)
+ val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new Path(file_path + "chunk*")
+ val chunk_number = file_system.globStatus(path_pattern).length
+ var df = spark.emptyDataFrame
+ for ( i <- 0 to chunk_number - 1) {
+ val new_df = readAdjListChunk(vertex_chunk_index, i)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.union(new_df)
+ }
+ if (addIndex)
+ df = IndexGenerator.generateEdgeIndexColumn(df)
+ return df
+ }
+
+ // load all AdjList chunks for this edge type as a DataFrame
+ def readAllAdjList(addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ val file_path = prefix + "/" + edgeInfo.getAdjListDirPath(adjListType)
+ val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new Path(file_path + "part*")
+ val vertex_chunk_number = file_system.globStatus(path_pattern).length
+ var df = spark.emptyDataFrame
+ for ( i <- 0 to vertex_chunk_number - 1) {
+ val new_df = readAdjListForVertexChunk(i)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.union(new_df)
+ }
+ if (addIndex)
+ df = IndexGenerator.generateEdgeIndexColumn(df)
+ return df
+ }
+
+ // load a single edge property chunk as a DataFrame
+ def readEdgePropertyChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
+ throw new IllegalArgumentException
+ val file_type = propertyGroup.getFile_type();
+ val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index)
+ val df = spark.read.format(file_type).load(file_path)
+ return df
+ }
+
+ // load the chunks for a property group of a vertex chunk as a DataFrame
+ def readEdgePropertiesForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
+ throw new IllegalArgumentException
+ val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index)
+ val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new Path(file_path + "chunk*")
+ val chunk_number = file_system.globStatus(path_pattern).length
+ var df = spark.emptyDataFrame
+ for ( i <- 0 to chunk_number - 1) {
+ val new_df = readEdgePropertyChunk(propertyGroup, vertex_chunk_index, i)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.union(new_df)
+ }
+ if (addIndex)
+ df = IndexGenerator.generateEdgeIndexColumn(df)
+ return df
+ }
+
+ // load all chunks for a property group as a DataFrame
+ def readEdgeProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
+ throw new IllegalArgumentException
+ val file_path = prefix + "/" + edgeInfo.getPropertyDirPath(propertyGroup, adjListType)
+ val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new Path(file_path + "part*")
+ val vertex_chunk_number = file_system.globStatus(path_pattern).length
+ var df = spark.emptyDataFrame
+ for ( i <- 0 to vertex_chunk_number - 1) {
+ val new_df = readEdgePropertiesForVertexChunk(propertyGroup, i)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.union(new_df)
+ }
+ if (addIndex)
+ df = IndexGenerator.generateEdgeIndexColumn(df)
+ return df
+ }
+
+ // load the chunks for all property groups of a vertex chunk as a DataFrame
+ def readAllEdgePropertiesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ var df = spark.emptyDataFrame
+ val property_groups = edgeInfo.getPropertyGroups(adjListType)
+ val len: Int = property_groups.size
+ for ( i <- 0 to len - 1 ) {
+ val pg: PropertyGroup = property_groups.get(i)
+ val new_df = readEdgePropertiesForVertexChunk(pg, vertex_chunk_index, true)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
+ }
+ df = df.sort(GeneralParams.edgeIndexCol)
+ if (addIndex == false)
+ df = df.drop(GeneralParams.edgeIndexCol)
+ return df
+ }
+
+ // load the chunks for all property groups as a DataFrame
+ def readAllEdgeProperties(addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ var df = spark.emptyDataFrame
+ val property_groups = edgeInfo.getPropertyGroups(adjListType)
+ val len: Int = property_groups.size
+ for ( i <- 0 to len - 1 ) {
+ val pg: PropertyGroup = property_groups.get(i)
+ val new_df = readEdgeProperties(pg, true)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
+ }
+ df = df.sort(GeneralParams.edgeIndexCol)
+ if (addIndex == false)
+ df = df.drop(GeneralParams.edgeIndexCol)
+ return df
+ }
+
+ // load the chunks for the AdjList and all property groups for a vertex chunk as a DataFrame
+ def readEdgesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ val adjList_df = readAdjListForVertexChunk(vertex_chunk_index, true)
+ val properties_df = readAllEdgePropertiesForVertexChunk(vertex_chunk_index, true)
+ var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol)
+ if (addIndex == false)
+ df = df.drop(GeneralParams.edgeIndexCol)
+ return df
+ }
+
+ // load the chunks for the AdjList and all property groups as a DataFrame
+ def readEdges(addIndex: Boolean = false): DataFrame = {
+ if (edgeInfo.containAdjList(adjListType) == false)
+ throw new IllegalArgumentException
+ val adjList_df = readAllAdjList(true)
+ val properties_df = readAllEdgeProperties(true)
+ var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol);
+ if (addIndex == false)
+ df = df.drop(GeneralParams.edgeIndexCol)
+ return df
+ }
+}
\ No newline at end of file
diff --git a/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
new file mode 100644
index 000000000..7032c1a22
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
@@ -0,0 +1,89 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.reader
+
+import com.alibaba.graphar.utils.{IndexGenerator}
+import com.alibaba.graphar.{GeneralParams, VertexInfo, FileType, PropertyGroup}
+
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+
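+/** Reader for vertex property chunks.
+ *
+ *  A minimal usage sketch (names and paths are illustrative):
+ *    val reader = new VertexReader(prefix, vertexInfo, spark)
+ *    val vertices = reader.readAllVertexProperties()   // all property groups joined by vertex index
+ */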
+class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) {
+ private val vertices_number = readVerticesNumber()
+ private val chunk_size = vertexInfo.getChunk_size()
+ private var chunk_number = vertices_number / chunk_size
+ if (vertices_number % chunk_size != 0)
+ chunk_number = chunk_number + 1
+
+ // load the total number of vertices for this vertex type
+ def readVerticesNumber(): Long = {
+ val file_path = prefix + "/" + vertexInfo.getVerticesNumFilePath()
+ val path = new Path(file_path)
+ val file_system = FileSystem.get(path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = file_system.open(path)
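+ // the vertex count is stored as a little-endian int64, while readLong() reads big-endian, hence the byte swap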
+ val number = java.lang.Long.reverseBytes(input.readLong())
+ file_system.close()
+ return number
+ }
+
+ // load a single vertex property chunk as a DataFrame
+ def readVertexPropertyChunk(propertyGroup: PropertyGroup, chunk_index: Long): DataFrame = {
+ if (vertexInfo.containPropertyGroup(propertyGroup) == false)
+ throw new IllegalArgumentException
+ val file_type = propertyGroup.getFile_type()
+ val file_path = prefix + "/" + vertexInfo.getFilePath(propertyGroup, chunk_index)
+ val df = spark.read.format(file_type).load(file_path)
+ return df
+ }
+
+ // load all chunks for a property group as a DataFrame
+ def readVertexProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
+ if (vertexInfo.containPropertyGroup(propertyGroup) == false)
+ throw new IllegalArgumentException
+ var df = spark.emptyDataFrame
+ for ( i <- 0L to chunk_number - 1) {
+ val new_df = readVertexPropertyChunk(propertyGroup, i)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.union(new_df)
+ }
+ if (addIndex)
+ df = IndexGenerator.generateVertexIndexColumn(df)
+ return df
+ }
+
+ // load the chunks for all property groups as a DataFrame
+ def readAllVertexProperties(addIndex: Boolean = false): DataFrame = {
+ var df = spark.emptyDataFrame
+ val property_groups = vertexInfo.getProperty_groups()
+ val len: Int = property_groups.size
+ for ( i <- 0 to len - 1 ) {
+ val pg: PropertyGroup = property_groups.get(i)
+ val new_df = readVertexProperties(pg, true)
+ if (i == 0)
+ df = new_df
+ else
+ df = df.join(new_df, Seq(GeneralParams.vertexIndexCol))
+ }
+ df = df.sort(GeneralParams.vertexIndexCol)
+ if (addIndex == false)
+ df = df.drop(GeneralParams.vertexIndexCol)
+ return df
+ }
+}
\ No newline at end of file
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
new file mode 100644
index 000000000..2ce28a7d2
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -0,0 +1,44 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import java.net.URI
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.DataFrame
+import org.apache.hadoop.fs
+
+object FileSystem {
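+ // rename the files written by Spark ("part-*") to the GraphAr chunk naming ("chunk0", "chunk1", ...)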
+ private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
+ val sc = spark.sparkContext
+ val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
+ val path_pattern = new fs.Path(filePrefix + "part*")
+ val files = file_system.globStatus(path_pattern)
+ for (i <- 0 until files.length) {
+ val file_name = files(i).getPath.getName
+ val new_file_name = "chunk" + i.toString
+ file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
+ }
+ }
+
+ def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
+ val spark = dataFrame.sparkSession
+ spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
+ spark.conf.set("parquet.enable.summary-metadata", "false")
+ // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
+ dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
+ renameSparkGeneratedFiles(spark, outputPrefix)
+ }
+}
\ No newline at end of file
diff --git a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
similarity index 75%
rename from spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
rename to spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
index 15bd02d3e..2e6f594f9 100644
--- a/spark/src/main/scala/com/alibaba/graphar/IndexGenerator.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
@@ -1,4 +1,21 @@
-package com.alibaba.graphar
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import com.alibaba.graphar.GeneralParams
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
@@ -61,6 +78,28 @@ object IndexGenerator {
//index helper for the Edge DataFrame
+ // add a column that contains the edge index
+ def generateEdgeIndexColumn(edgeDf: DataFrame): DataFrame = {
+ val spark = edgeDf.sparkSession
+ val schema = edgeDf.schema
+ val schema_with_index = StructType(StructType(Seq(StructField(GeneralParams.edgeIndexCol, LongType, true)))++schema)
+ val rdd = edgeDf.rdd
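+ // count the rows of each partition, then prefix-sum the counts so that each
+ // partition knows the global index of its first row before zipping with the local offset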
+ val counts = rdd
+ .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+ .collectAsMap()
+ val aggregatedCounts = SortedMap(counts.toSeq: _*)
+ .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+ (total + c, map + (i -> total))
+ }
+ ._2
+ val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts)
+ val rdd_with_index = rdd.mapPartitionsWithIndex((i, ps) => {
+ val start = broadcastedCounts.value(i)
+ for { (p, j) <- ps.zipWithIndex } yield Row.fromSeq(Seq(start + j) ++ p.toSeq)
+ })
+ spark.createDataFrame(rdd_with_index, schema_with_index)
+ }
+
// join the edge table with the vertex index mapping for source column
def generateSrcIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, srcIndexMapping: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
@@ -78,11 +117,11 @@ object IndexGenerator {
def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
dstIndexMapping.createOrReplaceTempView("dst_vertex")
- edgeDf.createOrReplaceTempView("edge")
+ edgeDf.createOrReplaceTempView("edges")
val dstCol = GeneralParams.dstIndexCol;
val indexCol = GeneralParams.vertexIndexCol;
val dstPrimaryKey = GeneralParams.primaryCol;
- val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
+ val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
// drop the old dst id col
trans_df.drop(dstColumnName)
}
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala
new file mode 100644
index 000000000..7d68a7ef0
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala
@@ -0,0 +1,42 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import org.apache.spark.sql.types._
+import org.apache.spark.Partitioner
+
+
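+// partitioner that assigns each record to the chunk its index belongs to, i.e. partition = index / chunk_size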
+class ChunkPartitioner(partitions: Int, chunk_size: Long) extends Partitioner {
+ require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
+
+ def numPartitions: Int = partitions
+
+ def chunkSize: Long = chunk_size
+
+ def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => (key.asInstanceOf[Long] / chunk_size).toInt
+ }
+
+ override def equals(other: Any): Boolean = other match {
+ case h: ChunkPartitioner =>
+ h.numPartitions == numPartitions
+ case _ =>
+ false
+ }
+
+ override def hashCode: Int = numPartitions
+}
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
new file mode 100644
index 000000000..f3c49bb6a
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -0,0 +1,213 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.writer
+
+import com.alibaba.graphar.utils.{FileSystem, ChunkPartitioner}
+import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.{LongType, StructField}
+import org.apache.spark.util.Utils
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions._
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ArrayBuffer
+
+object EdgeWriter {
+ // split the whole edge dataframe into chunk dataframes by vertex chunk size.
+ private def split(edgeDf: DataFrame, keyColumnName: String, vertexChunkSize: Long): Seq[DataFrame] = {
+ // split the dataframe into multiple dataframes, one per vertex chunk
+ edgeDf.cache()
+ val spark = edgeDf.sparkSession
+ import spark.implicits._
+ val df_schema = edgeDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val vertex_chunk_num = math.floor(edgeDf.agg(max(keyColumnName)).head().getLong(0) / vertexChunkSize.toDouble).toInt
+ val chunks: Seq[DataFrame] = (0 to vertex_chunk_num).map {i => edgeDf.where(edgeDf(keyColumnName) >= (i * vertexChunkSize) and edgeDf(keyColumnName) < ((i + 1) * vertexChunkSize))}
+ return chunks
+ }
+
+ // repartition the chunk dataframe by edge chunk size (this is for COO)
+ private def repartition(chunkDf: DataFrame, keyColumnName: String, edgeChunkSize: Long): DataFrame = {
+ // repartition the dataframe by edge chunk size
+ val spark = chunkDf.sparkSession
+ import spark.implicits._
+ val df_schema = chunkDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val df_rdd = chunkDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+
+ // generate global edge id for each record of dataframe
+ val partition_counts = df_rdd
+ .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+ .collectAsMap()
+ val aggregatedPartitionCounts = SortedMap(partition_counts.toSeq: _*)
+ .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+ (total + c, map + (i -> total))
+ }
+ ._2
+ val broadcastedPartitionCounts = spark.sparkContext.broadcast(aggregatedPartitionCounts)
+ val rdd_with_eid = df_rdd.mapPartitionsWithIndex((i, ps) => {
+ val start = broadcastedPartitionCounts.value(i)
+ for { ((k, row), j) <- ps.zipWithIndex } yield (start + j, row)
+ })
+ val partition_num = Math.ceil(chunkDf.count() / edgeChunkSize.toDouble).toInt
+ val partitioner = new ChunkPartitioner(partition_num, edgeChunkSize)
+ val chunks = rdd_with_eid.partitionBy(partitioner).values
+ spark.createDataFrame(chunks, df_schema)
+ }
+
+ // repartition and sort the chunk dataframe by edge chunk size (this is for CSR/CSC)
+ private def sortAndRepartition(chunkDf: DataFrame, keyColumnName: String, edgeChunkSize: Long): DataFrame = {
+ // repartition the dataframe by edge chunk size
+ val spark = chunkDf.sparkSession
+ import spark.implicits._
+ val df_schema = chunkDf.schema
+ val index = df_schema.fieldIndex(keyColumnName)
+ val rdd_ordered = chunkDf.rdd.map(row => (row(index).asInstanceOf[Long], row)).sortByKey()
+
+ // generate global edge id for each record of dataframe
+ val partition_counts = rdd_ordered
+ .mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
+ .collectAsMap()
+ val aggregatedPartitionCounts = SortedMap(partition_counts.toSeq: _*)
+ .foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
+ (total + c, map + (i -> total))
+ }
+ ._2
+ val broadcastedPartitionCounts = spark.sparkContext.broadcast(aggregatedPartitionCounts)
+ val rdd_with_eid = rdd_ordered.mapPartitionsWithIndex((i, ps) => {
+ val start = broadcastedPartitionCounts.value(i)
+ for { ((k, row), j) <- ps.zipWithIndex } yield (start + j, row)
+ })
+ val partition_num = Math.ceil(chunkDf.count() / edgeChunkSize.toDouble).toInt
+ val partitioner = new ChunkPartitioner(partition_num, edgeChunkSize)
+ val chunks = rdd_with_eid.repartitionAndSortWithinPartitions(partitioner).values
+ spark.createDataFrame(chunks, df_schema)
+ }
+}
+
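+/** Writer that dumps an edge DataFrame as GraphAr chunks for one adjacency list type.
+ *
+ *  A minimal usage sketch (names are illustrative); the DataFrame must already
+ *  carry the source/destination index columns (e.g. produced by IndexGenerator):
+ *    val writer = new EdgeWriter(prefix, edgeInfo, AdjListType.ordered_by_source, edgeDf)
+ *    writer.writeEdges()   // writes the adj list, offset and property chunks
+ */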
+class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, edgeDf: DataFrame) {
+ private var chunks: Seq[DataFrame] = preprocess()
+
+ // convert the edge dataframe to chunk dataframes
+ private def preprocess(): Seq[DataFrame] = {
+ // check if edge info contains the adj list type
+ if (edgeInfo.containAdjList(adjListType) == false) {
+ throw new IllegalArgumentException
+ }
+
+ // check that the src index and dst index columns exist
+ val src_field = StructField(GeneralParams.srcIndexCol, LongType, false)
+ val dst_field = StructField(GeneralParams.dstIndexCol, LongType, false)
+ val schema = edgeDf.schema
+ if (schema.contains(src_field) == false || schema.contains(dst_field) == false) {
+ throw new IllegalArgumentException
+ }
+ var vertex_chunk_size: Long = 0
+ var primaryColName: String = ""
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source) {
+ vertex_chunk_size = edgeInfo.getSrc_chunk_size()
+ primaryColName = GeneralParams.srcIndexCol
+ } else {
+ vertex_chunk_size = edgeInfo.getDst_chunk_size()
+ primaryColName = GeneralParams.dstIndexCol
+ }
+ val edges_of_vertex_chunks = EdgeWriter.split(edgeDf, primaryColName, vertex_chunk_size)
+ val vertex_chunk_num = edges_of_vertex_chunks.length
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest) {
+ val processed_chunks: Seq[DataFrame] = (0 until vertex_chunk_num).map {i => EdgeWriter.sortAndRepartition(edges_of_vertex_chunks(i), primaryColName, edgeInfo.getChunk_size())}
+ return processed_chunks
+ } else {
+ val processed_chunks: Seq[DataFrame] = (0 until vertex_chunk_num).map {i => EdgeWriter.repartition(edges_of_vertex_chunks(i), primaryColName, edgeInfo.getChunk_size())}
+ return processed_chunks
+ }
+ }
+
+ // generate the offset chunk files from the edge dataframe for this edge type
+ private def writeOffset(): Unit = {
+ val file_type = edgeInfo.getAdjListFileType(adjListType)
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
+ if (adjListType == AdjListType.ordered_by_source) {
+ val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
+ FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ } else {
+ val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
+ FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ }
+ chunk_index = chunk_index + 1
+ }
+ }
+
+ // generate the chunks of AdjList from edge dataframe for this edge type
+ def writeAdjList(): Unit = {
+ val file_type = edgeInfo.getAdjListFileType(adjListType)
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getAdjListFilePath(chunk_index, adjListType)
+ val adj_list_chunk = chunk.select(GeneralParams.srcIndexCol, GeneralParams.dstIndexCol)
+ FileSystem.writeDataFrame(adj_list_chunk, FileType.FileTypeToString(file_type), output_prefix)
+ chunk_index = chunk_index + 1
+ }
+
+ if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest) {
+ writeOffset()
+ }
+ }
+
+ // generate the chunks of the property group from edge dataframe
+ def writeEdgeProperties(propertyGroup: PropertyGroup): Unit = {
+ if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) {
+ throw new IllegalArgumentException
+ }
+
+ val property_list = ArrayBuffer[String]()
+ val p_it = propertyGroup.getProperties().iterator
+ while (p_it.hasNext()) {
+ val property = p_it.next()
+ property_list += property.getName()
+ }
+ var chunk_index: Long = 0
+ for (chunk <- chunks) {
+ val output_prefix = prefix + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, chunk_index)
+ val property_group_chunk = chunk.select(property_list.map(col): _*)
+ FileSystem.writeDataFrame(property_group_chunk, propertyGroup.getFile_type(), output_prefix)
+ chunk_index = chunk_index + 1
+ }
+ }
+
+ // generate the chunks of all property groups from edge dataframe
+ def writeEdgeProperties(): Unit = {
+ val property_groups = edgeInfo.getPropertyGroups(adjListType)
+ val it = property_groups.iterator
+ while (it.hasNext()) {
+ val property_group = it.next()
+ writeEdgeProperties(property_group)
+ }
+ }
+
+ // generate the chunks for the AdjList and all property groups from edge dataframe
+ def writeEdges(): Unit = {
+ writeAdjList()
+ writeEdgeProperties()
+ }
+}
+
+
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
new file mode 100644
index 000000000..41780114c
--- /dev/null
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -0,0 +1,90 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.writer
+
+import com.alibaba.graphar.utils.{FileSystem, ChunkPartitioner, IndexGenerator}
+import com.alibaba.graphar.{GeneralParams, VertexInfo, FileType, AdjListType, PropertyGroup}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.rdd.RDD
+import org.apache.spark.HashPartitioner
+import org.apache.spark.sql.types.{LongType, StructField}
+
+import scala.collection.SortedMap
+import scala.collection.mutable.ArrayBuffer
+
+
+object VertexWriter {
+ private def repartitionAndSort(vertexDf: DataFrame, chunkSize: Long): DataFrame = {
+ val vertex_df_schema = vertexDf.schema
+ val index = vertex_df_schema.fieldIndex(GeneralParams.vertexIndexCol)
+ val partition_num = Math.ceil(vertexDf.count / chunkSize.toDouble).toInt
+ val rdd = vertexDf.rdd.map(row => (row(index).asInstanceOf[Long], row))
+
+ // repartition
+ val partitioner = new ChunkPartitioner(partition_num, chunkSize)
+ val chunks_rdd = rdd.repartitionAndSortWithinPartitions(partitioner).values
+ vertexDf.sparkSession.createDataFrame(chunks_rdd, vertex_df_schema)
+ }
+}
+
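+/** Writer that dumps a vertex DataFrame as GraphAr property chunks.
+ *
+ *  A minimal usage sketch (names are illustrative); the DataFrame must already
+ *  carry the vertex index column (e.g. added via IndexGenerator.generateVertexIndexColumn):
+ *    val writer = new VertexWriter(prefix, vertexInfo, vertexDfWithIndex)
+ *    writer.writeVertexProperties()   // one set of chunks per property group
+ */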
+class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame) {
+ private var chunks: DataFrame = preprocess()
+ private val spark = vertexDf.sparkSession
+
+ private def preprocess() : DataFrame = {
+ // check if the vertex dataframe contains the index field
+ val index_field = StructField(GeneralParams.vertexIndexCol, LongType)
+ if (vertexDf.schema.contains(index_field) == false) {
+ throw new IllegalArgumentException
+ }
+ return VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
+ }
+
+ // generate chunks of the property group for vertex dataframe
+ def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
+ // check if contains the property group
+ if (vertexInfo.containPropertyGroup(propertyGroup) == false) {
+ throw new IllegalArgumentException
+ }
+
+ // write out the chunks
+ val output_prefix = prefix + vertexInfo.getDirPath(propertyGroup)
+ val property_list = ArrayBuffer[String]()
+ val it = propertyGroup.getProperties().iterator
+ while (it.hasNext()) {
+ val property = it.next()
+ property_list += property.getName()
+ }
+ val pg_df = chunks.select(property_list.map(col): _*)
+ FileSystem.writeDataFrame(pg_df, propertyGroup.getFile_type(), output_prefix)
+ }
+
+ // generate chunks of all property groups for vertex dataframe
+ def writeVertexProperties(): Unit = {
+ val property_groups = vertexInfo.getProperty_groups()
+ val it = property_groups.iterator
+ while (it.hasNext()) {
+ val property_group = it.next()
+ writeVertexProperties(property_group)
+ }
+ }
+}
+
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
index d1e4db582..39f125d9c 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -1,17 +1,40 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
-import java.io.{File, FileInputStream}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
import org.scalatest.funsuite.AnyFunSuite
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.spark.sql.SparkSession
class GraphInfoSuite extends AnyFunSuite {
+ val spark = SparkSession.builder()
+ .enableHiveSupport()
+ .master("local[*]")
+ .getOrCreate()
test("load graph info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
- val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
- val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
+ // read graph yaml
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
+ val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
+ val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]
assert(graph_info.getName == "ldbc_sample")
assert(graph_info.getPrefix == "" )
@@ -27,7 +50,9 @@ class GraphInfoSuite extends AnyFunSuite {
}
test("load vertex info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]
@@ -51,8 +76,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group))
assert(vertex_info.getPropertyType("id") == GarType.INT64)
assert(vertex_info.isPrimaryKey("id"))
- assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
- assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+ assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/chunk0")
+ assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/chunk4")
assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")
assert(vertex_info.containProperty("firstName"))
@@ -63,8 +88,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group_2))
assert(vertex_info.getPropertyType("firstName") == GarType.STRING)
assert(vertex_info.isPrimaryKey("firstName") == false)
- assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
- assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+ assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/chunk0")
+ assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/chunk4")
assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")
assert(vertex_info.containProperty("not_exist") == false)
@@ -74,7 +99,9 @@ class GraphInfoSuite extends AnyFunSuite {
}
test("load edge info") {
- val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
+ val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
+ val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]
@@ -100,8 +127,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
- assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
- assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
@@ -124,8 +151,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/")
- assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0")
- assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk0")
+ assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/")
val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0)
assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest))
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
index a2dcea274..5f71ed0b8 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestIndexGenerator.scala
@@ -1,5 +1,22 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package com.alibaba.graphar
+import com.alibaba.graphar.utils.IndexGenerator
+
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.funsuite.AnyFunSuite
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestReader.scala b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala
new file mode 100644
index 000000000..8c0bf687a
--- /dev/null
+++ b/spark/src/test/scala/com/alibaba/graphar/TestReader.scala
@@ -0,0 +1,138 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar
+
+import com.alibaba.graphar.reader.{VertexReader, EdgeReader}
+
+import java.io.{File, FileInputStream}
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import scala.beans.BeanProperty
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.funsuite.AnyFunSuite
+
+class ReaderSuite extends AnyFunSuite {
+ val spark = SparkSession.builder()
+ .enableHiveSupport()
+ .master("local[*]")
+ .getOrCreate()
+
+ test("read vertex chunks") {
+ val file_path = "gar-test/ldbc_sample/csv"
+ val prefix = getClass.getClassLoader.getResource(file_path).getPath
+ val vertex_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person.vertex.yml")
+ val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+ val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+ val reader = new VertexReader(prefix, vertex_info, spark)
+ assert(reader.readVerticesNumber() == 903)
+ val property_group = vertex_info.getPropertyGroup("gender")
+ val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
+ assert(single_chunk_df.columns.size == 3)
+ assert(single_chunk_df.count() == 100)
+ val property_df = reader.readVertexProperties(property_group)
+ assert(property_df.columns.size == 3)
+ assert(property_df.count() == 903)
+ val vertex_df = reader.readAllVertexProperties()
+ vertex_df.show()
+ assert(vertex_df.columns.size == 4)
+ assert(vertex_df.count() == 903)
+ val vertex_df_with_index = reader.readAllVertexProperties(true)
+ vertex_df_with_index.show()
+ assert(vertex_df_with_index.columns.size == 5)
+ assert(vertex_df_with_index.count() == 903)
+
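+    // reading with an invalid (empty) property group should throw IllegalArgumentException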
+    val invalid_property_group = new PropertyGroup()
+ assertThrows[IllegalArgumentException](reader.readVertexPropertyChunk(invalid_property_group, 0))
+ assertThrows[IllegalArgumentException](reader.readVertexProperties(invalid_property_group))
+ }
+
+ test("read edge chunks") {
+ val file_path = "gar-test/ldbc_sample/csv"
+ val prefix = getClass.getClassLoader.getResource(file_path).getPath
+ val edge_input = getClass.getClassLoader.getResourceAsStream(file_path + "/person_knows_person.edge.yml")
+ val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+ val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+
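+    // construct the edge reader (ordered_by_source) and read the offset and adj list chunks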
+ val adj_list_type = AdjListType.ordered_by_source
+ val reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
+ val offset_df = reader.readOffset(0)
+ assert(offset_df.columns.size == 1)
+ assert(offset_df.count() == 101)
+ val single_adj_list_df = reader.readAdjListChunk(2, 0)
+ assert(single_adj_list_df.columns.size == 2)
+ assert(single_adj_list_df.count() == 1024)
+ val adj_list_df_chunk_2 = reader.readAdjListForVertexChunk(2)
+ assert(adj_list_df_chunk_2.columns.size == 2)
+ assert(adj_list_df_chunk_2.count() == 1077)
+ val adj_list_df = reader.readAllAdjList()
+ assert(adj_list_df.columns.size == 2)
+ assert(adj_list_df.count() == 6626)
+
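+    // read the edge property chunks of the "creationDate" property group at different granularities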
+ val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
+ val single_property_df = reader.readEdgePropertyChunk(property_group, 2, 0)
+ assert(single_property_df.columns.size == 1)
+ assert(single_property_df.count() == 1024)
+ val property_df_chunk_2 = reader.readEdgePropertiesForVertexChunk(property_group, 2)
+ assert(property_df_chunk_2.columns.size == 1)
+ assert(property_df_chunk_2.count() == 1077)
+ val property_df = reader.readEdgeProperties(property_group)
+ assert(property_df.columns.size == 1)
+ assert(property_df.count() == 6626)
+ val all_property_df_chunk_2 = reader.readAllEdgePropertiesForVertexChunk(2)
+ assert(all_property_df_chunk_2.columns.size == 1)
+ assert(all_property_df_chunk_2.count() == 1077)
+ val all_property_df = reader.readAllEdgeProperties()
+ assert(all_property_df.columns.size == 1)
+ assert(all_property_df.count() == 6626)
+
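+    // read the complete edges (adj list combined with the edge properties)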
+ val edge_df_chunk_2 = reader.readEdgesForVertexChunk(2)
+ edge_df_chunk_2.show()
+ assert(edge_df_chunk_2.columns.size == 3)
+ assert(edge_df_chunk_2.count() == 1077)
+ val edge_df_chunk_2_with_index = reader.readEdgesForVertexChunk(2, true)
+ edge_df_chunk_2_with_index.show()
+ assert(edge_df_chunk_2_with_index.columns.size == 4)
+ assert(edge_df_chunk_2_with_index.count() == 1077)
+ val edge_df = reader.readEdges()
+ edge_df.show()
+ assert(edge_df.columns.size == 3)
+ assert(edge_df.count() == 6626)
+ val edge_df_with_index = reader.readEdges(true)
+ edge_df_with_index.show()
+ assert(edge_df_with_index.columns.size == 4)
+ assert(edge_df_with_index.count() == 6626)
+
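+    // reading with an invalid (empty) property group should throw IllegalArgumentException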
+    val invalid_property_group = new PropertyGroup()
+ assertThrows[IllegalArgumentException](reader.readEdgePropertyChunk(invalid_property_group, 0, 0))
+ assertThrows[IllegalArgumentException](reader.readEdgePropertiesForVertexChunk(invalid_property_group, 0))
+ assertThrows[IllegalArgumentException](reader.readEdgeProperties(invalid_property_group))
+
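+    // an adj list type that is not contained in the edge info should also be rejected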
+ val invalid_adj_list_type = AdjListType.unordered_by_dest
+ val invalid_reader = new EdgeReader(prefix, edge_info, invalid_adj_list_type, spark)
+ assertThrows[IllegalArgumentException](invalid_reader.readOffset(0))
+ assertThrows[IllegalArgumentException](invalid_reader.readAdjListChunk(0, 0))
+ assertThrows[IllegalArgumentException](invalid_reader.readAdjListForVertexChunk(0))
+ assertThrows[IllegalArgumentException](invalid_reader.readAllAdjList())
+ assertThrows[IllegalArgumentException](invalid_reader.readEdgePropertyChunk(property_group, 0, 0))
+ assertThrows[IllegalArgumentException](invalid_reader.readEdgePropertiesForVertexChunk(property_group, 0))
+ assertThrows[IllegalArgumentException](invalid_reader.readEdgeProperties(property_group))
+ assertThrows[IllegalArgumentException](invalid_reader.readAllEdgePropertiesForVertexChunk(0))
+ assertThrows[IllegalArgumentException](invalid_reader.readAllEdgeProperties())
+ assertThrows[IllegalArgumentException](invalid_reader.readEdgesForVertexChunk(0))
+ assertThrows[IllegalArgumentException](invalid_reader.readEdges())
+ }
+}
\ No newline at end of file
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
new file mode 100644
index 000000000..f5f3f6a96
--- /dev/null
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -0,0 +1,177 @@
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar
+
+import com.alibaba.graphar.utils.IndexGenerator
+import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter}
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.funsuite.AnyFunSuite
+import org.yaml.snakeyaml.Yaml
+import org.yaml.snakeyaml.constructor.Constructor
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+class WriterSuite extends AnyFunSuite {
+ val spark = SparkSession.builder()
+ .enableHiveSupport()
+ .master("local[*]")
+ .getOrCreate()
+
+ test("test vertex writer with only vertex table") {
+ // read vertex dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+ val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+ val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+
+ // read vertex yaml
+ val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
+ val vertex_input = fs.open(vertex_yaml_path)
+ val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+ val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+ // generate vertex index column for vertex dataframe
+ val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
+
+    // create the writer object for person vertices and generate the property chunks in GAR format
+    val prefix: String = "/tmp/"
+ val writer = new VertexWriter(prefix, vertex_info, vertex_df_with_index)
+
+    // write a single property group (the one containing the "id" property)
+ val property_group = vertex_info.getPropertyGroup("id")
+ writer.writeVertexProperties(property_group)
+ val id_chunk_path = new Path(prefix + vertex_info.getDirPath(property_group) + "chunk*")
+ val id_chunk_files = fs.globStatus(id_chunk_path)
+ assert(id_chunk_files.length == 10)
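+    // write all property groups and check the total number of generated chunk files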
+ writer.writeVertexProperties()
+ val chunk_path = new Path(prefix + vertex_info.getPrefix() + "*/*")
+ val chunk_files = fs.globStatus(chunk_path)
+ assert(chunk_files.length == 20)
+
+ assertThrows[IllegalArgumentException](new VertexWriter(prefix, vertex_info, vertex_df))
+    val invalid_property_group = new PropertyGroup()
+ assertThrows[IllegalArgumentException](writer.writeVertexProperties(invalid_property_group))
+
+ // close FileSystem instance
+ fs.close()
+ }
+
+ test("test edge writer with only edge table") {
+ // read edge dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+ val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+ val prefix : String = "/tmp/"
+ val fs = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
+
+ // read edge yaml
+ val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val edge_input = fs.open(edge_yaml_path)
+ val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+ val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+ val adj_list_type = AdjListType.ordered_by_source
+
+    // generate the src and dst vertex indices for the edge dataframe
+ val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
+
+    // create the writer object for person_knows_person and generate the adj list and property chunks in GAR format
+ val writer = new EdgeWriter(prefix, edge_info, adj_list_type, edge_df_with_index)
+
+ // test write adj list
+ writer.writeAdjList()
+ val adj_list_path_pattern = new Path(prefix + edge_info.getAdjListDirPath(adj_list_type) + "*/*")
+ val adj_list_chunk_files = fs.globStatus(adj_list_path_pattern)
+ assert(adj_list_chunk_files.length == 9)
+ val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
+ val offset_chunk_files = fs.globStatus(offset_path_pattern)
+ assert(offset_chunk_files.length == 7)
+
+ // test write property group
+ val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
+ writer.writeEdgeProperties(property_group)
+ val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
+ val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
+ assert(property_group_chunk_files.length == 9)
+
+ // test write edges
+ writer.writeEdges()
+
+ val invalid_property_group = new PropertyGroup()
+
+ assertThrows[IllegalArgumentException](writer.writeEdgeProperties(invalid_property_group))
+    // throws an exception if the src/dst index columns have not been generated for the edge dataframe
+ assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.ordered_by_source, edge_df))
+    // throws an exception if the given adj list type is not contained in the edge info
+ assertThrows[IllegalArgumentException](new EdgeWriter(prefix, edge_info, AdjListType.unordered_by_dest, edge_df_with_index))
+
+ // close FileSystem instance
+ fs.close()
+ }
+
+ test("test edge writer with vertex table and edge table") {
+ // read vertex dataframe
+ val vertex_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
+ val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_file_path)
+
+ // read edge dataframe
+ val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
+ val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
+
+ val prefix : String = "/tmp/test2/"
+ val fs = FileSystem.get(new Path(prefix).toUri(), spark.sparkContext.hadoopConfiguration)
+ val adj_list_type = AdjListType.ordered_by_source
+
+ // read vertex yaml
+ val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+ val vertex_input = fs.open(vertex_yaml_path)
+ val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
+ val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
+
+ // read edge yaml
+ val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+ val edge_input = fs.open(edge_yaml_path)
+ val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
+ val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
+
+    // construct the person vertex index mapping from the vertex dataframe
+ val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey())
+    // generate the src and dst index columns for the edge dataframe with the vertex mapping
+ val edge_df_with_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
+ val edge_df_with_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_with_src_index, "dst", vertex_mapping)
+
+    // create the writer object for person_knows_person and generate the adj list and property chunks in GAR format
+ val writer = new EdgeWriter(prefix, edge_info, adj_list_type, edge_df_with_src_dst_index)
+
+ // test write adj list
+ writer.writeAdjList()
+ val adj_list_path_pattern = new Path(prefix + edge_info.getAdjListDirPath(adj_list_type) + "*/*")
+ val adj_list_chunk_files = fs.globStatus(adj_list_path_pattern)
+ assert(adj_list_chunk_files.length == 11)
+ val offset_path_pattern = new Path(prefix + edge_info.getAdjListOffsetDirPath(adj_list_type) + "*")
+ val offset_chunk_files = fs.globStatus(offset_path_pattern)
+ assert(offset_chunk_files.length == 10)
+
+ // test write property group
+ val property_group = edge_info.getPropertyGroup("creationDate", adj_list_type)
+ writer.writeEdgeProperties(property_group)
+ val property_group_path_pattern = new Path(prefix + edge_info.getPropertyDirPath(property_group, adj_list_type) + "*/*")
+ val property_group_chunk_files = fs.globStatus(property_group_path_pattern)
+ assert(property_group_chunk_files.length == 11)
+
+ writer.writeEdges()
+
+ // close FileSystem instance
+ fs.close()
+ }
+}
diff --git a/test/gar-test b/test/gar-test
index 19f1e57b9..de8bdac46 160000
--- a/test/gar-test
+++ b/test/gar-test
@@ -1 +1 @@
-Subproject commit 19f1e57b9c137ad5447667c4bf71bbc2a1e4d371
+Subproject commit de8bdac46b250dead8e71c4392976bcfe65cdcfe
diff --git a/test/test_arrow_chunk_writer.cc b/test/test_arrow_chunk_writer.cc
index d070dcec3..9e0295ff4 100644
--- a/test/test_arrow_chunk_writer.cc
+++ b/test/test_arrow_chunk_writer.cc
@@ -79,9 +79,9 @@ TEST_CASE("test_orc_and_parquet_reader") {
arrow::Status st;
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::string path1 = TEST_DATA_DIR + "/ldbc_sample/orc" +
- "/vertex/person/firstName_lastName_gender/part1/chunk0";
+ "/vertex/person/firstName_lastName_gender/chunk1";
std::string path2 = TEST_DATA_DIR + "/ldbc_sample/parquet" +
- "/vertex/person/firstName_lastName_gender/part1/chunk0";
+ "/vertex/person/firstName_lastName_gender/chunk1";
arrow::io::IOContext io_context = arrow::io::default_io_context();
// Open ORC file reader
diff --git a/test/test_chunk_info_reader.cc b/test/test_chunk_info_reader.cc
index 389e5c33e..aae200da5 100644
--- a/test/test_chunk_info_reader.cc
+++ b/test/test_chunk_info_reader.cc
@@ -47,24 +47,24 @@ TEST_CASE("test_vertex_property_chunk_info_reader") {
REQUIRE(maybe_chunk_path.status().ok());
std::string chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part0/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk0");
REQUIRE(reader.seek(520).ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part5/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk5");
REQUIRE(reader.next_chunk().ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part6/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk6");
REQUIRE(reader.seek(900).ok());
maybe_chunk_path = reader.GetChunk();
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
- TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part9/chunk0");
+ TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk9");
// now is end of the chunks
REQUIRE(reader.next_chunk().IsOutOfRange());
diff --git a/test/test_info.cc b/test/test_info.cc
index f47e6a064..fa91d6c0a 100644
--- a/test/test_info.cc
+++ b/test/test_info.cc
@@ -130,7 +130,7 @@ TEST_CASE("test_vertex_info") {
// test get file path
auto maybe_path = v_info.GetFilePath(pg, 0);
REQUIRE(!maybe_path.has_error());
- REQUIRE(maybe_path.value() == expected_dir_path + "part0/chunk0");
+ REQUIRE(maybe_path.value() == expected_dir_path + "chunk0");
// property group not exist
REQUIRE(v_info.GetFilePath(pg2, 0).status().IsKeyError());
@@ -194,7 +194,7 @@ TEST_CASE("test_edge_info") {
edge_info.GetAdjListOffsetFilePath(0, adj_list_type);
REQUIRE(!adj_list_offset_file_path.has_error());
REQUIRE(adj_list_offset_file_path.value() ==
- edge_info.GetPrefix() + adj_prefix + "offset/part0/chunk0");
+ edge_info.GetPrefix() + adj_prefix + "offset/chunk0");
auto adj_list_offset_dir_path =
edge_info.GetAdjListOffsetDirPath(adj_list_type);
REQUIRE(!adj_list_offset_dir_path.has_error());