Initialize the implementation of spark writer (#51)
acezen authored Dec 29, 2022
1 parent 08a654b commit 4863b5d
Showing 19 changed files with 810 additions and 33 deletions.
2 changes: 1 addition & 1 deletion examples/construct_info_example.cc
@@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
   assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
   assert(vertex_info.GetPropertyType(id.name).value() == id.type);
   assert(vertex_info.GetFilePath(group1, 0).value() ==
-         "vertex/person/id/part0/chunk0");
+         "vertex/person/id/chunk0");

  // extend property groups & validate
  auto result = vertex_info.Extend(group2);
6 changes: 3 additions & 3 deletions include/gar/graph_info.h
@@ -254,8 +254,8 @@ class VertexInfo {
       return Status::KeyError(
           "Vertex info does not contain the property group.");
     }
-    return prefix_ + property_group.GetPrefix() + "part" +
-           std::to_string(chunk_index) + "/" + "chunk0";
+    return prefix_ + property_group.GetPrefix() + "chunk" +
+           std::to_string(chunk_index);
   }

/// Get the chunk files directory path of property group
@@ -562,7 +562,7 @@ class EdgeInfo {
       return Status::KeyError("The adj list type is not found in edge info.");
     }
     return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/part" +
-           std::to_string(vertex_chunk_index) + "/" + "chunk0";
+           std::to_string(vertex_chunk_index) + "/chunk0";
   }

/// Get the adj list offset chunk file directory path of adj list type
72 changes: 71 additions & 1 deletion spark/pom.xml
@@ -15,10 +15,11 @@
     <scala.binary.version>2.12</scala.binary.version>
     <PermGen>512m</PermGen>
     <MaxPermGen>1024m</MaxPermGen>
-    <spark.version>3.1.1</spark.version>
+    <spark.version>3.2.0</spark.version>
     <maven.compiler.release>8</maven.compiler.release>
     <maven.compiler.source>1.8</maven.compiler.source>
     <maven.compiler.target>1.8</maven.compiler.target>
+    <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
   </properties>
   <dependencies>
     <dependency>
@@ -68,6 +69,34 @@
       <artifactId>snakeyaml</artifactId>
       <version>1.26</version>
     </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>hadoop-fs-oss</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>odps-spark-datasource_2.11</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.aliyun.odps</groupId>
+      <artifactId>cupid-sdk</artifactId>
+      <version>${cupid.sdk.version}</version>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
@@ -119,6 +148,47 @@
         </execution>
       </executions>
     </plugin>
+    <plugin>
+      <groupId>org.apache.maven.plugins</groupId>
+      <artifactId>maven-shade-plugin</artifactId>
+      <version>2.1</version>
+      <executions>
+        <execution>
+          <phase>package</phase>
+          <goals>
+            <goal>shade</goal>
+          </goals>
+          <configuration>
+            <minimizeJar>false</minimizeJar>
+            <shadedArtifactAttached>true</shadedArtifactAttached>
+            <artifactSet>
+              <includes>
+                <!-- Include here the dependencies you
+                     want to be packed in your fat jar -->
+                <include>*:*</include>
+              </includes>
+            </artifactSet>
+            <filters>
+              <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                  <exclude>META-INF/*.SF</exclude>
+                  <exclude>META-INF/*.DSA</exclude>
+                  <exclude>META-INF/*.RSA</exclude>
+                  <exclude>**/log4j.properties</exclude>
+                </excludes>
+              </filter>
+            </filters>
+            <transformers>
+              <transformer
+                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                <resource>reference.conf</resource>
+              </transformer>
+            </transformers>
+          </configuration>
+        </execution>
+      </executions>
+    </plugin>
   </plugins>
 </build>
 <packaging>jar</packaging>
15 changes: 15 additions & 0 deletions spark/src/main/java/com/alibaba/graphar/GeneralParams.java
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar;

public class GeneralParams {
38 changes: 36 additions & 2 deletions spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
@@ -210,8 +225,8 @@ class EdgeInfo() {
   def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
     if (containAdjList(adj_list_type) == false)
       throw new IllegalArgumentException
-    val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/part" +
-      chunk_index.toString() + "/chunk0"
+    val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/chunk" +
+      chunk_index.toString()
     return str
   }

@@ -256,6 +271,25 @@ class EdgeInfo() {
     return str
   }

+  def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long) : String = {
+    if (containPropertyGroup(property_group, adj_list_type) == false)
+      throw new IllegalArgumentException
+    var str: String = property_group.getPrefix
+    if (str == "") {
+      val properties = property_group.getProperties
+      val num = properties.size
+      for ( j <- 0 to num - 1 ) {
+        if (j > 0)
+          str += GeneralParams.regularSeperator
+        str += properties.get(j).getName;
+      }
+      str += "/"
+    }
+    str = prefix + getAdjListPrefix(adj_list_type) + str + "part" +
+      vertex_chunk_index.toString() + "/"
+    return str
+  }
+
   def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
     if (containPropertyGroup(property_group, adj_list_type) == false)
       throw new IllegalArgumentException
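The net effect of the offset-path change above is that an adjacency-list offset chunk is now a flat file named chunk<i> rather than a part<i>/chunk0 directory. A hedged sketch of the new naming (the edgeInfo value and the ordered_by_source adj list are assumptions for illustration, not part of this commit):

// Hypothetical usage against an EdgeInfo whose adj list prefix is "ordered_by_source/".
val offsetPath = edgeInfo.getAdjListOffsetFilePath(3, AdjListType.ordered_by_source)
// before this commit: <prefix>ordered_by_source/offset/part3/chunk0
// after this commit:  <prefix>ordered_by_source/offset/chunk3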
15 changes: 15 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
18 changes: 17 additions & 1 deletion spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
@@ -133,7 +148,7 @@ class VertexInfo() {
     } else {
       str = property_group.getPrefix
     }
-    return prefix + str + "part" + chunk_index.toString() + "/chunk0"
+    return prefix + str + "chunk" + chunk_index.toString()
   }

def getDirPath(property_group: PropertyGroup): String = {
@@ -148,6 +163,7 @@
         str += GeneralParams.regularSeperator
         str += properties.get(j).getName;
       }
+      str += "/"
     } else {
       str = property_group.getPrefix
     }
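The vertex-side counterpart, sketched with the names from the C++ example above (vertexInfo and group1 are assumed Scala equivalents of that example's objects):

// Property-group chunk files now sit directly under the group directory.
val chunkPath = vertexInfo.getFilePath(group1, 0) // "vertex/person/id/chunk0"
val dirPath   = vertexInfo.getDirPath(group1)     // "vertex/person/id/" (note the new trailing slash)
// Before this commit, the same chunk lived at "vertex/person/id/part0/chunk0".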
44 changes: 44 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
@@ -0,0 +1,44 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.utils

import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs

object FileSystem {
private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
val sc = spark.sparkContext
val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
val path_pattern = new fs.Path(filePrefix + "part*")
val files = file_system.globStatus(path_pattern)
for (i <- 0 until files.length) {
val file_name = files(i).getPath.getName
val new_file_name = "chunk" + i.toString
file_system.rename(new fs.Path(filePrefix + file_name), new fs.Path(filePrefix + new_file_name))
}
}

def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
val spark = dataFrame.sparkSession
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.conf.set("parquet.enable.summary-metadata", "false")
// spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
renameSparkGeneratedFiles(spark, outputPrefix)
}
}
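A minimal usage sketch for the new helper (the SparkSession setup, toy data, and output prefix are assumptions): writeDataFrame saves the DataFrame through Spark's normal writer, then renameSparkGeneratedFiles renames each part-* output file to chunk<i> so the layout matches the GraphAr naming scheme above. Because the glob is filePrefix + "part*", the prefix should end with a slash.

import org.apache.spark.sql.SparkSession
import com.alibaba.graphar.utils.FileSystem

val spark = SparkSession.builder()
  .appName("graphar-writer-sketch") // hypothetical app name
  .master("local[*]")
  .getOrCreate()

// A toy DataFrame standing in for one property-group chunk.
val df = spark.range(0, 10).toDF("value")

// Writes parquet files under the prefix, then renames part-* to chunk0, chunk1, ...
FileSystem.writeDataFrame(df, "parquet", "/tmp/graphar/vertex/person/id/")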
spark/src/main/scala/com/alibaba/graphar/{ => utils}/IndexGenerator.scala
@@ -1,4 +1,21 @@
-package com.alibaba.graphar
+/** Copyright 2022 Alibaba Group Holding Limited.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.graphar.utils
+
+import com.alibaba.graphar.GeneralParams

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types._
@@ -78,11 +95,11 @@ object IndexGenerator {
   def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
     val spark = edgeDf.sparkSession
     dstIndexMapping.createOrReplaceTempView("dst_vertex")
-    edgeDf.createOrReplaceTempView("edge")
+    edgeDf.createOrReplaceTempView("edges")
     val dstCol = GeneralParams.dstIndexCol;
     val indexCol = GeneralParams.vertexIndexCol;
     val dstPrimaryKey = GeneralParams.primaryCol;
-    val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edge.* from edge inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edge.$dstColumnName%s")
+    val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
     // drop the old dst id col
     trans_df.drop(dstColumnName)
   }
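A hedged sketch of calling the method touched above (both DataFrames and the "dst" column name are assumptions; the index and primary-key column names come from GeneralParams):

import org.apache.spark.sql.DataFrame
import com.alibaba.graphar.utils.IndexGenerator

// edgeDf: raw edges with a destination-id column named "dst".
// dstMapping: a (vertex index, primary key) mapping DataFrame built elsewhere.
def attachDstIndex(edgeDf: DataFrame, dstMapping: DataFrame): DataFrame =
  IndexGenerator.generateDstIndexForEdgesFromMapping(edgeDf, "dst", dstMapping)
// The result carries GeneralParams.dstIndexCol and drops the original "dst" column.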
42 changes: 42 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/utils/Patitioner.scala
@@ -0,0 +1,42 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.utils

import org.apache.spark.sql.types._
import org.apache.spark.Partitioner


class ChunkPartitioner(partitions: Int, chunk_size: Long) extends Partitioner {
require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

def numPartitions: Int = partitions

def chunkSize: Long = chunk_size

def getPartition(key: Any): Int = key match {
case null => 0
case _ => (key.asInstanceOf[Long] / chunk_size).toInt
}

override def equals(other: Any): Boolean = other match {
case h: ChunkPartitioner =>
h.numPartitions == numPartitions
case _ =>
false
}

override def hashCode: Int = numPartitions
}
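A sketch of the partitioner in use (the toy data and chunk size are assumptions): keys are vertex indices, and key / chunk_size sends each run of chunk_size consecutive indices to one partition, so each partition can be written out as exactly one chunk.

import org.apache.spark.sql.SparkSession
import com.alibaba.graphar.utils.ChunkPartitioner

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val chunkSize = 3L
val numChunks = 4 // ceil(10 / 3) for the 10 toy rows below

val pairs = spark.sparkContext
  .parallelize(0L until 10L)
  .map(i => (i, s"row-$i")) // (vertex index, payload)

// Partition p receives exactly the keys with p == key / chunkSize.
val chunked = pairs.partitionBy(new ChunkPartitioner(numChunks, chunkSize))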