Commit

Update
acezen committed Dec 23, 2022
1 parent 6e42cfe commit dbda188
Showing 6 changed files with 169 additions and 19 deletions.
72 changes: 71 additions & 1 deletion spark/pom.xml
@@ -15,10 +15,11 @@
<scala.binary.version>2.12</scala.binary.version>
<PermGen>512m</PermGen>
<MaxPermGen>1024m</MaxPermGen>
<spark.version>3.1.1</spark.version>
<spark.version>3.2.0</spark.version>
<maven.compiler.release>8</maven.compiler.release>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<cupid.sdk.version>3.3.14-SNAPSHOT</cupid.sdk.version>
</properties>
<dependencies>
<dependency>
@@ -68,6 +69,34 @@
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
<version>${cupid.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
@@ -119,6 +148,47 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<artifactSet>
<includes>
<!-- Include here the dependencies you
want to be packed in your fat jar -->
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>**/log4j.properties</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<packaging>jar</packaging>
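The dependency block above pulls in the Aliyun OSS Hadoop connector (hadoop-fs-oss), the ODPS Spark datasource and the Cupid SDK, and the shade plugin packs the runtime dependencies into a fat jar (attached with the shaded classifier, with jar signatures and bundled log4j.properties stripped). The sketch below is illustrative only: the bucket, endpoint and fs.oss.* property names are assumptions that depend on the connector version, not something this commit defines.

import org.apache.spark.sql.SparkSession

// Hedged sketch, not part of this commit: with hadoop-fs-oss on the classpath,
// an oss:// prefix can be handed to the GraphAr writers like any other
// Hadoop-supported path. The keys and values below are placeholders.
val spark = SparkSession.builder()
  .appName("graphar-oss-sketch")
  .master("local[*]")
  .getOrCreate()

val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com")          // assumed key name
hadoopConf.set("fs.oss.accessKeyId", sys.env("OSS_ACCESS_KEY_ID"))         // assumed key name
hadoopConf.set("fs.oss.accessKeySecret", sys.env("OSS_ACCESS_KEY_SECRET")) // assumed key name

// e.g. new VertexWriter("oss://my-bucket/graphar/", vertexInfo, vertexDfWithIndex)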
spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -1,13 +1,14 @@
package com.alibaba.graphar.utils

import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs

object FileSystem {
private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
val sc = spark.sparkContext
val file_system = fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
val path_pattern = new fs.Path(filePrefix + "part*")
val files = file_system.globStatus(path_pattern)
for (i <- 0 until files.length) {
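The change above swaps FileSystem.get(conf) for FileSystem.get(new URI(filePrefix), conf), so the rename step resolves the filesystem from the prefix's scheme instead of always using the cluster default (fs.defaultFS). A minimal sketch of the difference, with hypothetical prefixes and assuming the matching connectors are on the classpath:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
// Resolved from the prefix's scheme:
val localFs = FileSystem.get(new URI("file:///tmp/graphar/"), conf)
val ossFs   = FileSystem.get(new URI("oss://my-bucket/graphar/"), conf) // hypothetical bucket
// FileSystem.get(conf) alone always returns the default filesystem, which
// breaks the rename step when the write prefix lives on a different store.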
spark/src/main/scala/com/alibaba/graphar/utils/IndexGenerator.scala
@@ -80,11 +80,11 @@ object IndexGenerator {
def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
dstIndexMapping.createOrReplaceTempView("dst_vertex")
edgeDf.createOrReplaceTempView("edge")
edgeDf.createOrReplaceTempView("edges")
val dstCol = GeneralParams.dstIndexCol;
val indexCol = GeneralParams.vertexIndexCol;
val dstPrimaryKey = GeneralParams.primaryCol;
val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
// drop the old dst id col
trans_df.drop(dstColumnName)
}
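The temp view is renamed from edge to edges and the SQL updated to match. For reference, a rough DataFrame-API sketch of the same dst-index join (not the library's implementation; it assumes the mapping DataFrame carries the primary-key column plus the index column):

import org.apache.spark.sql.DataFrame

// Join the edge table with the dst vertex index mapping and expose the index
// column under the dst-index name, then drop the original dst id column.
def addDstIndexSketch(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
  edgeDf
    .join(dstIndexMapping, edgeDf(dstColumnName) === dstIndexMapping(GeneralParams.primaryCol))
    .withColumnRenamed(GeneralParams.vertexIndexCol, GeneralParams.dstIndexCol)
    .drop(dstIndexMapping(GeneralParams.primaryCol))
    .drop(dstColumnName)
}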
spark/src/main/scala/com/alibaba/graphar/writer/VertexWriter.scala
@@ -35,8 +35,7 @@ class VertexWriter(prefix: String, vertexInfo: VertexInfo, vertexDf: DataFrame)

def writeVertexProperties(propertyGroup: PropertyGroup): Unit = {
if (chunks.isEmpty) {
val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertexDf)
chunks = VertexWriter.repartitionAndSort(vertex_df_with_index, vertexInfo.getChunk_size())
chunks = VertexWriter.repartitionAndSort(vertexDf, vertexInfo.getChunk_size())
}

// write out the chunks
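With this change VertexWriter no longer generates the index column itself; the caller supplies a DataFrame that already carries it, as the updated TestWriter suite further below does. Caller-side sketch (prefix, vertexInfo and vertexDf as used elsewhere in this commit):

// Generate the vertex index column first, then hand the indexed DataFrame to the writer.
val vertexDfWithIndex = IndexGenerator.generateVertexIndexColumn(vertexDf)
val writer = new VertexWriter(prefix, vertexInfo, vertexDfWithIndex)
writer.writeVertexProperties()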
24 changes: 18 additions & 6 deletions spark/src/test/scala/com/alibaba/graphar/TestGraphInfo.scala
@@ -1,17 +1,25 @@
package com.alibaba.graphar

import java.io.{File, FileInputStream}
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import scala.beans.BeanProperty
import org.scalatest.funsuite.AnyFunSuite
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.SparkSession

class GraphInfoSuite extends AnyFunSuite {
val spark = SparkSession.builder()
.enableHiveSupport()
.master("local[*]")
.getOrCreate()

test("load graph info") {
val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
val yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = yaml.load(input).asInstanceOf[GraphInfo]
// read graph yaml
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(input).asInstanceOf[GraphInfo]

assert(graph_info.getName == "ldbc_sample")
assert(graph_info.getPrefix == "" )
@@ -27,7 +35,9 @@ class GraphInfoSuite extends AnyFunSuite {
}

test("load vertex info") {
val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = yaml.load(input).asInstanceOf[VertexInfo]

@@ -74,7 +84,9 @@ class GraphInfoSuite extends AnyFunSuite {
}

test("load edge info") {
val input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
val yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
val fs = FileSystem.get(yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = fs.open(yaml_path)
val yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = yaml.load(input).asInstanceOf[EdgeInfo]

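The tests now open the info YAMLs through Hadoop's FileSystem, resolved from the path's URI, instead of classloader streams, so the same loading code works for local, HDFS or OSS locations. A small hypothetical helper capturing that pattern (not part of the library):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor

// Load an info YAML from any Hadoop-supported path and bind it to the given
// bean class via SnakeYAML.
def loadInfo[T](pathStr: String, clazz: Class[T], spark: SparkSession): T = {
  val path = new Path(pathStr)
  val fs = FileSystem.get(path.toUri(), spark.sparkContext.hadoopConfiguration)
  val input = fs.open(path)
  try new Yaml(new Constructor(clazz)).load(input).asInstanceOf[T]
  finally input.close()
}

// e.g. val vertexInfo = loadInfo(vertexYamlPath, classOf[VertexInfo], spark)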
82 changes: 75 additions & 7 deletions spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -7,44 +7,112 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.funsuite.AnyFunSuite
import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.Constructor
import org.apache.hadoop.fs.{Path, FileSystem}

class WriterSuite extends AnyFunSuite {
val spark = SparkSession.builder()
.enableHiveSupport()
.master("local[*]")
.getOrCreate()

test("test vertex writer") {
test("test vertex writer with only vertex table") {
// read vertex dataframe
val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)

val graph_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
// read graph yaml
val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

val vertex_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person.vertex.yml")
// read vertex yaml
val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
val vertex_input = fs.open(vertex_yaml_path)
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]

val writer = new VertexWriter(graph_info.getPrefix(), vertex_info, vertex_df)
// generate vertex index column for vertex dataframe
val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)

// create the writer for person vertices and write the property chunks in GAR format
val writer = new VertexWriter(graph_info.getPrefix(), vertex_info, vertex_df_with_index)
writer.writeVertexProperties()

// close FileSystem instance
fs.close()
}

test("test edge writer") {
test("test edge writer with only edge table") {
// read edge dataframe
val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)

val graph_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml")
// read graph yaml
val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

val edge_input = getClass.getClassLoader.getResourceAsStream("gar-test/ldbc_sample/csv/person_knows_person.edge.yml")
// read edge yaml
val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
val edge_input = fs.open(edge_yaml_path)
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]

// generate src and dst vertex indices for the edge dataframe
val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")

// create the writer for person_knows_person edges and write the adjacency list and property chunks in GAR format
val writer = new EdgeWriter(graph_info.getPrefix(), edge_info, AdjListType.ordered_by_source, edge_df_with_index)
writer.writeEdges()

// close FileSystem instance
fs.close()
}

test("test edge writer with vertex table and edge table") {
// read vertex dataframe
val vertex_file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_0_0.csv").getPath
val vertex_df = spark.read.option("delimiter", "|").option("header", "true").csv(vertex_file_path)

// read edge dataframe
val file_path = getClass.getClassLoader.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv").getPath
val edge_df = spark.read.option("delimiter", "|").option("header", "true").csv(file_path)


// read graph yaml
val graph_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/ldbc_sample.graph.yml").getPath)
val fs = FileSystem.get(graph_yaml_path.toUri(), spark.sparkContext.hadoopConfiguration)
val graph_input = fs.open(graph_yaml_path)
val graph_yaml = new Yaml(new Constructor(classOf[GraphInfo]))
val graph_info = graph_yaml.load(graph_input).asInstanceOf[GraphInfo]

// read vertex yaml
val vertex_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person.vertex.yml").getPath)
val vertex_input = fs.open(vertex_yaml_path)
val vertex_yaml = new Yaml(new Constructor(classOf[VertexInfo]))
val vertex_info = vertex_yaml.load(vertex_input).asInstanceOf[VertexInfo]

// read edge yaml
val edge_yaml_path = new Path(getClass.getClassLoader.getResource("gar-test/ldbc_sample/csv/person_knows_person.edge.yml").getPath)
val edge_input = fs.open(edge_yaml_path)
val edge_yaml = new Yaml(new Constructor(classOf[EdgeInfo]))
val edge_info = edge_yaml.load(edge_input).asInstanceOf[EdgeInfo]

// construct the person vertex index mapping from the vertex dataframe
val vertex_mapping = IndexGenerator.constructVertexIndexMapping(vertex_df, vertex_info.getPrimaryKey())
// generate src and dst indices for the edge dataframe using the vertex mapping
val edge_df_with_src_index = IndexGenerator.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
val edge_df_with_src_dst_index = IndexGenerator.generateDstIndexForEdgesFromMapping(edge_df_with_src_index, "dst", vertex_mapping)

// create the writer for person_knows_person edges and write the adjacency list and property chunks in GAR format
val writer = new EdgeWriter(graph_info.getPrefix(), edge_info, AdjListType.ordered_by_source, edge_df_with_src_dst_index)
writer.writeEdges()

// close FileSystem instance
fs.close()
}
}
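For a local run, a lightweight sanity check on the writers' output can reuse the same Hadoop FileSystem API; the prefix below is hypothetical and the exact GAR chunk layout is not asserted here:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical check: list whatever the writers produced under a known prefix.
val outPrefix = "file:///tmp/graphar/"   // replace with the prefix the writers were given
val checkFs = FileSystem.get(new URI(outPrefix), spark.sparkContext.hadoopConfiguration)
val produced = checkFs.globStatus(new Path(outPrefix + "*"))
assert(produced != null && produced.nonEmpty)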
