Skip to content

Commit

Permalink
[Spark] Update the Spark library to align with the latest file format…
Browse files Browse the repository at this point in the history
… design (#144)
  • Loading branch information
lixueclaire authored Apr 25, 2023
1 parent 7153a66 commit 8a3462f
Show file tree
Hide file tree
Showing 12 changed files with 233 additions and 42 deletions.
10 changes: 5 additions & 5 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ Each type of vertices (with the same label) constructs a logical vertex table, w
Given a vertex id and the vertex label, a vertex is uniquely identifiable and its respective properties can be accessed from this table. The vertex id is further used to identify the source and destination vertices when maintaining the topology of the graph.

.. image:: https://alibaba.github.io/GraphAr/_images/vertex_logical_table.png
:width: 700
:width: 650
:align: center
:alt: vertex logical table

Expand All @@ -92,7 +92,7 @@ The logical vertex table will be partitioned into multiple continuous vertex chu
Take the "person" vertex table as an example, if the chunk size is set to be 500, the logical table will be separated into sub-logical-tables of 500 rows with the exception of the last one, which may have less than 500 rows. The columns for maintaining properties will also be divided into distinct groups (e.g., 2 for our example). As a result, a total of 4 physical vertex tables are created for storing the example logical table, which can be seen from the following figure.

.. image:: https://alibaba.github.io/GraphAr/_images/vertex_physical_table.png
:width: 700
:width: 650
:align: center
:alt: vertex physical table

Expand All @@ -107,7 +107,7 @@ For maintaining a type of edges (that with the same triplet of the source label,
Take the logical table for "person likes person" edges as an example, the logical edge table looks like:

.. image:: https://alibaba.github.io/GraphAr/_images/edge_logical_table.png
:width: 700
:width: 650
:align: center
:alt: edge logical table

Expand All @@ -127,12 +127,12 @@ Additionally, the partition of the offset table should be in alignment with the
Take the "person knows person" edges to illustrate. Suppose the vertex chunk size is set to 500 and the edge chunk size is 1024, the edges will be saved in the following physical tables:

.. image:: https://alibaba.github.io/GraphAr/_images/edge_physical_table1.png
:width: 700
:width: 650
:align: center
:alt: edge logical table1

.. image:: https://alibaba.github.io/GraphAr/_images/edge_physical_table2.png
:width: 700
:width: 650
:align: center
:alt: edge logical table2

Expand Down
13 changes: 13 additions & 0 deletions docs/user-guide/file-format.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ And each edge contains:
The following is an example property graph containing two types of vertices ("person" and "comment") and three types of edges.

.. image:: ../images/property_graph.png
:width: 700
:align: center
:alt: property graph


Expand All @@ -36,6 +38,8 @@ Each type of vertices (with the same label) constructs a logical vertex table, w
Given a vertex id and the vertex label, a vertex is uniquely identifiable and its respective properties can be accessed from this table. The vertex id is further used to identify the source and destination vertices when maintaining the topology of the graph.

.. image:: ../images/vertex_logical_table.png
:width: 650
:align: center
:alt: vertex logical table

.. note::
Expand All @@ -50,6 +54,8 @@ The logical vertex table will be partitioned into multiple continuous vertex chu
Take the "person" vertex table as an example, if the chunk size is set to be 500, the logical table will be separated into sub-logical-tables of 500 rows with the exception of the last one, which may have less than 500 rows. The columns for maintaining properties will also be divided into distinct groups (e.g., 2 for our example). As a result, a total of 4 physical vertex tables are created for storing the example logical table, which can be seen from the following figure.

.. image:: ../images/vertex_physical_table.png
:width: 650
:align: center
:alt: vertex physical table


Expand All @@ -63,6 +69,8 @@ For maintaining a type of edges (that with the same triplet of the source label,
Take the logical table for "person likes person" edges as an example, the logical edge table looks like:

.. image:: ../images/edge_logical_table.png
:width: 650
:align: center
:alt: edge logical table

Physical table of edges
Expand All @@ -80,8 +88,13 @@ Additionally, the partition of the offset table should be in alignment with the
Take the "person knows person" edges to illustrate. Suppose the vertex chunk size is set to 500 and the edge chunk size is 1024, the edges will be saved in the following physical tables:

.. image:: ../images/edge_physical_table1.png
:width: 650
:align: center
:alt: edge physical table1

.. image:: ../images/edge_physical_table2.png
:width: 650
:align: center
:alt: edge physical table2

.. tip::
Expand Down
64 changes: 57 additions & 7 deletions spark/src/main/scala/com/alibaba/graphar/EdgeInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ class EdgeInfo() {
val tot: Int = adj_lists.size
for ( k <- 0 to tot - 1 ) {
val adj_list = adj_lists.get(k)
if (adj_list.getAdjList_type_in_gar == adj_list_type)
if (adj_list.getAdjList_type_in_gar == adj_list_type) {
return true
}
}
return false
}
Expand Down Expand Up @@ -260,10 +261,12 @@ class EdgeInfo() {

/** Check if the edge info is validated. */
def isValidated(): Boolean = {
if (src_label == "" || edge_label == "" || dst_label == "")
if (src_label == "" || edge_label == "" || dst_label == "") {
return false
if (chunk_size <= 0 || src_chunk_size <= 0 || dst_chunk_size <= 0)
}
if (chunk_size <= 0 || src_chunk_size <= 0 || dst_chunk_size <= 0) {
return false
}
val tot: Int = adj_lists.size
for ( k <- 0 to tot - 1 ) {
val adj_list = adj_lists.get(k)
Expand All @@ -274,25 +277,71 @@ class EdgeInfo() {
val pg: PropertyGroup = property_groups.get(i)
val properties = pg.getProperties
val num = properties.size
if (num == 0)
if (num == 0) {
return false
}
val pg_file_type = pg.getFile_type_in_gar
}
}
return true
}

/** Get the vertex num file path
*
* @param adj_list_type type of adj list structure.
* @return the vertex num file path. If edge info not support the adj list type,
* raise an IllegalArgumentException error.
*/
def getVerticesNumFilePath(adj_list_type: AdjListType.Value): String = {
if (containAdjList(adj_list_type) == false) {
throw new IllegalArgumentException
}
val str: String = prefix + getAdjListPrefix(adj_list_type) + "vertex_count"
return str
}

/** Get the path prefix of the edge num file path
*
* @param adj_list_type type of adj list structure.
* @return the edge num file path. If edge info not support the adj list type,
* raise an IllegalArgumentException error.
*/
def getEdgesNumPathPrefix(adj_list_type: AdjListType.Value): String = {
if (containAdjList(adj_list_type) == false) {
throw new IllegalArgumentException
}
val str: String = prefix + getAdjListPrefix(adj_list_type) + "edge_count"
return str
}

/** Get the edge num file path of the vertex chunk
*
* @param chunk_index index of vertex chunk.
* @param adj_list_type type of adj list structure.
* @return the edge num file path. If edge info not support the adj list type,
* raise an IllegalArgumentException error.
*/
def getEdgesNumFilePath(chunk_index: Long, adj_list_type: AdjListType.Value): String = {
if (containAdjList(adj_list_type) == false) {
throw new IllegalArgumentException
}
val str: String = prefix + getAdjListPrefix(adj_list_type) + "edge_count" +
chunk_index.toString()
return str
}

/** Get the adj list offset chunk file path of vertex chunk
* the offset chunks is aligned with the vertex chunks
*
* @param chunk_inde index of vertex chunk.
* @param chunk_index index of vertex chunk.
* @param adj_list_type type of adj list structure.
* @return the offset chunk file path. If edge info not support the adj list type,
* raise an IllegalArgumentException error.
*/
def getAdjListOffsetFilePath(chunk_index: Long, adj_list_type: AdjListType.Value) : String = {
if (containAdjList(adj_list_type) == false)
if (containAdjList(adj_list_type) == false) {
throw new IllegalArgumentException
}
val str: String = prefix + getAdjListPrefix(adj_list_type) + "offset/chunk" +
chunk_index.toString()
return str
Expand All @@ -305,8 +354,9 @@ class EdgeInfo() {
* raise an IllegalArgumentException error.
*/
def getOffsetPathPrefix(adj_list_type: AdjListType.Value) : String = {
if (containAdjList(adj_list_type) == false)
if (containAdjList(adj_list_type) == false) {
throw new IllegalArgumentException
}
return prefix + getAdjListPrefix(adj_list_type) + "offset/"
}

Expand Down
15 changes: 10 additions & 5 deletions spark/src/main/scala/com/alibaba/graphar/VertexInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,17 @@ class VertexInfo() {
* @return true if the vertex info is validated, otherwise return false.
*/
def isValidated(): Boolean = {
if (label == "" || chunk_size <= 0)
if (label == "" || chunk_size <= 0) {
return false
}
val len: Int = property_groups.size
for ( i <- 0 to len - 1 ) {
val pg: PropertyGroup = property_groups.get(i)
val properties = pg.getProperties
val num = properties.size
if (num == 0)
if (num == 0) {
return false
}
val file_type = pg.getFile_type_in_gar
}
return true
Expand All @@ -177,15 +179,17 @@ class VertexInfo() {
* @return chunk file path.
*/
def getFilePath(property_group: PropertyGroup, chunk_index: Long): String = {
if (containPropertyGroup(property_group) == false)
if (containPropertyGroup(property_group) == false) {
throw new IllegalArgumentException
}
var str: String = ""
if (property_group.getPrefix == "") {
val properties = property_group.getProperties
val num = properties.size
for ( j <- 0 to num - 1 ) {
if (j > 0)
if (j > 0) {
str += GeneralParams.regularSeperator
}
str += properties.get(j).getName;
}
str += "/"
Expand All @@ -201,8 +205,9 @@ class VertexInfo() {
* @return the path prefix of the property group chunk files.
*/
def getPathPrefix(property_group: PropertyGroup): String = {
if (containPropertyGroup(property_group) == false)
if (containPropertyGroup(property_group) == false) {
throw new IllegalArgumentException
}
var str: String = ""
if (property_group.getPrefix == "") {
val properties = property_group.getProperties
Expand Down
64 changes: 56 additions & 8 deletions spark/src/main/scala/com/alibaba/graphar/reader/EdgeReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.alibaba.graphar.reader
import com.alibaba.graphar.utils.{IndexGenerator, DataFrameConcat}
import com.alibaba.graphar.{GeneralParams, EdgeInfo, FileType, AdjListType, PropertyGroup}
import com.alibaba.graphar.datasources._
import com.alibaba.graphar.utils.FileSystem

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
Expand All @@ -38,15 +39,55 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
throw new IllegalArgumentException
}

/** Load the total number of src/dst vertices for this edge type. */
def readVerticesNumber(): Long = {
val file_path = prefix + "/" + edgeInfo.getVerticesNumFilePath(adjListType)
val number = FileSystem.readValue(file_path, spark.sparkContext.hadoopConfiguration)
return number
}

/** Load the chunk number of src/dst vertices. */
def readVertexChunkNumber(): Long = {
val vertices_number = readVerticesNumber
var vertex_chunk_size = edgeInfo.getSrc_chunk_size
if (adjListType == AdjListType.ordered_by_dest || adjListType == AdjListType.unordered_by_dest) {
vertex_chunk_size = edgeInfo.getDst_chunk_size
}
val vertex_chunk_number = (vertices_number + vertex_chunk_size - 1) / vertex_chunk_size
return vertex_chunk_number
}

/** Load the number of edges for the vertex chunk.
*
* @param chunk_index index of vertex chunk
* @return the number of edges
*/
def readEdgesNumber(chunk_index: Long): Long = {
val file_path = prefix + "/" + edgeInfo.getEdgesNumFilePath(chunk_index, adjListType)
val number = FileSystem.readValue(file_path, spark.sparkContext.hadoopConfiguration)
return number
}

/** Load the total number of edges for this edge type. */
def readEdgesNumber(): Long = {
val vertex_chunk_number = readVertexChunkNumber
var number: Long = 0
for (i <- 0L until vertex_chunk_number) {
number += readEdgesNumber(i)
}
return number
}

/** Load a single offset chunk as a DataFrame.
*
* @param chunk_index index of offset chunk
* @return offset chunk DataFrame. Raise IllegalArgumentException if adjListType is not
* AdjListType.ordered_by_source or AdjListType.ordered_by_dest.
*/
def readOffset(chunk_index: Long): DataFrame = {
if (adjListType != AdjListType.ordered_by_source && adjListType != AdjListType.ordered_by_dest)
if (adjListType != AdjListType.ordered_by_source && adjListType != AdjListType.ordered_by_dest) {
throw new IllegalArgumentException
}
val file_type_in_gar = edgeInfo.getAdjListFileType(adjListType)
val file_type = FileType.FileTypeToString(file_type_in_gar)
val file_path = prefix + edgeInfo.getAdjListOffsetFilePath(chunk_index, adjListType)
Expand Down Expand Up @@ -112,8 +153,9 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* raise an IllegalArgumentException error.
*/
def readEdgePropertyChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, chunk_index: Long): DataFrame = {
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) {
throw new IllegalArgumentException
}
val file_type = propertyGroup.getFile_type()
val file_path = prefix + edgeInfo.getPropertyFilePath(propertyGroup, adjListType, vertex_chunk_index, chunk_index)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
Expand All @@ -129,8 +171,9 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* If edge info does not contain the property group, raise an IllegalArgumentException error.
*/
def readEdgePropertyGroupForVertexChunk(propertyGroup: PropertyGroup, vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) {
throw new IllegalArgumentException
}
val file_type = propertyGroup.getFile_type()
val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType, vertex_chunk_index)
val df = spark.read.option("fileFormat", file_type).option("header", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
Expand All @@ -149,8 +192,9 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
* If edge info does not contain the property group, raise an IllegalArgumentException error.
*/
def readEdgePropertyGroup(propertyGroup: PropertyGroup, addIndex: Boolean = true): DataFrame = {
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false)
if (edgeInfo.containPropertyGroup(propertyGroup, adjListType) == false) {
throw new IllegalArgumentException
}
val file_type = propertyGroup.getFile_type()
val file_path = prefix + edgeInfo.getPropertyGroupPathPrefix(propertyGroup, adjListType)
val df = spark.read.option("fileFormat", file_type).option("header", "true").option("recursiveFileLookup", "true").format("com.alibaba.graphar.datasources.GarDataSource").load(file_path)
Expand All @@ -170,13 +214,15 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
*/
def readMultipleEdgePropertyGroupsForVertexChunk(propertyGroups: java.util.ArrayList[PropertyGroup], vertex_chunk_index: Long, addIndex: Boolean = true): DataFrame = {
val len: Int = propertyGroups.size
if (len == 0)
if (len == 0) {
return spark.emptyDataFrame
}

val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readEdgePropertyGroupForVertexChunk(pg0, vertex_chunk_index, false)
if (len == 1)
if (len == 1) {
return df0
}

var rdd = df0.rdd
var schema_array = df0.schema.fields
Expand Down Expand Up @@ -204,13 +250,15 @@ class EdgeReader(prefix: String, edgeInfo: EdgeInfo, adjListType: AdjListType.V
*/
def readMultipleEdgePropertyGroups(propertyGroups: java.util.ArrayList[PropertyGroup], addIndex: Boolean = true): DataFrame = {
val len: Int = propertyGroups.size
if (len == 0)
if (len == 0) {
return spark.emptyDataFrame
}

val pg0: PropertyGroup = propertyGroups.get(0)
val df0 = readEdgePropertyGroup(pg0, false)
if (len == 1)
if (len == 1) {
return df0
}

var rdd = df0.rdd
var schema_array = df0.schema.fields
Expand Down
Loading

0 comments on commit 8a3462f

Please sign in to comment.