Initialize implementation for spark reader (#52)
lixueclaire authored Dec 30, 2022
1 parent 4863b5d commit 36f6b78
Showing 7 changed files with 469 additions and 4 deletions.
4 changes: 2 additions & 2 deletions include/gar/graph_info.h
@@ -561,8 +561,8 @@ class EdgeInfo {
if (!ContainAdjList(adj_list_type)) {
return Status::KeyError("The adj list type is not found in edge info.");
}
-  return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/part" +
-         std::to_string(vertex_chunk_index) + "/chunk0";
+  return prefix_ + adj_list2prefix_.at(adj_list_type) + "offset/chunk" +
+         std::to_string(vertex_chunk_index);
}

/// Get the adj list offset chunk file directory path of adj list type
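Illustratively, with <adj list prefix> standing for prefix_ + adj_list2prefix_.at(adj_list_type), the offset chunk path for vertex_chunk_index 3 changes from

<adj list prefix>offset/part3/chunk0

to

<adj list prefix>offset/chunk3

so offset chunks now sit directly under the adjacency list's offset/ directory.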
216 changes: 216 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
@@ -0,0 +1,216 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.reader

import com.alibaba.graphar.utils.{IndexGenerator}
import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}

import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.Value, spark: SparkSession) {
// load a single offset chunk as a DataFrame
def readOffset(chunk_index: Long): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
if (adjListType != AdjListType.ordered_by_source && adjListType != AdjListType.ordered_by_dest)
throw new IllegalArgumentException
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType)
val df = spark.read.format(file_type).load(file_path)
return df
}

// load a single AdjList chunk as a DataFrame
def readAdjListChunk(vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, chunk_index, adjListType)
val df = spark.read.format(file_type).load(file_path)
return df
}

// load all AdjList chunks for a vertex chunk as a DataFrame
def readAdjListForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
val file_path = prefix + "/" + edgeInfo.getAdjListFilePath(vertex_chunk_index, adjListType)
val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
val path_pattern = new Path(file_path + "chunk*")
val chunk_number = file_system.globStatus(path_pattern).length
var df = spark.emptyDataFrame
for ( i <- 0 to chunk_number - 1) {
val new_df = readAdjListChunk(vertex_chunk_index, i)
if (i == 0)
df = new_df
else
df = df.union(new_df)
}
if (addIndex)
df = IndexGenerator.generateEdgeIndexColumn(df)
return df
}

// load all AdjList chunks for this edge type as a DataFrame
def readAllAdjList(addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
val file_path = prefix + "/" + edgeInfo.getAdjListDirPath(adjListType)
val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
val path_pattern = new Path(file_path + "part*")
val vertex_chunk_number = file_system.globStatus(path_pattern).length
var df = spark.emptyDataFrame
for ( i <- 0 to vertex_chunk_number - 1) {
val new_df = readAdjListForVertexChunk(i)
if (i == 0)
df = new_df
else
df = df.union(new_df)
}
if (addIndex)
df = IndexGenerator.generateEdgeIndexColumn(df)
return df
}

// load a single edge property chunk as a DataFrame
def readEdgePropertyChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type();
val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index)
val df = spark.read.format(file_type).load(file_path)
return df
}

// load the chunks for a property group of a vertex chunk as a DataFrame
def readEdgePropertiesForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_path = prefix + "/" + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index)
val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
val path_pattern = new Path(file_path + "chunk*")
val chunk_number = file_system.globStatus(path_pattern).length
var df = spark.emptyDataFrame
for ( i <- 0 to chunk_number - 1) {
val new_df = readEdgePropertyChunk(propertyGroup, vertex_chunk_index, i)
if (i == 0)
df = new_df
else
df = df.union(new_df)
}
if (addIndex)
df = IndexGenerator.generateEdgeIndexColumn(df)
return df
}

// load all chunks for a property group as a DataFrame
def readEdgeProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
throw new IllegalArgumentException
val file_path = prefix + "/" + edgeInfo.getPropertyDirPath(propertyGroup, adjListType)
val file_system = FileSystem.get(new Path(file_path).toUri(), spark.sparkContext.hadoopConfiguration)
val path_pattern = new Path(file_path + "part*")
val vertex_chunk_number = file_system.globStatus(path_pattern).length
var df = spark.emptyDataFrame
for ( i <- 0 to vertex_chunk_number - 1) {
val new_df = readEdgePropertiesForVertexChunk(propertyGroup, i)
if (i == 0)
df = new_df
else
df = df.union(new_df)
}
if (addIndex)
df = IndexGenerator.generateEdgeIndexColumn(df)
return df
}

// load the chunks for all property groups of a vertex chunk as a DataFrame
def readAllEdgePropertiesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
var df = spark.emptyDataFrame
val property_groups = edgeInfo.getPropertyGroups(adjListType)
val len: Int = property_groups.size
for ( i <- 0 to len - 1 ) {
val pg: PropertyGroup = property_groups.get(i)
val new_df = readEdgePropertiesForVertexChunk(pg, vertex_chunk_index, true)
if (i == 0)
df = new_df
else
df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
}
df = df.sort(GeneralParams.edgeIndexCol)
if (addIndex == false)
df = df.drop(GeneralParams.edgeIndexCol)
return df
}

// load the chunks for all property groups as a DataFrame
def readAllEdgeProperties(addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
var df = spark.emptyDataFrame
val property_groups = edgeInfo.getPropertyGroups(adjListType)
val len: Int = property_groups.size
for ( i <- 0 to len - 1 ) {
val pg: PropertyGroup = property_groups.get(i)
val new_df = readEdgeProperties(pg, true)
if (i == 0)
df = new_df
else
df = df.join(new_df, Seq(GeneralParams.edgeIndexCol))
}
df = df.sort(GeneralParams.edgeIndexCol)
if (addIndex == false)
df = df.drop(GeneralParams.edgeIndexCol)
return df
}

// load the chunks for the AdjList and all property groups for a vertex chunk as a DataFrame
def readEdgesForVertexChunk(vertex_chunk_index: Long, addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
val adjList_df = readAdjListForVertexChunk(vertex_chunk_index, true)
val properties_df = readAllEdgePropertiesForVertexChunk(vertex_chunk_index, true)
var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol)
if (addIndex == false)
df = df.drop(GeneralParams.edgeIndexCol)
return df
}

// load the chunks for the AdjList and all property groups as a DataFrame
def readEdges(addIndex: Boolean = false): DataFrame = {
if (edgeInfo.containAdjList(adjListType) == false)
throw new IllegalArgumentException
val adjList_df = readAllAdjList(true)
val properties_df = readAllEdgeProperties(true)
var df = adjList_df.join(properties_df, Seq(GeneralParams.edgeIndexCol)).sort(GeneralParams.edgeIndexCol);
if (addIndex == false)
df = df.drop(GeneralParams.edgeIndexCol)
return df
}
}
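For context, a minimal usage sketch (not part of this commit), assuming spark is an active SparkSession, edgeInfo has already been built from the edge metadata yaml, and the data root below is hypothetical:

import com.alibaba.graphar.AdjListType
import com.alibaba.graphar.reader.EdgeReader

val prefix = "file:///tmp/graphar/ldbc_sample/"  // hypothetical data root
val reader = new EdgeReader(prefix, edgeInfo, AdjListType.ordered_by_source, spark)
val offsetDf = reader.readOffset(0)      // offset chunk of vertex chunk 0
val adjListDf = reader.readAllAdjList()  // union of all AdjList chunks
val edgeDf = reader.readEdges()          // AdjList joined with every property group

Each call returns a plain Spark DataFrame, so the result can be cached, filtered, or written out with the usual DataFrame API.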
89 changes: 89 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/reader/VertexReader.scala
@@ -0,0 +1,89 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.reader

import com.alibaba.graphar.utils.{IndexGenerator}
import com.alibaba.graphar.{GeneralParams, VertexInfo, FileType, PropertyGroup}

import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

class VertexReader(prefix: String, vertexInfo: VertexInfo, spark: SparkSession) {
private val vertices_number = readVerticesNumber()
private val chunk_size = vertexInfo.getChunk_size()
private var chunk_number = vertices_number / chunk_size
if (vertices_number % chunk_size != 0)
chunk_number = chunk_number + 1

// load the total number of vertices for this vertex type
def readVerticesNumber(): Long = {
val file_path = prefix + "/" + vertexInfo.getVerticesNumFilePath()
val path = new Path(file_path)
val file_system = FileSystem.get(path.toUri(), spark.sparkContext.hadoopConfiguration)
val input = file_system.open(path)
val number = java.lang.Long.reverseBytes(input.readLong())
input.close()
file_system.close()
return number
}

// load a single vertex property chunk as a DataFrame
def readVertexPropertyChunk(propertyGroup: PropertyGroup, chunk_index: Long): DataFrame = {
if (vertexInfo.containPropertyGroup(propertyGroup) == false)
throw new IllegalArgumentException
val file_type = propertyGroup.getFile_type()
val file_path = prefix + "/" + vertexInfo.getFilePath(propertyGroup, chunk_index)
val df = spark.read.format(file_type).load(file_path)
return df
}

// load all chunks for a property group as a DataFrame
def readVertexProperties(propertyGroup: PropertyGroup, addIndex: Boolean = false): DataFrame = {
if (vertexInfo.containPropertyGroup(propertyGroup) == false)
throw new IllegalArgumentException
var df = spark.emptyDataFrame
for ( i <- 0L to chunk_number - 1) {
val new_df = readVertexPropertyChunk(propertyGroup, i)
if (i == 0)
df = new_df
else
df = df.union(new_df)
}
if (addIndex)
df = IndexGenerator.generateVertexIndexColumn(df)
return df
}

// load the chunks for all property groups as a DataFrame
def readAllVertexProperties(addIndex: Boolean = false): DataFrame = {
var df = spark.emptyDataFrame
val property_groups = vertexInfo.getProperty_groups()
val len: Int = property_groups.size
for ( i <- 0 to len - 1 ) {
val pg: PropertyGroup = property_groups.get(i)
val new_df = readVertexProperties(pg, true)
if (i == 0)
df = new_df
else
df = df.join(new_df, Seq(GeneralParams.vertexIndexCol))
}
df = df.sort(GeneralParams.vertexIndexCol)
if (addIndex == false)
df = df.drop(GeneralParams.vertexIndexCol)
return df
}
}
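Similarly, a minimal usage sketch (not part of this commit), assuming spark and a vertexInfo built from the vertex metadata yaml:

import com.alibaba.graphar.reader.VertexReader

val prefix = "file:///tmp/graphar/ldbc_sample/"  // hypothetical data root
val reader = new VertexReader(prefix, vertexInfo, spark)
val verticesNum = reader.readVerticesNumber()
val vertexDf = reader.readAllVertexProperties(addIndex = true)  // property groups joined on the generated vertex index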
@@ -78,6 +78,28 @@ object IndexGenerator

//index helper for the Edge DataFrame

// add a column that contains the edge index
def generateEdgeIndexColumn(edgeDf: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
val schema = edgeDf.schema
val schema_with_index = StructType(StructType(Seq(StructField(GeneralParams.edgeIndexCol, LongType, true)))++schema)
val rdd = edgeDf.rdd
val counts = rdd
.mapPartitionsWithIndex((i, ps) => Array((i, ps.size)).iterator, preservesPartitioning = true)
.collectAsMap()
val aggregatedCounts = SortedMap(counts.toSeq: _*)
.foldLeft((0L, Map.empty[Int, Long])) { case ((total, map), (i, c)) =>
(total + c, map + (i -> total))
}
._2
val broadcastedCounts = spark.sparkContext.broadcast(aggregatedCounts)
val rdd_with_index = rdd.mapPartitionsWithIndex((i, ps) => {
val start = broadcastedCounts.value(i)
for { (p, j) <- ps.zipWithIndex } yield Row.fromSeq(Seq(start + j) ++ p.toSeq)
})
spark.createDataFrame(rdd_with_index, schema_with_index)
}

// join the edge table with the vertex index mapping for source column
def generateSrcIndexForEdgesFromMapping(edgeDf: DataFrame, srcColumnName: String, srcIndexMapping: DataFrame): DataFrame = {
val spark = edgeDf.sparkSession
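The new generateEdgeIndexColumn helper prepends a global edge index column (named GeneralParams.edgeIndexCol) by broadcasting per-partition row counts and offsetting each row's position within its partition; the edge reader above relies on it to union chunks and join property groups in a stable order. A minimal sketch of its use (not part of this commit), with reader as in the EdgeReader sketch:

import com.alibaba.graphar.utils.IndexGenerator

val adjListDf = reader.readAllAdjList()
val indexedDf = IndexGenerator.generateEdgeIndexColumn(adjListDf)
indexedDf.printSchema()  // the edge index appears as the first column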