diff --git a/examples/construct_info_example.cc b/examples/construct_info_example.cc
index b3d3a880b..e8b05993d 100644
--- a/examples/construct_info_example.cc
+++ b/examples/construct_info_example.cc
@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
   assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
   assert(vertex_info.GetPropertyType(id.name).value() == id.type);
   assert(vertex_info.GetFilePath(group1, 0).value() ==
-         "vertex/person/id/part0/chunk0");
+         "vertex/person/id/chunk0");
 
   // extend property groups & validate
   auto result = vertex_info.Extend(group2);
@@ -135,7 +135,7 @@ int main(int argc, char* argv[]) {
              .GetAdjListOffsetFilePath(
                  0, GAR_NAMESPACE::AdjListType::ordered_by_dest)
              .value() ==
-         "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0");
+         "edge/person_knows_person/ordered_by_dest/offset/chunk0");
 
   // add property group & validate
   GAR_NAMESPACE::Property creationDate = {
diff --git a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
index 2edcb0faf..b34c061e0 100644
--- a/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -22,6 +22,7 @@ object FileSystem {
     val spark = dataFrame.sparkSession
     spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
     spark.conf.set("parquet.enable.summary-metadata", "false")
+    spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
     dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
     renameSparkGeneratedFiles(spark, outputPrefix)
   }
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
index 1e5b0d70f..9c7fa7dbc 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -4,14 +4,12 @@ import com.alibaba.graphar.utils.{FileSystem, VertexChunkPartitioner}
 import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.rdd.OrderedRDDFunctions
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, IntegerType}
+import org.apache.spark.sql.types.{LongType, StructField}
 import org.apache.spark.util.Utils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions._
-import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions
 
 import scala.collection.SortedMap
 import scala.collection.mutable.ArrayBuffer
@@ -90,6 +88,13 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
   private var chunks: Seq[DataFrame] = preprocess()
 
   private def preprocess(): Seq[DataFrame] = {
+    // check the src index and dst index column exist
+    val src_filed = StructField(GeneralParams.srcIndexCol, LongType)
+    val dst_filed = StructField(GeneralParams.dstIndexCol, LongType)
+    val schema = edgeDf.schema()
+    if (schema.contains(src_filed) == false || schema.contains(dst_filed) == false) {
+      throw new IllegalArgumentException
+    }
     var vertex_chunk_size: Long = 0
     var primaryColName: String = ""
     if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source) {
@@ -114,12 +119,12 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
     val file_type = edgeInfo.getAdjListFileType(adjListType)
     var chunk_index: Long = 0
     for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
+      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
       if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol)
+        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
         FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
       } else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol)
+        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
         FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
       }
       chunk_index = chunk_index + 1
diff --git a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
index 6d9dc4852..fdaea4d16 100644
--- a/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
+++ b/spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -10,6 +10,7 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.Row
 import org.apache.spark.rdd.RDD
 import org.apache.spark.HashPartitioner
+import org.apache.spark.sql.types.{LongType, StructField}
 
 import scala.collection.SortedMap
 import scala.collection.mutable.ArrayBuffer
@@ -34,6 +35,11 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame)
   private val spark = vertexDf.sparkSession
 
   def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
+    // check if vertex dataframe contains the index_filed
+    val index_filed = StructField(GeneralParams.vertexIndexCol, LongType)
+    if (vertexDf.schema.contains(index_filed) == false) {
+      throw new IllegalArgumentException
+    }
     if (chunks.isEmpty) {
       chunks = VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
     }
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
index fd2352a51..40a18f545 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -61,8 +61,8 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(vertex_info.containPropertyGroup(property_group))
     assert(vertex_info.getPropertyType("id") == GarType.INT64)
     assert(vertex_info.isPrimaryKey("id"))
-    assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
-    assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+    assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/chunk0")
+    assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/chunk4")
     assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")
 
     assert(vertex_info.containProperty("firstName"))
@@ -73,8 +73,8 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(vertex_info.containPropertyGroup(property_group_2))
     assert(vertex_info.getPropertyType("firstName") == GarType.STRING)
     assert(vertex_info.isPrimaryKey("firstName") == false)
-    assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
-    assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+    assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/chunk0")
+    assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/chunk4")
     assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")
 
     assert(vertex_info.containProperty("not_exist") == false)
@@ -112,8 +112,8 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
     assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/")
     assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
-    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
-    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk4")
     assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
     val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
     assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
@@ -136,8 +136,8 @@ class GraphInfoSuite extends AnyFunSuite {
     assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
     assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/")
     assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/")
-    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0")
-    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk4")
     assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/")
     val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0)
     assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest))
