
Commit: Update
acezen committed Dec 26, 2022
1 parent 0e0a345 commit 9ce48e1
Showing 9 changed files with 43 additions and 31 deletions.
4 changes: 2 additions & 2 deletions examples/construct_info_example.cc
@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
assert(vertex_info.GetPropertyType(id.name).value() == id.type);
assert(vertex_info.GetFilePath(group1, 0).value() ==
"vertex/person/id/part0/chunk0");
"vertex/person/id/chunk0");

// extend property groups & validate
auto result = vertex_info.Extend(group2);
@@ -135,7 +135,7 @@ int main(int argc, char* argv[]) {
.GetAdjListOffsetFilePath(
0, GAR_NAMESPACE::AdjListType::ordered_by_dest)
.value() ==
"edge/person_knows_person/ordered_by_dest/offset/part0/chunk0");
"edge/person_knows_person/ordered_by_dest/offset/chunk0");

// add property group & validate
GAR_NAMESPACE::Property creationDate = {
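The updated assertions reflect the commit's central change: vertex property chunk files no longer live in a per-chunk "part{i}/" subdirectory but sit directly under the property-group directory as "chunk{i}". A minimal Scala sketch of the two layouts, with hypothetical helper names used only for illustration:

object ChunkPathLayout {
  // Old layout: <group dir>/part<chunk index>/chunk0
  def oldChunkPath(groupDir: String, chunkIndex: Long): String =
    s"${groupDir}part${chunkIndex}/chunk0"

  // New layout: <group dir>/chunk<chunk index>
  def newChunkPath(groupDir: String, chunkIndex: Long): String =
    s"${groupDir}chunk${chunkIndex}"

  def main(args: Array[String]): Unit = {
    // Matches the updated assertion above: "vertex/person/id/chunk0"
    println(newChunkPath("vertex/person/id/", 0))
    println(oldChunkPath("vertex/person/id/", 0)) // the pre-commit form
  }
}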
spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -22,6 +22,7 @@ object FileSystem {
val spark = dataFrame.sparkSession
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.conf.set("parquet.enable.summary-metadata", "false")
spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
renameSparkGeneratedFiles(spark, outputPrefix)
}
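The added configuration line makes every Parquet file written through FileSystem.writeDataFrame use zstd compression for the whole Spark session. A small standalone sketch of the same effect, assuming a local Spark session and illustrative output paths; the per-write "compression" option shown second is an alternative that scopes the codec to a single write:

import org.apache.spark.sql.SparkSession

object ZstdCompressionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("zstd-sketch").getOrCreate()
    val df = spark.range(0, 1000).toDF("id")

    // Session-wide default codec, as set above in FileSystem.writeDataFrame
    spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
    df.write.mode("overwrite").parquet("/tmp/zstd_session_default")

    // Per-write alternative via the Parquet data source option
    df.write.mode("overwrite").option("compression", "zstd").parquet("/tmp/zstd_per_write")

    spark.stop()
  }
}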
17 changes: 11 additions & 6 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -4,14 +4,12 @@ import com.alibaba.graphar.utils.{FileSystem, VertexChunkPartitioner}
import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}

import org.apache.spark.sql.SparkSession
-import org.apache.spark.rdd.OrderedRDDFunctions
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType, IntegerType}
+import org.apache.spark.sql.types.{LongType, StructField}
import org.apache.spark.util.Utils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
-import org.apache.spark.rdd.RDD.rddToOrderedRDDFunctions

import scala.collection.SortedMap
import scala.collection.mutable.ArrayBuffer
@@ -90,6 +88,13 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
private var chunks: Seq[DataFrame] = preprocess()

private def preprocess(): Seq[DataFrame] = {
+    // check that the src index and dst index columns exist
+    val src_filed = StructField(GeneralParams.srcIndexCol, LongType)
+    val dst_filed = StructField(GeneralParams.dstIndexCol, LongType)
+    val schema = edgeDf.schema
+    if (schema.contains(src_filed) == false || schema.contains(dst_filed) == false) {
+      throw new IllegalArgumentException
+    }
var vertex_chunk_size: Long = 0
var primaryColName: String = ""
if (adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.unordered_by_source) {
@@ -114,12 +119,12 @@ class EdgeWriter(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
val file_type = edgeInfo.getAdjListFileType(adjListType)
var chunk_index: Long = 0
for (chunk <- chunks) {
-      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType) + "part" + chunk_index.toString + "/"
+      val output_prefix = prefix + edgeInfo.getAdjListOffsetDirPath(adjListType)
if (adjListType == AdjListType.ordered_by_source) {
-        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol)
+        val offset_chunk = chunk.select(GeneralParams.srcIndexCol).groupBy(GeneralParams.srcIndexCol).count().coalesce(1).orderBy(GeneralParams.srcIndexCol).select("count")
FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
} else {
-        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol)
+        val offset_chunk = chunk.select(GeneralParams.dstIndexCol).groupBy(GeneralParams.dstIndexCol).count().coalesce(1).orderBy(GeneralParams.dstIndexCol).select("count")
FileSystem.writeDataFrame(offset_chunk, FileType.FileTypeToString(file_type), output_prefix)
}
chunk_index = chunk_index + 1
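Two behavioural changes are visible in this hunk: offset chunks are written directly under the offset directory (no "part{i}/" suffix on the output prefix), and the written DataFrame now keeps only the "count" column instead of the (index, count) pair. A self-contained sketch of that aggregation, using placeholder column names in place of GeneralParams.srcIndexCol and dstIndexCol:

import org.apache.spark.sql.SparkSession

object OffsetChunkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("offset-sketch").getOrCreate()
    import spark.implicits._

    // A toy edge chunk: (source index, destination index) pairs;
    // the column names are placeholders, not the GraphAr constants
    val chunk = Seq((0L, 1L), (0L, 2L), (1L, 2L), (3L, 0L)).toDF("src_index", "dst_index")

    // Same shape as the writer's offset loop above: count edges per source index,
    // order by the index, and keep only the count column for the offset chunk
    val offsetChunk = chunk
      .select("src_index")
      .groupBy("src_index")
      .count()
      .coalesce(1)
      .orderBy("src_index")
      .select("count")

    offsetChunk.show()
    spark.stop()
  }
}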
spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -10,6 +10,7 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.HashPartitioner
+import org.apache.spark.sql.types.{LongType, StructField}

import scala.collection.SortedMap
import scala.collection.mutable.ArrayBuffer
@@ -34,6 +35,11 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame)
private val spark = vertexDf.sparkSession

def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
+    // check that the vertex DataFrame contains the index field
+    val index_filed = StructField(GeneralParams.vertexIndexCol, LongType)
+    if (vertexDf.schema.contains(index_filed) == false) {
+      throw new IllegalArgumentException
+    }
if (chunks.isEmpty) {
chunks = VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
}
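Like EdgeWriter, VertexWriter now refuses to write when the required index column is missing from the input DataFrame. A minimal sketch of the same check, with a placeholder column name standing in for GeneralParams.vertexIndexCol:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField}

object IndexColumnCheckSketch {
  // Placeholder for GeneralParams.vertexIndexCol; the real constant is defined in GraphAr
  val vertexIndexCol = "vertex_index"

  def requireIndexColumn(df: DataFrame): Unit = {
    val indexField = StructField(vertexIndexCol, LongType)
    // StructType is a Seq[StructField], so contains() compares name, type and nullability
    if (!df.schema.contains(indexField)) {
      throw new IllegalArgumentException(s"missing required column: $vertexIndexCol")
    }
  }
}

Because contains() compares whole StructFields, a column with the right name but a different type or nullability also fails the check.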
16 changes: 8 additions & 8 deletions spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -61,8 +61,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group))
assert(vertex_info.getPropertyType("id") == GarType.INT64)
assert(vertex_info.isPrimaryKey("id"))
-    assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/part0/chunk0")
-    assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/part4/chunk0")
+    assert(vertex_info.getFilePath(property_group, 0) == "vertex/person/id/chunk0")
+    assert(vertex_info.getFilePath(property_group, 4) == "vertex/person/id/chunk4")
assert(vertex_info.getDirPath(property_group) == "vertex/person/id/")

assert(vertex_info.containProperty("firstName"))
@@ -73,8 +73,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(vertex_info.containPropertyGroup(property_group_2))
assert(vertex_info.getPropertyType("firstName") == GarType.STRING)
assert(vertex_info.isPrimaryKey("firstName") == false)
-    assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/part0/chunk0")
-    assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/part4/chunk0")
+    assert(vertex_info.getFilePath(property_group_2, 0) == "vertex/person/firstName_lastName_gender/chunk0")
+    assert(vertex_info.getFilePath(property_group_2, 4) == "vertex/person/firstName_lastName_gender/chunk4")
assert(vertex_info.getDirPath(property_group_2) == "vertex/person/firstName_lastName_gender/")

assert(vertex_info.containProperty("not_exist") == false)
@@ -112,8 +112,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/adj_list/")
-    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part0/chunk0")
-    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_source) == "edge/person_knows_person/ordered_by_source/offset/")
val property_group = edge_info.getPropertyGroups(AdjListType.ordered_by_source).get(0)
assert(edge_info.containPropertyGroup(property_group, AdjListType.ordered_by_source))
@@ -136,8 +136,8 @@ class GraphInfoSuite extends AnyFunSuite {
assert(edge_info.getAdjListFilePath(1, 2, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/chunk2")
assert(edge_info.getAdjListFilePath(1, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/part1/")
assert(edge_info.getAdjListDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/adj_list/")
-    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part0/chunk0")
-    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/part4/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(0, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk0")
+    assert(edge_info.getAdjListOffsetFilePath(4, AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/chunk4")
assert(edge_info.getAdjListOffsetDirPath(AdjListType.ordered_by_dest) == "edge/person_knows_person/ordered_by_dest/offset/")
val property_group_2 = edge_info.getPropertyGroups(AdjListType.ordered_by_dest).get(0)
assert(edge_info.containPropertyGroup(property_group_2, AdjListType.ordered_by_dest))
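Note the asymmetry these assertions encode: adjacency-list data chunks keep the two-level part{vertex chunk}/chunk{edge chunk} layout, while offset chunks collapse to a single chunk{vertex chunk} file directly under the offset directory. A small sketch of both conventions, with illustrative helper names:

object EdgePathLayoutSketch {
  // Adjacency-list data: one file per (vertex chunk, edge chunk) pair
  def adjListChunkPath(adjListDir: String, vertexChunk: Long, edgeChunk: Long): String =
    s"${adjListDir}part${vertexChunk}/chunk${edgeChunk}"

  // Offsets: one file per vertex chunk, directly under the offset directory
  def offsetChunkPath(offsetDir: String, vertexChunk: Long): String =
    s"${offsetDir}chunk${vertexChunk}"

  def main(args: Array[String]): Unit = {
    println(adjListChunkPath("edge/person_knows_person/ordered_by_source/adj_list/", 1, 2))
    println(offsetChunkPath("edge/person_knows_person/ordered_by_source/offset/", 4))
  }
}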
14 changes: 7 additions & 7 deletions spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -21,14 +21,14 @@ class WriterSuite extends AnyFunSuite {
val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)

// read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

// read vertex yaml
-    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
val vertex_input = fs.open(vertex_yaml_path)
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]
@@ -50,14 +50,14 @@ class WriterSuite extends AnyFunSuite {
val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)

// read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

// read edge yaml
-    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person_knows_person.edge.yml").getPath)
val edge_input = fs.open(edge_yaml_path)
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
@@ -84,20 +84,20 @@ class WriterSuite extends AnyFunSuite {


// read graph yaml
-    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
+    val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

// read vertex yaml
-    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
+    val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person.vertex.yml").getPath)
val vertex_input = fs.open(vertex_yaml_path)
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]

// read edge yaml
-    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
+    val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/parquet/person_knows_person.edge.yml").getPath)
val edge_input = fs.open(edge_yaml_path)
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]
4 changes: 2 additions & 2 deletions test/test_arrow_chunk_writer.cc
@@ -79,9 +79,9 @@ TEST_CASE("test_orc_and_parquet_reader") {
arrow::Status st;
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::string path1 = TEST_DATA_DIR + "/ldbc_sample/orc" +
"/vertex/person/firstName_lastName_gender/part1/chunk0";
"/vertex/person/firstName_lastName_gender/chunk1";
std::string path2 = TEST_DATA_DIR + "/ldbc_sample/parquet" +
"/vertex/person/firstName_lastName_gender/part1/chunk0";
"/vertex/person/firstName_lastName_gender/chunk1";
arrow::io::IOContext io_context = arrow::io::default_io_context();

// Open ORC file reader
8 changes: 4 additions & 4 deletions test/test_chunk_info_reader.cc
@@ -47,24 +47,24 @@ TEST_CASE("test_vertex_property_chunk_info_reader") {
REQUIRE(maybe_chunk_path.status().ok());
std::string chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part0/chunk0");
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk0");
REQUIRE(reader.seek(520).ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part5/chunk0");
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk5");
REQUIRE(reader.next_chunk().ok());
maybe_chunk_path = reader.GetChunk();
REQUIRE(maybe_chunk_path.status().ok());
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part6/chunk0");
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk6");
REQUIRE(reader.seek(900).ok());
maybe_chunk_path = reader.GetChunk();
chunk_path = maybe_chunk_path.value();
REQUIRE(chunk_path ==
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/part9/chunk0");
TEST_DATA_DIR + "/ldbc_sample/parquet/vertex/person/id/chunk9");
// now is end of the chunks
REQUIRE(reader.next_chunk().IsOutOfRange());

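The seek targets above resolve as chunk index = vertex id / chunk size, so with the sample's vertex chunk size (assumed here to be 100) ids 520 and 900 land in chunk5 and chunk9 under the new flat layout. A tiny sketch of that arithmetic:

object ChunkIndexSketch {
  def chunkIndex(vertexId: Long, chunkSize: Long): Long = vertexId / chunkSize

  def main(args: Array[String]): Unit = {
    val chunkSize = 100L                  // assumed chunk size of the ldbc_sample person vertices
    println(chunkIndex(520L, chunkSize))  // 5 -> ".../vertex/person/id/chunk5"
    println(chunkIndex(900L, chunkSize))  // 9 -> ".../vertex/person/id/chunk9"
  }
}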
4 changes: 2 additions & 2 deletions test/test_info.cc
@@ -130,7 +130,7 @@ TEST_CASE("test_vertex_info") {
// test get file path
auto maybe_path = v_info.GetFilePath(pg, 0);
REQUIRE(!maybe_path.has_error());
-  REQUIRE(maybe_path.value() == expected_dir_path + "part0/chunk0");
+  REQUIRE(maybe_path.value() == expected_dir_path + "chunk0");
// property group not exist
REQUIRE(v_info.GetFilePath(pg2, 0).status().IsKeyError());

@@ -194,7 +194,7 @@ TEST_CASE("test_edge_info") {
edge_info.GetAdjListOffsetFilePath(0, adj_list_type);
REQUIRE(!adj_list_offset_file_path.has_error());
REQUIRE(adj_list_offset_file_path.value() ==
-          edge_info.GetPrefix() + adj_prefix + "offset/part0/chunk0");
+          edge_info.GetPrefix() + adj_prefix + "offset/chunk0");
auto adj_list_offset_dir_path =
edge_info.GetAdjListOffsetDirPath(adj_list_type);
REQUIRE(!adj_list_offset_dir_path.has_error());
