Skip to content

Commit 894ecde

Browse files
jegonzalankurdave
authored andcommitted
Synthetic GraphX Benchmark
This PR accomplishes two things: 1. It introduces a Synthetic Benchmark application that generates an arbitrarily large log-normal graph and executes either PageRank or connected components on the graph. This can be used to profile GraphX system on arbitrary clusters without access to large graph datasets 2. This PR improves the implementation of the log-normal graph generator. Author: Joseph E. Gonzalez <[email protected]> Author: Ankur Dave <[email protected]> Closes #720 from jegonzal/graphx_synth_benchmark and squashes the following commits: e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 bccccad [Ankur Dave] Fix long lines 374678a [Ankur Dave] Bugfix and style changes 1bdf39a [Joseph E. Gonzalez] updating options d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples folder. f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script.
1 parent aa41a52 commit 894ecde

File tree

4 files changed

+171
-11
lines changed

4 files changed

+171
-11
lines changed
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.examples.graphx
19+
20+
import org.apache.spark.SparkContext._
21+
import org.apache.spark.graphx.PartitionStrategy
22+
import org.apache.spark.{SparkContext, SparkConf}
23+
import org.apache.spark.graphx.util.GraphGenerators
24+
import java.io.{PrintWriter, FileOutputStream}
25+
26+
/**
27+
* The SynthBenchmark application can be used to run various GraphX algorithms on
28+
* synthetic log-normal graphs. The intent of this code is to enable users to
29+
* profile the GraphX system without access to large graph datasets.
30+
*/
31+
object SynthBenchmark {
32+
33+
/**
34+
* To run this program use the following:
35+
*
36+
* MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
37+
*
38+
* Options:
39+
* -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
40+
* -niters the number of iterations of pagerank to use (Default: 10)
41+
* -numVertices the number of vertices in the graph (Default: 1000000)
42+
* -numEPart the number of edge partitions in the graph (Default: number of cores)
43+
* -partStrategy the graph partitioning strategy to use
44+
* -mu the mean parameter for the log-normal graph (Default: 4.0)
45+
* -sigma the stdev parameter for the log-normal graph (Default: 1.3)
46+
* -degFile the local file to save the degree information (Default: Empty)
47+
*/
48+
def main(args: Array[String]) {
49+
val options = args.map {
50+
arg =>
51+
arg.dropWhile(_ == '-').split('=') match {
52+
case Array(opt, v) => (opt -> v)
53+
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
54+
}
55+
}
56+
57+
var app = "pagerank"
58+
var niter = 10
59+
var numVertices = 100000
60+
var numEPart: Option[Int] = None
61+
var partitionStrategy: Option[PartitionStrategy] = None
62+
var mu: Double = 4.0
63+
var sigma: Double = 1.3
64+
var degFile: String = ""
65+
66+
options.foreach {
67+
case ("app", v) => app = v
68+
case ("niter", v) => niter = v.toInt
69+
case ("nverts", v) => numVertices = v.toInt
70+
case ("numEPart", v) => numEPart = Some(v.toInt)
71+
case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
72+
case ("mu", v) => mu = v.toDouble
73+
case ("sigma", v) => sigma = v.toDouble
74+
case ("degFile", v) => degFile = v
75+
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
76+
}
77+
78+
val conf = new SparkConf()
79+
.setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
80+
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
81+
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
82+
83+
val sc = new SparkContext(conf)
84+
85+
// Create the graph
86+
println(s"Creating graph...")
87+
val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
88+
numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
89+
// Repartition the graph
90+
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
91+
92+
var startTime = System.currentTimeMillis()
93+
val numEdges = graph.edges.count()
94+
println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
95+
val loadTime = System.currentTimeMillis() - startTime
96+
97+
// Collect the degree distribution (if desired)
98+
if (!degFile.isEmpty) {
99+
val fos = new FileOutputStream(degFile)
100+
val pos = new PrintWriter(fos)
101+
val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
102+
.map(p => p._2).countByValue()
103+
hist.foreach {
104+
case (deg, count) => pos.println(s"$deg \t $count")
105+
}
106+
}
107+
108+
// Run PageRank
109+
startTime = System.currentTimeMillis()
110+
if (app == "pagerank") {
111+
println("Running PageRank")
112+
val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
113+
println(s"Total PageRank = $totalPR")
114+
} else if (app == "cc") {
115+
println("Running Connected Components")
116+
val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
117+
println(s"Number of components = $numComponents")
118+
}
119+
val runTime = System.currentTimeMillis() - startTime
120+
121+
println(s"Num Vertices = $numVertices")
122+
println(s"Num Edges = $numEdges")
123+
println(s"Creation time = ${loadTime/1000.0} seconds")
124+
println(s"Run time = ${runTime/1000.0} seconds")
125+
126+
sc.stop()
127+
}
128+
}

graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,13 @@ object PartitionStrategy {
119119
math.abs((lower, higher).hashCode()) % numParts
120120
}
121121
}
122+
123+
/** Returns the PartitionStrategy with the specified name. */
124+
def fromString(s: String): PartitionStrategy = s match {
125+
case "RandomVertexCut" => RandomVertexCut
126+
case "EdgePartition1D" => EdgePartition1D
127+
case "EdgePartition2D" => EdgePartition2D
128+
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
129+
case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
130+
}
122131
}

graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,42 @@ object GraphGenerators {
3838
val RMATa = 0.45
3939
val RMATb = 0.15
4040
val RMATd = 0.25
41+
4142
/**
4243
* Generate a graph whose vertex out degree is log normal.
44+
*
45+
* The default values for mu and sigma are taken from the Pregel paper:
46+
*
47+
* Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert,
48+
* Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
49+
* Pregel: a system for large-scale graph processing. SIGMOD '10.
50+
*
51+
* @param sc
52+
* @param numVertices
53+
* @param mu
54+
* @param sigma
55+
* @return
4356
*/
44-
def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
45-
// based on Pregel settings
46-
val mu = 4
47-
val sigma = 1.3
48-
49-
val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
50-
src => (src, sampleLogNormal(mu, sigma, numVertices))
57+
def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
58+
mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
59+
val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
60+
// Initialize the random number generator with the source vertex id
61+
val rand = new Random(src)
62+
val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
63+
(src.toLong, degree)
5164
}
52-
val edges = vertices.flatMap { v =>
53-
generateRandomEdges(v._1.toInt, v._2, numVertices)
65+
val edges = vertices.flatMap { case (src, degree) =>
66+
new Iterator[Edge[Int]] {
67+
// Initialize the random number generator with the source vertex id
68+
val rand = new Random(src)
69+
var i = 0
70+
override def hasNext(): Boolean = { i < degree }
71+
override def next(): Edge[Int] = {
72+
val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
73+
i += 1
74+
nextEdge
75+
}
76+
}
5477
}
5578
Graph(vertices, edges, 0)
5679
}

project/MimaExcludes.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ object MimaExcludes {
3535
val excludes =
3636
SparkBuild.SPARK_VERSION match {
3737
case v if v.startsWith("1.1") =>
38-
Seq()
38+
Seq(
39+
MimaBuild.excludeSparkPackage("graphx"))
3940
case v if v.startsWith("1.0") =>
4041
Seq(
4142
MimaBuild.excludeSparkPackage("api.java"),
@@ -58,4 +59,3 @@ object MimaExcludes {
5859
case _ => Seq()
5960
}
6061
}
61-

0 commit comments

Comments
 (0)