
[Improvement] [Spark] Add methods for Spark Reader and improve the performance #87

Merged: 8 commits, Feb 7, 2023
2 changes: 1 addition & 1 deletion docs/user-guide/spark-lib.rst
@@ -157,7 +157,7 @@ To utilize the GAR Spark reader, please refer to the following example code.
val single_chunk_df = reader.readVertexPropertyChunk(property_group, 0)
// ...
// read all property chunks
- val vertex_df = reader.readAllVertexProperties()
+ val vertex_df = reader.readAllVertexPropertyGroups()

// construct the edge reader
val edge_info = ...
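For reference, a minimal sketch of the renamed reader entry points, mirroring the test suite at the end of this diff; prefix (the GAR data path), vertex_info and edge_info are assumed to be already loaded from the GAR metadata:

import com.alibaba.graphar.reader.{VertexReader, EdgeReader}
import com.alibaba.graphar.AdjListType
import org.apache.spark.sql.SparkSession

// assumed inputs: prefix, vertex_info, edge_info
val spark = SparkSession.builder().appName("gar-reader-sketch").master("local[*]").getOrCreate()
val vertex_reader = new VertexReader(prefix, vertex_info, spark)
val vertex_df = vertex_reader.readAllVertexPropertyGroups()  // was: readAllVertexProperties()
val edge_reader = new EdgeReader(prefix, edge_info, AdjListType.ordered_by_source, spark)
val edge_df = edge_reader.readEdges()  // AdjList plus all property groups; indexed by default now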
145 changes: 96 additions & 49 deletions spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
@@ -15,7 +15,7 @@

package com.alibaba.graphar.reader

- import com.alibaba.graphar.utils.{IndexGenerator}
+ import com.alibaba.graphar.utils.{IndexGenerator, DataFrameConcat}
import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
import com.alibaba.graphar.datasources._

@@ -74,7 +74,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame of all AdjList chunks of vertices in the given vertex chunk.
*/
- def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(vertex_chunk_index, adjListType)
@@ -91,7 +91,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* @param addIndex whether to add an index column to the final DataFrame.
* @return DataFrame of all AdjList chunks.
*/
- def readAllAdjList(addIndex: Boolean = false): DataFrame = {
+ def readAllAdjList(addIndex: Boolean = true): DataFrame = {
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListPathPrefix(adjListType)
@@ -128,7 +128,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* @return DataFrame that contains all property group chunks of vertices in the given vertex chunk.
* If edge info does not contain the property group, raise an IllegalArgumentException error.
*/
- def readEdgePropertiesForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
+ def readEdgePropertyGroupForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
@@ -148,7 +148,7 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* @return DataFrame that contains all chunks of the property group.
* If edge info does not contain the property group, raise an IllegalArgumentException error.
*/
- def readEdgeProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
+ def readEdgePropertyGroup(propertyGroup: PropertyGroup, addIndex: Boolean = true): DataFrame = {
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
@@ -161,51 +161,94 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
}
}

/** Load the chunks for multiple property groups of a vertex chunk as a DataFrame.
*
* @param propertyGroups list of property groups.
* @param vertex_chunk_index index of vertex chunk.
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all property group chunks of a vertex chunk.
*/
def readMultipleEdgePropertyGroupsForVertexChunk(propertyGroups: java.util.ArrayList[PropertyGroup], vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
val len: Int = propertyGroups.size
if (len == 0)
return spark.emptyDataFrame

val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readEdgePropertyGroupForVertexChunk(pg0, vertex_chunk_index, false)
if (len == 1)
return df0

var rdd = df0.rdd
var schema_array = df0.schema.fields
for ( i <- 1 to len - 1 ) {
val pg: PropertyGroup = propertyGroups.get(i)
val new_df = readEdgePropertyGroupForVertexChunk(pg, vertex_chunk_index, false)
schema_array = Array.concat(schema_array, new_df.schema.fields)
rdd = DataFrameConcat.concatRdd(rdd, new_df.rdd)
}

val schema = StructType(schema_array)
val df = spark.createDataFrame(rdd, schema)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
} else {
return df
}
}

/** Load the chunks for multiple property groups as a DataFrame.
*
* @param propertyGroups list of property groups.
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all property group chunks of the edge.
*/
def readMultipleEdgePropertyGroups(propertyGroups: java.util.ArrayList[PropertyGroup], addIndex: Boolean = true): DataFrame = {
val len: Int = propertyGroups.size
if (len == 0)
return spark.emptyDataFrame

val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readEdgePropertyGroup(pg0, false)
if (len == 1)
return df0

var rdd = df0.rdd
var schema_array = df0.schema.fields
for ( i <- 1 to len - 1 ) {
val pg: PropertyGroup = propertyGroups.get(i)
val new_df = readEdgePropertyGroup(pg, false)
schema_array = Array.concat(schema_array, new_df.schema.fields)
rdd = DataFrameConcat.concatRdd(rdd, new_df.rdd)
}

val schema = StructType(schema_array)
val df = spark.createDataFrame(rdd, schema)
if (addIndex) {
return IndexGenerator.generateEdgeIndexColumn(df)
} else {
return df
}
}
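// The two multi-group readers above stitch per-group DataFrames together
// positionally via DataFrameConcat.concatRdd, rather than joining each group on
// a generated edge index column and re-sorting as the old readers did; skipping
// the per-group join and sort is where the performance improvement comes from.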

/** Load the chunks for all property groups of a vertex chunk as a DataFrame.
*
* @param vertex_chunk_index index of vertex chunk.
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all property group chunks of a vertex chunk.
*/
- def readAllEdgePropertiesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
- var df = spark.emptyDataFrame
+ def readAllEdgePropertyGroupsForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
val property_groups = edgeInfo.getPropertyGroups(adjListType)
- val len: Int = property_groups.size
- for ( i <- 0 to len - 1 ) {
- val pg: PropertyGroup = property_groups.get(i)
- val new_df = readEdgePropertiesForVertexChunk(pg, vertex_chunk_index, true)
- if (i == 0)
- df = new_df
- else
- df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
- }
- df = df.sort(GeneralParams.edgeIndexCol)
- if (addIndex == false)
- df = df.drop(GeneralParams.edgeIndexCol)
- return df
+ return readMultipleEdgePropertyGroupsForVertexChunk(property_groups, vertex_chunk_index, addIndex)
}

/** Load the chunks for all property groups as a DataFrame.
*
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all property group chunks of the edge.
*/
- def readAllEdgeProperties(addIndex: Boolean = false): DataFrame = {
- var df = spark.emptyDataFrame
+ def readAllEdgePropertyGroups(addIndex: Boolean = true): DataFrame = {
val property_groups = edgeInfo.getPropertyGroups(adjListType)
- val len: Int = property_groups.size
- for ( i <- 0 to len - 1 ) {
- val pg: PropertyGroup = property_groups.get(i)
- val new_df = readEdgeProperties(pg, true)
- if (i == 0)
- df = new_df
- else
- df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
- }
- df = df.sort(GeneralParams.edgeIndexCol)
- if (addIndex == false)
- df = df.drop(GeneralParams.edgeIndexCol)
- return df
+ return readMultipleEdgePropertyGroups(property_groups, addIndex)
}

/** Load the chunks for the AdjList and all property groups for a vertex chunk as a DataFrame.
@@ -214,26 +257,30 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all chunks of AdjList and property groups of vertices in the given vertex chunk.
*/
- def readEdgesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
- val adjList_df = readAdjListForVertexChunk(vertex_chunk_index, true)
- val properties_df = readAllEdgePropertiesForVertexChunk(vertex_chunk_index, true)
- var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol)
- if (addIndex == false)
- df = df.drop(GeneralParams.edgeIndexCol)
- return df
+ def readEdgesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
+ val adjList_df = readAdjListForVertexChunk(vertex_chunk_index, false)
+ val properties_df = readAllEdgePropertyGroupsForVertexChunk(vertex_chunk_index, false)
+ val df = DataFrameConcat.concat(adjList_df, properties_df)
+ if (addIndex) {
+ return IndexGenerator.generateEdgeIndexColumn(df)
+ } else {
+ return df
+ }
}

/** Load the chunks for the AdjList and all property groups as a DataFrame.
*
* @param addIndex whether to add an edge index column to the final DataFrame.
* @return DataFrame that contains all chunks of AdjList and property groups of the edge.
*/
- def readEdges(addIndex: Boolean = false): DataFrame = {
- val adjList_df = readAllAdjList(true)
- val properties_df = readAllEdgeProperties(true)
- var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol)
- if (addIndex == false)
- df = df.drop(GeneralParams.edgeIndexCol)
- return df
+ def readEdges(addIndex: Boolean = true): DataFrame = {
+ val adjList_df = readAllAdjList(false)
+ val properties_df = readAllEdgePropertyGroups(false)
+ val df = DataFrameConcat.concat(adjList_df, properties_df)
+ if (addIndex) {
+ return IndexGenerator.generateEdgeIndexColumn(df)
+ } else {
+ return df
+ }
}
}
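A short usage sketch of the new multi-group entry points; edge_info, prefix and spark are assumed to be set up as in the test suite below:

import com.alibaba.graphar.reader.EdgeReader
import com.alibaba.graphar.{AdjListType, PropertyGroup}

// read the AdjList plus a chosen subset of property groups for vertex chunk 0
val reader = new EdgeReader(prefix, edge_info, AdjListType.ordered_by_source, spark)
val all_groups = edge_info.getPropertyGroups(AdjListType.ordered_by_source)
val some_groups = new java.util.ArrayList[PropertyGroup]()
some_groups.add(all_groups.get(0))
val subset_df = reader.readMultipleEdgePropertyGroupsForVertexChunk(some_groups, 0, true)
val full_df = reader.readEdgesForVertexChunk(0)  // AdjList + all property groups, indexed by default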
63 changes: 40 additions & 23 deletions spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
@@ -15,14 +15,15 @@

package com.alibaba.graphar.reader

- import com.alibaba.graphar.utils.{IndexGenerator}
+ import com.alibaba.graphar.utils.{IndexGenerator, DataFrameConcat}
import com.alibaba.graphar.{GeneralParams, VertexInfo, FileType, PropertyGroup}
import com.alibaba.graphar.datasources._

import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

/** Reader for vertex chunks.
*
Expand All @@ -32,12 +33,6 @@ import org.apache.spark.sql.functions._
* @param spark spark session for the reader to read chunks as Spark DataFrame.
*/
class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) {
- private val vertices_number = readVerticesNumber()
- private val chunk_size = vertexInfo.getChunk_size()
- private var chunk_number = vertices_number / chunk_size
- if (vertices_number % chunk_size != 0)
- chunk_number = chunk_number + 1
-
/** Load the total number of vertices for this vertex type. */
def readVerticesNumber(): Long = {
val file_path = prefix + "/" + vertexInfo.getVerticesNumFilePath()
@@ -71,7 +66,7 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
* @return DataFrame that contains all chunks of the property group.
* Raise IllegalArgumentException if the property group is not contained.
*/
- def readVertexProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
+ def readVertexPropertyGroup(propertyGroup: PropertyGroup, addIndex: Boolean = true): DataFrame = {
if (vertexInfo.containPropertyGroup(propertyGroup) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
@@ -85,26 +80,48 @@ class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession)
}
}

/** Load the chunks for multiple property groups as a DataFrame.
*
* @param propertyGroups list of property groups.
* @param addIndex whether to add a vertex index column to the final DataFrame.
* @return DataFrame that contains all chunks of the property groups.
* Raise IllegalArgumentException if a property group is not contained.
*/
def readMultipleVertexPropertyGroups(propertyGroups: java.util.ArrayList[PropertyGroup], addIndex: Boolean = true): DataFrame = {
val len: Int = propertyGroups.size
if (len == 0)
return spark.emptyDataFrame

val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readVertexPropertyGroup(pg0, false)
if (len == 1)
return df0

var rdd = df0.rdd
var schema_array = df0.schema.fields
for ( i <- 1 to len - 1 ) {
val pg: PropertyGroup = propertyGroups.get(i)
val new_df = readVertexPropertyGroup(pg, false)
schema_array = Array.concat(schema_array, new_df.schema.fields)
rdd = DataFrameConcat.concatRdd(rdd, new_df.rdd)
}

val schema = StructType(schema_array)
val df = spark.createDataFrame(rdd, schema)
if (addIndex) {
return IndexGenerator.generateVertexIndexColumn(df)
} else {
return df
}
}

/** Load the chunks for all property groups as a DataFrame.
*
* @param addIndex whether to add a vertex index column to the final DataFrame.
* @return DataFrame that contains all property group chunks of the vertex.
*/
- def readAllVertexProperties(addIndex: Boolean = false): DataFrame = {
- var df = spark.emptyDataFrame
+ def readAllVertexPropertyGroups(addIndex: Boolean = true): DataFrame = {
val property_groups = vertexInfo.getProperty_groups()
- val len: Int = property_groups.size
- for ( i <- 0 to len - 1 ) {
- val pg: PropertyGroup = property_groups.get(i)
- val new_df = readVertexProperties(pg, true)
- if (i == 0)
- df = new_df
- else
- df = df.join(new_df, Seq(GeneralParams.vertexIndexCol))
- }
- df = df.sort(GeneralParams.vertexIndexCol)
- if (addIndex == false)
- df = df.drop(GeneralParams.vertexIndexCol)
- return df
+ return readMultipleVertexPropertyGroups(property_groups, addIndex)
}
}
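The vertex-side counterpart, sketched under the same assumptions (vertex_info, prefix and spark already set up):

import com.alibaba.graphar.reader.VertexReader
import com.alibaba.graphar.PropertyGroup

// read only the first property group instead of all of them
val vertex_reader = new VertexReader(prefix, vertex_info, spark)
val groups = vertex_info.getProperty_groups()
val chosen = new java.util.ArrayList[PropertyGroup]()
chosen.add(groups.get(0))
val chosen_df = vertex_reader.readMultipleVertexPropertyGroups(chosen, true)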
51 changes: 51 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/utils/DataFrameConcat.scala
@@ -0,0 +1,51 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.utils

import com.alibaba.graphar.GeneralParams

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

/** Helper object to concat DataFrames */
object DataFrameConcat {
/** Concat two DataFrames.
*
* @param df1 The first DataFrame.
* @param df2 The second DataFrame.
* @return The result DataFrame that concatenates the two DataFrames column-wise.
*/
def concat(df1: DataFrame, df2: DataFrame): DataFrame = {
val spark = df1.sparkSession
val schema = StructType(Array.concat(df1.schema.fields, df2.schema.fields))
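// RDD.zip is positional: it requires both RDDs to have the same number of
// partitions and the same number of rows per partition, and throws at runtime
// otherwise. Aligned GAR chunks are assumed to satisfy this by construction.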
val res_rdd = df1.rdd.zip(df2.rdd).map(pair => Row.fromSeq(pair._1.toSeq.toList ::: pair._2.toSeq.toList))
val df = spark.createDataFrame(res_rdd, schema)
return df
}

/** Concat two RDDs.
*
* @param rdd1 The first RDD.
* @param rdd2 The second RDD.
* @return The result RDD that concatenates the two RDDs column-wise.
*/
def concatRdd(rdd1: RDD[Row], rdd2: RDD[Row]): RDD[Row] = {
rdd1.zip(rdd2).map(pair => Row.fromSeq(pair._1.toSeq.toList ::: pair._2.toSeq.toList))
}
}
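To make the concat semantics concrete, a self-contained sketch with hypothetical data; a single partition is used so the zip precondition trivially holds:

import com.alibaba.graphar.utils.DataFrameConcat
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("concat-demo").getOrCreate()
val schema1 = StructType(Seq(StructField("id", LongType), StructField("name", StringType)))
val schema2 = StructType(Seq(StructField("gender", StringType)))
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(0L, "Alice"), Row(1L, "Bob")), 1), schema1)
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("female"), Row("male")), 1), schema2)
val both = DataFrameConcat.concat(df1, df2)
both.show()  // rows glued column-wise: (0, Alice, female), (1, Bob, male)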
4 changes: 2 additions & 2 deletions spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
@@ -42,7 +42,7 @@ class ComputeExampleSuite extends AnyFunSuite {

val vertex_reader = new VertexReader(prefix, vertex_info, spark)
val vertices_num = vertex_reader.readVerticesNumber()
- val vertex_df = vertex_reader.readAllVertexProperties(true)
+ val vertex_df = vertex_reader.readAllVertexPropertyGroups(true)
vertex_df.show()
assert(vertex_df.columns.size == 5)
assert(vertex_df.count() == vertices_num)
@@ -54,7 +54,7 @@ class ComputeExampleSuite extends AnyFunSuite {
val adj_list_type = AdjListType.ordered_by_source

val edge_reader = new EdgeReader(prefix, edge_info, adj_list_type, spark)
- val edge_df = edge_reader.readAllAdjList()
+ val edge_df = edge_reader.readAllAdjList(false)
edge_df.show()
assert(edge_df.columns.size == 2)
assert(edge_df.count() == 6626)