Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize the implementation of spark writer #51

Merged
merged 13 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[submodule "test/gar-test"]
path = test/gar-test
url = https://github.com/GraphScope/gar-test.git
url = https://github.com/acezen/gar-test.git
[submodule "thirdparty/yaml-cpp"]
path = thirdparty/yaml-cpp
url = https://github.com/jbeder/yaml-cpp.git
Expand Down
2 changes: 1 addition & 1 deletion examples/construct_info_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ int main(int argc, char* argv[]) {
assert(!vertex_info.IsPrimaryKey(gender.name).status().ok());
assert(vertex_info.GetPropertyType(id.name).value() == id.type);
assert(vertex_info.GetFilePath(group1, 0).value() ==
"vertex/person/id/part0/chunk0");
"vertex/person/id/chunk0");

// extend property groups & validate
auto result = vertex_info.Extend(group2);
Expand Down
6 changes: 3 additions & 3 deletions include/gar/graph_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ class VertexInfo {
return Status::KeyError(
"Vertex info does not contain the property group.");
}
return prefix_ + property_group.GetPrefix() + "part" +
std::to_string(chunk_index) + "/" + "chunk0";
return prefix_ + property_group.GetPrefix() + "chunk" +
std::to_string(chunk_index);
}

/// Get the chunk files directory path of property group
Expand Down Expand Up @@ -562,7 +562,7 @@ class EdgeInfo {
return Status::KeyError("The adj list type is not found in edge info.");
}
return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/part" +
std::to_string(vertex_chunk_index) + "/" + "chunk0";
std::to_string(vertex_chunk_index) + "/chunk0";
}

