Skip to content

Commit

Permalink
Init the reader and writer at graph level
Browse files Browse the repository at this point in the history
  • Loading branch information
acezen committed Mar 1, 2023
1 parent 1011eeb commit f220012
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 9 deletions.
4 changes: 4 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,10 @@ class EdgeInfo() {
str = prefix + getAdjListPrefix(adj_list_type) + str
return str
}

/** Returns the unique key of this edge info: the source, edge, and destination
 *  labels joined by the regular separator (e.g. "person_knows_person").
 */
def getConcatKey(): String =
  Seq(getSrc_label, getEdge_label, getDst_label).mkString(GeneralParams.regularSeperator)
}

/** Helper object to load edge info files */
Expand Down
29 changes: 20 additions & 9 deletions spark/src/main/scala/com/alibaba/graphar/GraphInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,31 @@ class GraphInfo() {
var vertexInfos: Map[String, VertexInfo] = Map[String, VertexInfo]()
var edgeInfos: Map[String, EdgeInfo] = Map[String, EdgeInfo]()

def addVertexInfo(label: String, vertexInfo: VertexInfo): Unit = {
vertexInfos += (label -> vertexInfo)
/** Register a vertex info in this graph, keyed by its label.
 *
 * @param vertexInfo the vertex info to add; later retrievable via getVertexInfo(label)
 */
def addVertexInfo(vertexInfo: VertexInfo): Unit = {
  // Fix: the original called `getlabel` (lowercase l); the accessor used
  // elsewhere in this file (loadGraphInfo) is `getLabel`.
  vertexInfos += (vertexInfo.getLabel -> vertexInfo)
}

def addEdgeInfo(label: String, edgeInfo: EdgeInfo): Unit = {
edgeInfos += (label -> edgeInfo)
/** Register an edge info in this graph, keyed by
 *  "srcLabel<sep>edgeLabel<sep>dstLabel" (see EdgeInfo.getConcatKey).
 *
 * @param edgeInfo the edge info to add; later retrievable via getEdgeInfo(src, edge, dst)
 */
def addEdgeInfo(edgeInfo: EdgeInfo): Unit = {
  // Fix: the original referenced the undefined name `edge_info` (the parameter
  // is `edgeInfo`). Delegate key construction to EdgeInfo.getConcatKey so the
  // key format is defined in exactly one place.
  edgeInfos += (edgeInfo.getConcatKey() -> edgeInfo)
}

/** Look up the vertex info registered under the given label.
 *  Throws NoSuchElementException if the label is unknown.
 */
def getVertexInfo(label: String): VertexInfo = vertexInfos(label)

/** Look up the edge info for the (srcLabel, edgeLabel, dstLabel) triple.
 *  Throws NoSuchElementException if no such edge info was registered.
 */
def getEdgeInfo(srcLabel: String, edgeLabel: String, dstLabel: String): EdgeInfo = {
  val sep = GeneralParams.regularSeperator
  edgeInfos(s"$srcLabel$sep$edgeLabel$sep$dstLabel")
}

/** All registered vertex infos, keyed by vertex label. */
def getVertexInfos(): Map[String, VertexInfo] = vertexInfos

/** All registered edge infos, keyed by the concatenated label triple. */
def getEdgeInfos(): Map[String, EdgeInfo] = edgeInfos
}

/** Helper object to load graph info files */
Expand All @@ -242,19 +256,16 @@ object GraphInfo {
while (vertices_it.hasNext()) {
val file_name = vertices_it.next()
val path = prefix + file_name
println("path" + path)
val vertex_info = VertexInfo.loadVertexInfo(path, spark)
println(vertex_info.getLabel)
graph_info.addVertexInfo(vertex_info.getLabel, vertex_info)
graph_info.addVertexInfo(vertex_info)
}
val edges_yaml = graph_info.getEdges
val edges_it = edges_yaml.iterator
while (edges_it.hasNext()) {
val file_name = edges_it.next()
val path = prefix + file_name
val edge_info = EdgeInfo.loadEdgeInfo(path, spark)
val key = edge_info.getSrc_label + GeneralParams.regularSeperator + edge_info.getEdge_label + GeneralParams.regularSeperator + edge_info.getDst_label
graph_info.addEdgeInfo(key, edge_info)
graph_info.addEdgeInfo(edge_info)
}
return graph_info
}
Expand Down
64 changes: 64 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphReader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.graph

import com.alibaba.graphar.{GeneralParams, AdjListType, GraphInfo, VertexInfo, EdgeInfo}
import com.alibaba.graphar.reader.{VertexReader, EdgeReader}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

/** The helper object for reading graphs through the definitions of their infos. */
object GraphReader {
  /** Read all vertex chunks of the graph.
   *
   * @param prefix the root prefix (directory) of the graph files
   * @param vertexInfos map of vertex label -> VertexInfo
   * @param spark the Spark session used by the readers
   * @return map of vertex label -> DataFrame containing all vertex property groups
   */
  private def readAllVertices(prefix: String, vertexInfos: Map[String, VertexInfo], spark: SparkSession): Map[String, DataFrame] = {
    // Fix: the original called the nonexistent `MapValues` and returned a
    // (label, df) tuple from inside it, which would not typecheck as
    // Map[String, DataFrame]. Rebuild the map with `map` over entries instead.
    vertexInfos.map { case (label, vertexInfo) =>
      val reader = new VertexReader(prefix, vertexInfo, spark)
      // true: include the generated vertex index column in the result
      (label, reader.readAllVertexPropertyGroups(true))
    }
  }

  /** Read all edge chunks of the graph, for every adjacency-list layout an
   *  edge type is stored in.
   *
   * @param prefix the root prefix (directory) of the graph files
   * @param edgeInfos map of concatenated-label key -> EdgeInfo
   * @param spark the Spark session used by the readers
   * @return map of concatenated-label key -> (AdjListType -> edge DataFrame)
   */
  private def readAllEdges(prefix: String, edgeInfos: Map[String, EdgeInfo], spark: SparkSession): Map[String, Map[AdjListType, DataFrame]] = {
    // Fixes vs. original: parameter type was lowercase `edgeInfo`; `MapValues`
    // does not exist; `+= (k, v)` on a Map var is the two-element add form and
    // does not compile — the entry must be built with `->`.
    edgeInfos.map { case (key, edgeInfo) =>
      var adjListTypeToDf: Map[AdjListType, DataFrame] = Map[AdjListType, DataFrame]()
      val adj_list_it = edgeInfo.getAdj_lists.iterator
      while (adj_list_it.hasNext()) {
        val adj_list_type = adj_list_it.next().getAdjList_type_in_gar
        val reader = new EdgeReader(prefix, edgeInfo, adj_list_type, spark)
        // false: do not add an extra index column when reading edges
        adjListTypeToDf += (adj_list_type -> reader.readEdges(false))
      }
      // `key` equals edgeInfo.getConcatKey() by construction (see GraphInfo.addEdgeInfo)
      (key, adjListTypeToDf)
    }
  }

  /** Read the whole graph described by a loaded GraphInfo.
   *
   * @return a pair of (vertex label -> vertex DataFrame,
   *         edge key -> (AdjListType -> edge DataFrame))
   */
  def read(graphInfo: GraphInfo, spark: SparkSession): (Map[String, DataFrame], Map[String, Map[AdjListType, DataFrame]]) = {
    val prefix = graphInfo.getPrefix
    // Fix: the original bound `vertex_infos`/`edge_infos` but then referenced
    // the undefined names `vertexInfos`/`edgeInfos`.
    val vertexInfos = graphInfo.getVertexInfos()
    val edgeInfos = graphInfo.getEdgeInfos()
    (readAllVertices(prefix, vertexInfos, spark), readAllEdges(prefix, edgeInfos, spark))
  }

  /** Read the whole graph described by a graph info file on disk.
   *
   * @param graphInfoPath path to the graph info YAML file
   */
  def read(graphInfoPath: String, spark: SparkSession): (Map[String, DataFrame], Map[String, Map[AdjListType, DataFrame]]) = {
    // load graph info
    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)

    // conduct reading
    read(graph_info, spark)
  }
}
99 changes: 99 additions & 0 deletions spark/src/main/scala/com/alibaba/graphar/graph/GraphWriter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/** Copyright 2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphar.graph

import com.alibaba.graphar.{AdjListType, GraphInfo, VertexInfo, EdgeInfo}
import com.alibaba.graphar.writer.{VertexWriter, EdgeWriter}
import com.alibaba.graphar.utils.IndexGenerator

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

/** The helper object for writing graphs through the definitions of their infos. */
object GraphWriter {
  /** Write all vertex DataFrames of the graph.
   *
   * @param prefix the root prefix (directory) for the output files
   * @param vertexInfos map of vertex label -> VertexInfo
   * @param vertex_num_map map of vertex label -> vertex count
   * @param vertexDataFrames map of vertex label -> vertex DataFrame to write
   * @param spark the Spark session
   */
  private def writeAllVertices(prefix: String,
                               vertexInfos: Map[String, VertexInfo],
                               vertex_num_map: Map[String, Long],
                               vertexDataFrames: Map[String, DataFrame],
                               spark: SparkSession): Unit = {
    // Fixes vs. original: `= >` is not the `=>` arrow; the call to
    // generateVertexIndexColumn had an unbalanced extra '('; and the declared
    // return type was Map[String, DataFrame] although foreach yields Unit.
    vertexInfos.foreach { case (label, vertexInfo) =>
      val vertex_num = vertex_num_map(label)
      val df_with_index = IndexGenerator.generateVertexIndexColumn(vertexDataFrames(label))
      val writer = new VertexWriter(prefix, vertexInfo, df_with_index, vertex_num)
      writer.writeVertexProperties()
    }
  }

  /** Write all edge DataFrames of the graph, one output per adjacency-list
   *  layout declared in each edge info.
   *
   * @param prefix the root prefix (directory) for the output files
   * @param vertexInfos map of vertex label -> VertexInfo (for primary keys)
   * @param edgeInfos map of concatenated-label key -> EdgeInfo
   * @param vertex_num_map map of vertex label -> vertex count
   * @param vertexDataFrames map of vertex label -> vertex DataFrame
   * @param edgeDataFrames map of concatenated-label key -> edge DataFrame
   * @param spark the Spark session
   */
  private def writeAllEdges(prefix: String,
                            vertexInfos: Map[String, VertexInfo],
                            edgeInfos: Map[String, EdgeInfo],
                            vertex_num_map: Map[String, Long],
                            vertexDataFrames: Map[String, DataFrame],
                            edgeDataFrames: Map[String, DataFrame],
                            spark: SparkSession): Unit = {
    edgeInfos.foreach { case (key, edgeInfo) =>
      val srcLabel = edgeInfo.getSrc_label
      val dstLabel = edgeInfo.getDst_label
      val edge_key = edgeInfo.getConcatKey()
      // Map each endpoint's primary-key values to GraphAr vertex indices.
      val src_vertex_index_mapping = IndexGenerator.constructVertexIndexMapping(
        vertexDataFrames(srcLabel), vertexInfos(srcLabel).getPrimaryKey())
      // Reuse the source mapping for self-edges to avoid recomputing it.
      val dst_vertex_index_mapping =
        if (srcLabel == dstLabel) src_vertex_index_mapping
        else IndexGenerator.constructVertexIndexMapping(
          vertexDataFrames(dstLabel), vertexInfos(dstLabel).getPrimaryKey())
      val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexForEdgesFromMapping(
        edgeDataFrames(edge_key), src_vertex_index_mapping, dst_vertex_index_mapping)

      val adj_list_it = edgeInfo.getAdj_lists.iterator
      while (adj_list_it.hasNext()) {
        val adj_list_type = adj_list_it.next().getAdjList_type_in_gar
        // Source-oriented layouts are chunked by source-vertex count,
        // destination-oriented ones by destination-vertex count.
        val vertex_num =
          if (adj_list_type == AdjListType.ordered_by_source || adj_list_type == AdjListType.unordered_by_source)
            vertex_num_map(srcLabel)
          else
            vertex_num_map(dstLabel)
        val writer = new EdgeWriter(prefix, edgeInfo, adj_list_type, vertex_num, edge_df_with_index)
        writer.writeEdges()
      }
    }
  }

  /** Write a whole graph given its info and the vertex/edge DataFrames.
   *
   * @param graphInfo the graph info describing layout and prefixes
   * @param vertexDataFrames map of vertex label -> vertex DataFrame
   * @param edgeDataFrames map of concatenated-label key -> edge DataFrame
   * @param spark the Spark session
   */
  def write(graphInfo: GraphInfo, vertexDataFrames: Map[String, DataFrame], edgeDataFrames: Map[String, DataFrame], spark: SparkSession): Unit = {
    // Get the vertex count of each vertex DataFrame. Fix vs. original:
    // Dataset.count() returns Long, so the map is Map[String, Long], not Int.
    val vertex_num_map: Map[String, Long] = vertexDataFrames.map { case (k, v) => (k, v.count()) }
    val prefix = graphInfo.getPrefix
    val vertex_infos = graphInfo.getVertexInfos()
    val edge_infos = graphInfo.getEdgeInfos()

    // write vertices
    writeAllVertices(prefix, vertex_infos, vertex_num_map, vertexDataFrames, spark)

    // write edges (fix vs. original: the method is writeAllEdges, not writeEdges)
    writeAllEdges(prefix, vertex_infos, edge_infos, vertex_num_map, vertexDataFrames, edgeDataFrames, spark)
  }

  /** Write a whole graph given the path of its graph info file.
   *
   * @param graphInfoPath path to the graph info YAML file
   */
  def write(graphInfoPath: String, vertexDataFrames: Map[String, DataFrame], edgeDataFrames: Map[String, DataFrame], spark: SparkSession): Unit = {
    // load graph info
    val graph_info = GraphInfo.loadGraphInfo(graphInfoPath, spark)

    // conduct writing
    write(graph_info, vertexDataFrames, edgeDataFrames, spark)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ object IndexGenerator {
generateDstIndexForEdgesFromMapping(df_with_src_index, dstColumnName, dstIndexMapping)
}

/** Generate vertex-index columns for both endpoints of an edge DataFrame.
 *  Assumes that the first and second columns are the src and dst columns.
 *
 * @param edgeDf the edge DataFrame; column 0 is src, column 1 is dst
 * @param srcIndexMapping mapping DataFrame from src primary key to vertex index
 * @param dstIndexMapping mapping DataFrame from dst primary key to vertex index
 * @return the edge DataFrame with both src and dst index columns attached
 */
def generateSrcAndDstIndexForEdgesFromMapping(edgeDf: DataFrame, srcIndexMapping: DataFrame, dstIndexMapping: DataFrame): DataFrame = {
  val srcColName: String = edgeDf.columns(0)
  // Fix: the original read `edgeDfcolumns(1)` — the '.' was missing.
  val dstColName: String = edgeDf.columns(1)
  val df_with_src_index = generateSrcIndexForEdgesFromMapping(edgeDf, srcColName, srcIndexMapping)
  generateDstIndexForEdgesFromMapping(df_with_src_index, dstColName, dstIndexMapping)
}

/** Construct vertex index for source column. */
def generateSrcIndexForEdges(edgeDf: DataFrame, srcColumnName: String): DataFrame = {
val srcDf = edgeDf.select(srcColumnName).distinct()
Expand Down

0 comments on commit f220012

Please sign in to comment.