[Doc][Spark] Update the doc: fix the outdated argument annotations and typo #267

Merged: 4 commits, Oct 30, 2023
2 changes: 1 addition & 1 deletion docs/applications/spark.rst
@@ -170,7 +170,7 @@ The `putVertexDataIntoNeo4j` and `putEdgeDataIntoNeo4j` methods create or update …
  val sourceDf = vertexData(sourceLabel)
  val targetDf = vertexData(targetLabel)
  // convert the source and target index column to the primary key column
- val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey) // use the first dataframe of (adj_list_type_str, dataframe) map
+ val df = Utils.joinEdgesWithVertexPrimaryKey(value.head._2, sourceDf, targetDf, sourcePrimaryKey, targetPrimaryKey) // use the first DataFrame of (adj_list_type_str, DataFrame) map

val properties = if (edgeLabel == "REVIEWED") "rating,summary" else ""

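For context, the conversion this comment describes can be sketched in plain Spark: replace the edge table's internal source/target index columns with the corresponding vertices' primary keys. A minimal sketch, assuming hypothetical column names (`_vertexIndex`, `_srcIndex`, `_dstIndex`); the library's actual internals may differ.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Swap internal src/dst indices for the vertices' primary-key values.
def joinEdgesWithPrimaryKeys(
    edgeDf: DataFrame,   // carries "_srcIndex" and "_dstIndex" (assumed names)
    sourceDf: DataFrame, // carries "_vertexIndex" plus property columns
    targetDf: DataFrame,
    sourcePrimaryKey: String,
    targetPrimaryKey: String
): DataFrame = {
  val src = sourceDf.select(
    col("_vertexIndex").as("_srcIndex"),
    col(sourcePrimaryKey).as("src")
  )
  val dst = targetDf.select(
    col("_vertexIndex").as("_dstIndex"),
    col(targetPrimaryKey).as("dst")
  )
  edgeDf
    .join(src, Seq("_srcIndex"))
    .join(dst, Seq("_dstIndex"))
    .drop("_srcIndex", "_dstIndex")
}
```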
4 changes: 2 additions & 2 deletions docs/user-guide/spark-lib.rst
@@ -132,7 +132,7 @@ To utilize the GAR Spark writer, please refer to the following example code.

.. code:: scala

- // generate the vertex index column for vertex dataframe
+ // generate the vertex index column for vertex DataFrame
val vertex_df = ...
val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)
// construct the vertex writer
@@ -145,7 +145,7 @@ To utilize the GAR Spark writer, please refer to the following example code.
// write all properties
writer.writeVertexProperties()

- // generate vertex index for edge dataframe
+ // generate vertex index for edge DataFrame
val edge_df = ...
val edge_df_with_index = IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
// construct the edge writer
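The example elides how `vertex_df` and `edge_df` are built. One plausible way to produce them, assuming pipe-delimited CSV inputs like the LDBC sample files used in the test suite (the file names here are illustrative):

```scala
import org.apache.spark.sql.SparkSession
import com.alibaba.graphar.util.IndexGenerator

val spark = SparkSession.builder().appName("gar-writer-example").getOrCreate()

// Hypothetical inputs; any DataFrames with the expected columns work.
val vertex_df = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("person_0_0.csv")
val vertex_df_with_index = IndexGenerator.generateVertexIndexColumn(vertex_df)

val edge_df = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("person_knows_person_0_0.csv")
val edge_df_with_index =
  IndexGenerator.generateSrcAndDstIndexUnitedlyForEdges(edge_df, "src", "dst")
```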
@@ -97,7 +97,7 @@ object GraphAr2Nebula {
targetDF,
sourcePrimaryKey,
targetPrimaryKey
- ) // use the first dataframe of (adj_list_type_str, dataframe) map
+ ) // use the first DataFrame of (adj_list_type_str, DataFrame) map

writeEdge(edgeType, "src", "dst", "_rank", df)
}
@@ -103,7 +103,7 @@ object GraphAr2Neo4j {
targetDf,
sourcePrimaryKey,
targetPrimaryKey
- ) // use the first dataframe of (adj_list_type_str, dataframe) map
+ ) // use the first DataFrame of (adj_list_type_str, DataFrame) map

// FIXME: use properties message in edge info
val properties = if (edgeLabel == "REVIEWED") "rating,summary" else ""
@@ -112,8 +112,8 @@ object GraphReader {
* @param addVertexIndex
* Whether to add index for the vertex DataFrames.
* @return
- * Pair of vertex dataframes and edge dataframes, the vertex dataframes are
- * stored as the map of (vertex_label -> DataFrame) the edge dataframes are
+ * Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
+ * stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
* stored as a map of ((srcLabel, edgeLabel, dstLabel) -> (adj_list_type_str
* -> DataFrame))
*/
@@ -145,8 +145,8 @@ object GraphReader {
* @param addVertexIndex
* Whether to add index for the vertex DataFrames.
* @return
- * Pair of vertex dataframes and edge dataframes, the vertex dataframes are
- * stored as the map of (vertex_label -> DataFrame) the edge dataframes are
+ * Pair of vertex DataFrames and edge DataFrames, the vertex DataFrames are
+ * stored as the map of (vertex_label -> DataFrame) the edge DataFrames are
* stored as a map of (srcLabel_edgeLabel_dstLabel -> (adj_list_type_str ->
* DataFrame))
*/
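A short sketch of consuming the return shape documented above; only the nested-map structure comes from the scaladoc, the traversal itself is illustrative.

```scala
import org.apache.spark.sql.DataFrame

def summarize(
    vertexDataFrames: Map[String, DataFrame],
    edgeDataFrames: Map[(String, String, String), Map[String, DataFrame]]
): Unit = {
  // one DataFrame per vertex label
  vertexDataFrames.foreach { case (label, df) =>
    println(s"vertex label $label: ${df.count()} rows")
  }
  // one DataFrame per (src, edge, dst) triple and adjacency-list type
  edgeDataFrames.foreach { case ((srcLabel, edgeLabel, dstLabel), byAdjList) =>
    byAdjList.foreach { case (adjListTypeStr, df) =>
      println(s"$srcLabel-[$edgeLabel]->$dstLabel ($adjListTypeStr): ${df.count()} edges")
    }
  }
}
```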
@@ -31,12 +31,12 @@ import java.io.{BufferedWriter, OutputStreamWriter}
class GraphWriter() {

/**
- * Put the vertex dataframe into writer.
+ * Put the vertex DataFrame into writer.
*
* @param label
* label of vertex.
* @param df
- * dataframe of the vertex type.
+ * DataFrame of the vertex type.
* @param primaryKey
* primary key of the vertex type, default is empty, which takes the first
* property column as primary key.
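A hedged usage sketch for the method documented above; the method name `PutVertexData` and the import path are assumptions inferred from the scaladoc, since the signature itself sits outside this hunk.

```scala
import org.apache.spark.sql.DataFrame
import com.alibaba.graphar.graph.GraphWriter // assumed package

// personDf: a DataFrame of person vertices with an "id" property column.
def putPerson(personDf: DataFrame): GraphWriter = {
  val writer = new GraphWriter()
  // label, DataFrame, and primary key of the vertex type; an empty
  // primary key would default to the first property column.
  writer.PutVertexData("person", personDf, "id") // method name assumed
  writer
}
```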
@@ -32,7 +32,7 @@ object DataFrameConcat {
* @param df2
* The second DataFrame.
* @return
- * The result DataFrame that concats the two DataFrames.
+ * The result DataFrame that concatenates the two DataFrames.
*/
def concat(df1: DataFrame, df2: DataFrame): DataFrame = {
val spark = df1.sparkSession
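One plausible reading of `concat`, suggested by the `df1.sparkSession` line above: column-wise concatenation of two equal-length DataFrames via an RDD zip. This is an assumption about the strategy, not necessarily the library's actual code.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.StructType

// Zip rows pairwise and merge them; requires both DataFrames to have the
// same number of rows and compatible partitioning.
def concatColumns(df1: DataFrame, df2: DataFrame): DataFrame = {
  val spark = df1.sparkSession
  val zipped = df1.rdd.zip(df2.rdd).map { case (r1, r2) => Row.merge(r1, r2) }
  val schema = StructType(df1.schema.fields ++ df2.schema.fields)
  spark.createDataFrame(zipped, schema)
}
```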
40 changes: 32 additions & 8 deletions spark/src/main/scala/com/alibaba/graphar/util/FileSystem.scala
@@ -25,20 +25,24 @@ import org.apache.hadoop.conf.Configuration

import com.alibaba.graphar.GeneralParams

- /** Helper object to write dataframe to chunk files */
+ /** Helper object to write DataFrame to chunk files */
object FileSystem {

/**
- * Write input dataframe to output path with certain file format.
+ * Write input DataFrame to output path with certain file format.
*
- * @param dataframe
+ * @param dataFrame
[review thread on this line]
Contributor: DataFrame
Author: the parameter name is dataFrame

* DataFrame to write out.
* @param fileType
* output file format type, the value could be csv|parquet|orc.
* @param outputPrefix
* output path prefix.
* @param startChunkIndex
* the start index of chunk.
* @param offsetStartChunkIndex[Optional]
* the start index of offset chunk, if not empty, that means writing an
* offset DataFrame.
* @param aggNumListOfEdgeChunk[Optional]
* the aggregated number list of edge chunk, if not empty, that means
* writing an edge DataFrame.
*/
def writeDataFrame(
dataFrame: DataFrame,
@@ -61,7 +65,7 @@ object FileSystem {
}
fs.close()

- // write offset chunks dataframe
+ // write offset chunks DataFrame
if (!offsetStartChunkIndex.isEmpty) {
return dataFrame.write
.mode("append")
@@ -74,7 +78,7 @@
.format("com.alibaba.graphar.datasources.GarDataSource")
.save(outputPrefix)
}
- // write edge chunks dataframe
+ // write edge chunks DataFrame
if (!aggNumListOfEdgeChunk.isEmpty) {
implicit val formats =
DefaultFormats // initialize a default formats for json4s
@@ -89,7 +93,7 @@
.format("com.alibaba.graphar.datasources.GarDataSource")
.save(outputPrefix)
}
- // write vertex chunks dataframe
+ // write vertex chunks DataFrame
dataFrame.write
.mode("append")
.option("header", "true")
@@ -98,6 +102,16 @@
.save(outputPrefix)
}

/**
* Write input value to output path.
*
* @param value
* Value to write out.
* @param outputPath
* output path.
* @param hadoopConfig
* hadoop configuration.
*/
def writeValue(
value: Long,
outputPath: String,
@@ -112,6 +126,16 @@
fs.close()
}

/**
* Read a value from input path.
*
* @param inputPath
* Input path.
* @param hadoopConfig
* hadoop configuration.
* @return
* The value read from input path.
*/
def readValue(inputPath: String, hadoopConfig: Configuration): Long = {
val path = new Path(inputPath)
val fs = path.getFileSystem(hadoopConfig)
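The `writeValue`/`readValue` pair above round-trips a single `Long` (for example a vertex or edge count) through a path. A usage sketch; the path is illustrative.

```scala
import org.apache.hadoop.conf.Configuration
import com.alibaba.graphar.util.FileSystem

val conf = new Configuration()
// persist a count next to the chunk files, then read it back
FileSystem.writeValue(21L, "/tmp/gar-example/vertex_count", conf)
val n = FileSystem.readValue("/tmp/gar-example/vertex_count", conf)
assert(n == 21L)
```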
16 changes: 8 additions & 8 deletions spark/src/main/scala/com/alibaba/graphar/writer/EdgeWriter.scala
@@ -126,13 +126,13 @@ object EdgeWriter {
edgeChunkSize.toInt
)

- // repartition edge dataframe and sort within partitions
+ // repartition edge DataFrame and sort within partitions
val partitionRDD =
rddWithEid.repartitionAndSortWithinPartitions(partitioner).values
val partitionEdgeDf = spark.createDataFrame(partitionRDD, edgeSchema)
partitionEdgeDf.cache()

- // generate offset dataframes
+ // generate offset DataFrames
if (
adjListType == AdjListType.ordered_by_source || adjListType == AdjListType.ordered_by_dest
) {
@@ -190,10 +190,10 @@ }
}

/**
- * Writer for edge dataframe.
+ * Writer for edge DataFrame.
*
* @constructor
- * create a new writer for edge dataframe with edge info.
+ * create a new writer for edge DataFrame with edge info.
* @param prefix
* the absolute prefix.
* @param edgeInfo
@@ -288,7 +288,7 @@ class EdgeWriter(
}
}

- /** Generate the chunks of AdjList from edge dataframe for this edge type. */
+ /** Generate the chunks of AdjList from edge DataFrame for this edge type. */
def writeAdjList(): Unit = {
val fileType = edgeInfo.getAdjListFileType(adjListType)
val outputPrefix = prefix + edgeInfo.getAdjListPathPrefix(adjListType)
@@ -312,7 +312,7 @@ class EdgeWriter(
}

/**
- * Generate the chunks of the property group from edge dataframe.
+ * Generate the chunks of the property group from edge DataFrame.
*
* @param propertyGroup
* property group
@@ -340,7 +340,7 @@ class EdgeWriter(
)
}

- /** Generate the chunks of all property groups from edge dataframe. */
+ /** Generate the chunks of all property groups from edge DataFrame. */
def writeEdgeProperties(): Unit = {
val property_groups = edgeInfo.getPropertyGroups(adjListType)
val it = property_groups.iterator
Expand All @@ -352,7 +352,7 @@ class EdgeWriter(

/**
* Generate the chunks for the AdjList and all property groups from edge
- * dataframe.
+ * DataFrame.
*/
def writeEdges(): Unit = {
writeAdjList()
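A construction sketch for `EdgeWriter`. The parameter list (prefix, edge info, adjacency-list type, vertex count, indexed edge DataFrame) is an assumption pieced together from the tests below; consult the actual signature before relying on it.

```scala
import org.apache.spark.sql.DataFrame
import com.alibaba.graphar.{AdjListType, EdgeInfo}
import com.alibaba.graphar.writer.EdgeWriter

// edgeDfWithIndex must already carry the generated src/dst index columns.
def writeKnowsEdges(
    prefix: String,
    edgeInfo: EdgeInfo,
    vertexNum: Long,
    edgeDfWithIndex: DataFrame
): Unit = {
  val writer = new EdgeWriter(
    prefix,
    edgeInfo,
    AdjListType.ordered_by_source,
    vertexNum, // parameter assumed from the test setup
    edgeDfWithIndex
  )
  writer.writeEdges() // AdjList chunks plus all property-group chunks
}
```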
@@ -78,7 +78,7 @@ class VertexWriter(
)

private def validate(): Unit = {
- // check if vertex dataframe contains the index_filed
+ // check if vertex DataFrame contains the index_filed
val index_filed = StructField(GeneralParams.vertexIndexCol, LongType)
if (vertexDf.schema.contains(index_filed) == false) {
throw new IllegalArgumentException
@@ -95,7 +95,7 @@ class VertexWriter(
}

/**
- * Generate chunks of the property group for vertex dataframe.
+ * Generate chunks of the property group for vertex DataFrame.
*
* @param propertyGroup
* property group
@@ -124,7 +124,7 @@ class VertexWriter(
)
}

- /** Generate chunks of all property groups for vertex dataframe. */
+ /** Generate chunks of all property groups for vertex DataFrame. */
def writeVertexProperties(): Unit = {
val property_groups = vertexInfo.getProperty_groups()
val it = property_groups.iterator
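And the vertex-side counterpart, again with the constructor shape assumed from the surrounding tests rather than confirmed by this hunk.

```scala
import org.apache.spark.sql.DataFrame
import com.alibaba.graphar.VertexInfo
import com.alibaba.graphar.writer.VertexWriter

// vertexDfWithIndex must contain the internal vertex index column,
// or validate() throws IllegalArgumentException (see above).
def writePersonVertices(
    prefix: String,
    vertexInfo: VertexInfo,
    vertexDfWithIndex: DataFrame
): Unit = {
  val writer = new VertexWriter(prefix, vertexInfo, vertexDfWithIndex)
  writer.writeVertexProperties() // chunks for every property group
}
```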
4 changes: 2 additions & 2 deletions spark/src/test/scala/com/alibaba/graphar/ComputeExample.scala
@@ -30,7 +30,7 @@ class ComputeExampleSuite extends AnyFunSuite {
.getOrCreate()

test("run cc using graphx") {
- // read vertex dataframe
+ // read vertex DataFrame
val file_path = "gar-test/ldbc_sample/parquet/"
val prefix = getClass.getClassLoader.getResource(file_path).getPath
val vertex_yaml = getClass.getClassLoader
@@ -45,7 +45,7 @@ class ComputeExampleSuite extends AnyFunSuite {
assert(vertex_df.columns.size == 5)
assert(vertex_df.count() == vertices_num)

- // read edge dataframe
+ // read edge DataFrame
val edge_yaml = getClass.getClassLoader
.getResource(file_path + "person_knows_person.edge.yml")
.getPath
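The elided GraphX step can look like the following; the index column names are illustrative stand-ins for GraphAr's internal vertex/src/dst index columns.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

def runCC(vertexDf: DataFrame, edgeDf: DataFrame): Long = {
  // assumed internal index column names, for illustration only
  val vertices: RDD[(Long, Unit)] = vertexDf.rdd
    .map(row => (row.getAs[Long]("_graphArVertexIndex"), ()))
  val edges: RDD[Edge[Int]] = edgeDf.rdd
    .map(row =>
      Edge(
        row.getAs[Long]("_graphArSrcIndex"),
        row.getAs[Long]("_graphArDstIndex"),
        1
      )
    )
  val graph = Graph(vertices, edges)
  // number of connected components
  graph.connectedComponents().vertices.map(_._2).distinct().count()
}
```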
18 changes: 9 additions & 9 deletions spark/src/test/scala/com/alibaba/graphar/TestWriter.scala
@@ -31,7 +31,7 @@ class WriterSuite extends AnyFunSuite {
.getOrCreate()

test("test vertex writer with only vertex table") {
- // read vertex dataframe
+ // read vertex DataFrame
val file_path = getClass.getClassLoader
.getResource("gar-test/ldbc_sample/person_0_0.csv")
.getPath
@@ -50,7 +50,7 @@ class WriterSuite extends AnyFunSuite {
.getPath
val vertex_info = VertexInfo.loadVertexInfo(vertex_yaml_path, spark)

- // generate vertex index column for vertex dataframe
+ // generate vertex index column for vertex DataFrame
val vertex_df_with_index =
util.IndexGenerator.generateVertexIndexColumn(vertex_df)

@@ -90,7 +90,7 @@ class WriterSuite extends AnyFunSuite {
}

test("test edge writer with only edge table") {
- // read edge dataframe
+ // read edge DataFrame
val file_path = getClass.getClassLoader
.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv")
.getPath
@@ -111,7 +111,7 @@ class WriterSuite extends AnyFunSuite {
val edge_info = EdgeInfo.loadEdgeInfo(edge_yaml_path, spark)
val adj_list_type = AdjListType.ordered_by_source

- // generate vertex index for edge dataframe
+ // generate vertex index for edge DataFrame
val srcDf = edge_df.select("src").withColumnRenamed("src", "vertex")
val dstDf = edge_df.select("dst").withColumnRenamed("dst", "vertex")
val vertex_num = srcDf.union(dstDf).distinct().count()
@@ -189,7 +189,7 @@ class WriterSuite extends AnyFunSuite {
assertThrows[IllegalArgumentException](
writer.writeEdgeProperties(invalid_property_group)
)
- // throw exception if not generate src index and dst index for edge dataframe
+ // throw exception if not generate src index and dst index for edge DataFrame
assertThrows[IllegalArgumentException](
new EdgeWriter(
prefix,
@@ -217,7 +217,7 @@ class WriterSuite extends AnyFunSuite {
}

test("test edge writer with vertex table and edge table") {
- // read vertex dataframe
+ // read vertex DataFrame
val vertex_file_path = getClass.getClassLoader
.getResource("gar-test/ldbc_sample/person_0_0.csv")
.getPath
@@ -227,7 +227,7 @@ class WriterSuite extends AnyFunSuite {
.csv(vertex_file_path)
val vertex_num = vertex_df.count()

- // read edge dataframe
+ // read edge DataFrame
val file_path = getClass.getClassLoader
.getResource("gar-test/ldbc_sample/person_knows_person_0_0.csv")
.getPath
@@ -258,12 +258,12 @@ class WriterSuite extends AnyFunSuite {
val vertex_chunk_num =
(vertex_num + vertex_chunk_size - 1) / vertex_chunk_size

- // construct person vertex mapping with dataframe
+ // construct person vertex mapping with DataFrame
val vertex_mapping = util.IndexGenerator.constructVertexIndexMapping(
vertex_df,
vertex_info.getPrimaryKey()
)
- // generate src index and dst index for edge datafram with vertex mapping
+ // generate src index and dst index for edge DataFrame with vertex mapping
val edge_df_with_src_index = util.IndexGenerator
.generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
val edge_df_with_src_dst_index =
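Pulling the mapping-based indexing steps together: `constructVertexIndexMapping` and `generateSrcIndexForEdgesFromMapping` appear verbatim above, while the dst-side helper name is assumed to mirror the src side.

```scala
import com.alibaba.graphar.util.IndexGenerator

val vertex_mapping = IndexGenerator.constructVertexIndexMapping(
  vertex_df,
  vertex_info.getPrimaryKey()
)
val edge_df_with_src_index = IndexGenerator
  .generateSrcIndexForEdgesFromMapping(edge_df, "src", vertex_mapping)
// dst-side call assumed by symmetry with the src side
val edge_df_with_src_dst_index = IndexGenerator
  .generateDstIndexForEdgesFromMapping(edge_df_with_src_index, "dst", vertex_mapping)
```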