/// Get the adj list offset chunk file directory path of adj list type
Expand Down
72 changes: 71 additions & 1 deletion spark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
<scala.binary.version>2.12</scala.binary.version>
<PermGen>512m</PermGen>
<MaxPermGen>1024m</MaxPermGen>
<spark.version>3.1.1</spark.version>
<spark.version>3.2.0</spark.version>
<maven.compiler.release>8</maven.compiler.release>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<cupid.sdk.version>3.3.8-public</cupid.sdk.version>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -68,6 +69,34 @@
<artifactId>snakeyaml</artifactId>
<version>1.26</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>hadoop-fs-oss</artifactId>
<version>${cupid.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.11</artifactId>
<version>${cupid.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>cupid-sdk</artifactId>
<version>${cupid.sdk.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down Expand Up @@ -119,6 +148,47 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>false</minimizeJar>
<shadedArtifactAttached>true</shadedArtifactAttached>
<artifactSet>
<includes>
<!-- Include here the dependencies you
want to be packed in your fat jar -->
<include>*:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>**/log4j.properties</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<packaging>jar</packaging>
Expand Down
15 changes: 15 additions & 0 deletions spark/src/main/java/com/alibaba/graphar/GeneralParams.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar;

public class GeneralParams {
Expand Down
38 changes: 36 additions & 2 deletions spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
Expand Down Expand Up @@ -210,8 +225,8 @@ class EdgeInfo() {
/** Get the offset chunk file path for the given adj list type.
  *
  * @param chunk_index index of the vertex chunk the offsets belong to.
  * @param adj_list_type the adj list topology type (e.g. ordered_by_source).
  * @return path of the offset chunk file, relative to the graph prefix.
  * @throws IllegalArgumentException if the edge info does not contain the adj list type.
  *
  * Note: the scraped diff contained both the old ("offset/part<i>/chunk0") and
  * new ("offset/chunk<i>") layouts fused together; this is the post-change form.
  */
def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
  if (containAdjList(adj_list_type) == false)
    throw new IllegalArgumentException
  // Offset chunks live directly under "<adj list prefix>/offset/" as "chunk<i>".
  val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/chunk" +
    chunk_index.toString()
  return str
}

Expand Down Expand Up @@ -256,6 +271,25 @@ class EdgeInfo() {
return str
}

/** Get the directory path that holds the property chunks of a property group
  * for one vertex chunk of the adj list.
  *
  * @param property_group the property group to locate.
  * @param adj_list_type the adj list topology type the group is stored under.
  * @param vertex_chunk_index index of the vertex chunk.
  * @return directory path ending with "/", relative to the graph prefix.
  * @throws IllegalArgumentException if the group is not part of this adj list type.
  */
def getPropertyFilePath(property_group: PropertyGroup, adj_list_type: AdjListType.Value, vertex_chunk_index: Long) : String = {
  if (!containPropertyGroup(property_group, adj_list_type))
    throw new IllegalArgumentException
  var group_dir: String = property_group.getPrefix
  if (group_dir == "") {
    // No explicit prefix: derive one by joining the property names with the
    // regular separator, e.g. "name_age/".
    val props = property_group.getProperties
    val names = new scala.collection.mutable.ArrayBuffer[String]()
    for (idx <- 0 until props.size) {
      names += props.get(idx).getName
    }
    group_dir = names.mkString(GeneralParams.regularSeperator) + "/"
  }
  return prefix + getAdjListPrefix(adj_list_type) + group_dir + "part" +
    vertex_chunk_index.toString() + "/"
}

def getPropertyDirPath(property_group: PropertyGroup, adj_list_type: AdjListType.Value) : String = {
if (containPropertyGroup(property_group, adj_list_type) == false)
throw new IllegalArgumentException
Expand Down
15 changes: 15 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
Expand Down
18 changes: 17 additions & 1 deletion spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar

import java.io.{File, FileInputStream}
Expand Down Expand Up @@ -133,7 +148,7 @@ class VertexInfo() {
} else {
str = property_group.getPrefix
}
return prefix + str + "part" + chunk_index.toString() + "/chunk0"
return prefix + str + "chunk" + chunk_index.toString()
}

def getDirPath(property_group: PropertyGroup): String = {
Expand All @@ -148,6 +163,7 @@ class VertexInfo() {
str += GeneralParams.regularSeperator
str += properties.get(j).getName;
}
str += "/"
} else {
str = property_group.getPrefix
}
Expand Down
44 changes: 44 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/utils/FileSystem.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.utils

import java.net.URI
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.hadoop.fs

object FileSystem {
  /** Rename the Spark-generated "part-*" output files under filePrefix to the
    * GraphAr chunk naming scheme "chunk<i>".
    *
    * globStatus returns matches sorted by path name, so the chunk index order
    * follows Spark's partition file order.
    *
    * @param spark the session whose Hadoop configuration resolves the file system.
    * @param filePrefix directory prefix (must end with "/") containing the part files.
    * @throws java.io.IOException if a rename is rejected by the file system.
    */
  private def renameSparkGeneratedFiles(spark: SparkSession, filePrefix: String): Unit = {
    // Fix: the original bound an unused `val sc = spark.sparkContext` and
    // silently discarded rename's boolean result; a failed rename now raises.
    val file_system = fs.FileSystem.get(new URI(filePrefix), spark.sparkContext.hadoopConfiguration)
    val path_pattern = new fs.Path(filePrefix + "part*")
    val files = file_system.globStatus(path_pattern)
    for (i <- 0 until files.length) {
      val file_name = files(i).getPath.getName
      val new_file_name = "chunk" + i.toString
      val renamed = file_system.rename(
        new fs.Path(filePrefix + file_name),
        new fs.Path(filePrefix + new_file_name))
      if (!renamed) {
        throw new java.io.IOException(
          "Failed to rename " + filePrefix + file_name + " to " + filePrefix + new_file_name)
      }
    }
  }

  /** Write a DataFrame as GraphAr chunk files.
    *
    * @param dataFrame the data to write.
    * @param fileType Spark data source format name (e.g. "parquet", "orc", "csv").
    * @param outputPrefix output directory prefix (must end with "/").
    */
  def writeDataFrame(dataFrame: DataFrame, fileType: String, outputPrefix: String): Unit = {
    val spark = dataFrame.sparkSession
    // Suppress _SUCCESS marker and parquet summary files so the output
    // directory contains only data chunks.
    spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    spark.conf.set("parquet.enable.summary-metadata", "false")
    // spark.conf.set("spark.sql.parquet.compression.codec", "zstd")
    dataFrame.write.mode("overwrite").format(fileType).save(outputPrefix)
    renameSparkGeneratedFiles(spark, outputPrefix)
  }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,21 @@
package com.alibaba.graphar
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.utils

import com.alibaba.graphar.GeneralParams

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -78,11 +95,11 @@ object IndexGenerator {
/** Join an edge DataFrame with a destination-vertex index mapping, adding the
  * destination internal index column.
  *
  * @param edgeDf the edge DataFrame holding the original destination ids.
  * @param dstColumnName name of the destination id column in edgeDf.
  * @param dstIndexMapping mapping DataFrame from primary key to vertex index.
  * @return edgeDf with the destination index column added and the original
  *         destination id column dropped.
  *
  * Note: the scraped diff fused the pre-change temp view name "edge" with the
  * post-change "edges"; this is the post-change form (the rename avoids
  * clashing with SQL keywords / the source-index variant's view).
  */
def generateDstIndexForEdgesFromMapping(edgeDf: DataFrame, dstColumnName: String, dstIndexMapping: DataFrame): DataFrame = {
  val spark = edgeDf.sparkSession
  dstIndexMapping.createOrReplaceTempView("dst_vertex")
  edgeDf.createOrReplaceTempView("edges")
  val dstCol = GeneralParams.dstIndexCol;
  val indexCol = GeneralParams.vertexIndexCol;
  val dstPrimaryKey = GeneralParams.primaryCol;
  val trans_df = spark.sql(f"select dst_vertex.$indexCol%s as $dstCol%s, edges.* from edges inner join dst_vertex on dst_vertex.$dstPrimaryKey%s=edges.$dstColumnName%s")
  // Drop the old dst id column; drop returns a new DataFrame, which is the
  // method's result (last expression).
  trans_df.drop(dstColumnName)
}
Expand Down
Loading