diff --git a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
index bd3fd525c..9727e6896 100644
--- a/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
+++ b/spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -21,14 +21,14 @@ class WriterSuite extends AnyFunSuite {
     val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
 
     // read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
     val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
     val graph_input = fs.open(graph_yaml_path)
     val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
     val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
 
     // read vertex yaml
-    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
     val vertex_input = fs.open(vertex_yaml_path)
     val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
     val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
@@ -50,14 +50,14 @@ class WriterSuite extends AnyFunSuite {
     val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)
 
     // read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
     val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
     val graph_input = fs.open(graph_yaml_path)
     val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
     val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
 
     // read edge yaml
-    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person_knows_person.edge.yml").getPath)
     val edge_input = fs.open(edge_yaml_path)
     val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
     val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
@@ -84,20 +84,20 @@ class WriterSuite extends AnyFunSuite {
 
     // read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
     val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
     val graph_input = fs.open(graph_yaml_path)
     val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
     val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]
 
     // read vertex yaml
-    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
     val vertex_input = fs.open(vertex_yaml_path)
     val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
     val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
 
     // read edge yaml
-    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person_knows_person.edge.yml").getPath)
     val edge_input = fs.open(edge_yaml_path)
     val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
     val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
diff --git a/test/test_arrow_chunk_writer.cc b/test/test_arrow_chunk_writer.cc
index d070dcec3..9e0295ff4 100644
--- a/test/test_arrow_chunk_writer.cc
+++ b/test/test_arrow_chunk_writer.cc
@@ -79,9 +79,9 @@ TEST_CASE("test_orc_and_parquet_reader") {
   arrow::Status st;
   arrow::MemoryPool* pool = arrow::default_memory_pool();
   std::string path1 = TEST_DATA_DIR + "/ldbc_sample/orc" +
-                      "/vertex/person/firstName_lastName_gender/part1/chunk0";
+                      "/vertex/person/firstName_lastName_gender/chunk1";
   std::string path2 = TEST_DATA_DIR + "/ldbc_sample/parquet" +
-                      "/vertex/person/firstName_lastName_gender/part1/chunk0";
+                      "/vertex/person/firstName_lastName_gender/chunk1";
   arrow::io::IOContext io_context = arrow::io::default_io_context();
 
   // Open ORC file reader
diff --git a/test/test_chunk_info_reader.cc b/test/test_chunk_info_reader.cc
index 389e5c33e..aae200da5 100644
--- a/test/test_chunk_info_reader.cc
+++ b/test/test_chunk_info_reader.cc
@@ -47,24 +47,24 @@ TEST_CASE("test_vertex_property_chunk_info_reader") {
   REQUIRE(maybe_chunk_path.status().ok());
   std::string chunk_path = maybe_chunk_path.value();
   REQUIRE(chunk_path ==
-          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part0/chunk0");
+          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk0");
   REQUIRE(reader.seek(520).ok());
   maybe_chunk_path = reader.GetChunk();
   REQUIRE(maybe_chunk_path.status().ok());
   chunk_path = maybe_chunk_path.value();
   REQUIRE(chunk_path ==
-          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part5/chunk0");
+          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk5");
   REQUIRE(reader.next_chunk().ok());
   maybe_chunk_path = reader.GetChunk();
   REQUIRE(maybe_chunk_path.status().ok());
   chunk_path = maybe_chunk_path.value();
   REQUIRE(chunk_path ==
-          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part6/chunk0");
+          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk6");
   REQUIRE(reader.seek(900).ok());
   maybe_chunk_path = reader.GetChunk();
   chunk_path = maybe_chunk_path.value();
   REQUIRE(chunk_path ==
-          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part9/chunk0");
+          TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk9");
 
   // now is end of the chunks
   REQUIRE(reader.next_chunk().IsOutOfRange());
diff --git a/test/test_info.cc b/test/test_info.cc
index f47e6a064..fa91d6c0a 100644
--- a/test/test_info.cc
+++ b/test/test_info.cc
@@ -130,7 +130,7 @@ TEST_CASE("test_vertex_info") {
   // test get file path
   auto maybe_path = v_info.GetFilePath(pg, 0);
   REQUIRE(!maybe_path.has_error());
-  REQUIRE(maybe_path.value() == expected_dir_path + "part0/chunk0");
+  REQUIRE(maybe_path.value() == expected_dir_path + "chunk0");
 
   // property group not exist
   REQUIRE(v_info.GetFilePath(pg2, 0).status().IsKeyError());
@@ -194,7 +194,7 @@ TEST_CASE("test_edge_info") {
       edge_info.GetAdjListOffsetFilePath(0, adj_list_type);
   REQUIRE(!adj_list_offset_file_path.has_error());
   REQUIRE(adj_list_offset_file_path.value() ==
-          edge_info.GetPrefix() + adj_prefix + "offset/part0/chunk0");
+          edge_info.GetPrefix() + adj_prefix + "offset/chunk0");
   auto adj_list_offset_dir_path =
       edge_info.GetAdjListOffsetDirPath(adj_list_type);
   REQUIRE(!adj_list_offset_dir_path.has_error());