17 changes: 16 additions & 1 deletion docs/graphx-programming-guide.md
@@ -320,6 +320,7 @@ class Graph[VD, ED] {
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED]
def mapVerticesConserve(map: (VertexID, VD) => VD): Graph[VD, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
@@ -338,6 +339,9 @@ class Graph[VD, ED] {
def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)])
(mapFunc: (VertexID, VD, Option[U]) => VD2)
: Graph[VD2, ED]
def outerJoinVerticesConserve[U](other: RDD[(VertexID, U)])
(mapFunc: (VertexID, VD, Option[U]) => VD)
: Graph[VD, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]]
@@ -369,6 +373,7 @@ graph contains the following:
{% highlight scala %}
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapVerticesConserve(map: (VertexId, VD) => VD): Graph[VD, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
@@ -392,6 +397,10 @@ val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))

[Graph.mapVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)⇒VD2)(ClassTag[VD2]):Graph[VD2,ED]

When a call to `mapVertices` would not change the vertex attribute type, use the
`mapVerticesConserve` operator for better performance. This version of the operator avoids moving
unchanged vertex attributes when updating the triplets view.
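
For example, a minimal sketch (assuming a graph with `String` vertex attributes) that normalizes
each attribute in place; because the result is still a `String`, the conserving variant applies:

{% highlight scala %}
val graph: Graph[String, Int] = // a graph with String vertex attributes
// The attribute type is unchanged, so only vertices whose trimmed value
// differs from the original need to be re-replicated to the edge partitions
val trimmed: Graph[String, Int] = graph.mapVerticesConserve((id, attr) => attr.trim)
{% endhighlight %}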

These operators are often used to initialize the graph for a particular computation or project away
unnecessary properties. For example, given a graph with the out-degrees as the vertex properties
(we describe how to construct such a graph later), we initialize it for PageRank:
@@ -506,6 +515,8 @@ class Graph[VD, ED] {
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
def outerJoinVerticesConserve[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD)
: Graph[VD, ED]
}
{% endhighlight %}

@@ -533,6 +544,10 @@ property type. Because not all vertices may have a matching value in the input
function takes an `Option` type. For example, we can set up a graph for PageRank by initializing
vertex properties with their `outDegree`.

As with `mapVerticesConserve`, when a call to `outerJoinVertices` would not change the vertex
attribute type, use the `outerJoinVerticesConserve` operator for better performance. This version of
the operator avoids moving unchanged vertex attributes when updating the triplets view.
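
For example, a sketch of the conserving join (assuming a graph whose vertex attribute is already an
`Int`) that overwrites each attribute with the vertex's out-degree:

{% highlight scala %}
val degreeGraph: Graph[Int, Int] = // a graph with Int vertex attributes
// VD and the join result type are both Int, so incremental replication applies
val initialized = degreeGraph.outerJoinVerticesConserve(degreeGraph.outDegrees) {
  (id, oldAttr, outDegOpt) => outDegOpt.getOrElse(0) // vertices with no out-edges get 0
}
{% endhighlight %}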

[Graph.outerJoinVertices]: api/scala/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])⇒VD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED]


@@ -748,7 +763,7 @@ class GraphOps[VD, ED] {
// Run the vertex program on all vertices that receive messages
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Merge the new vertex values back into the graph
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
g = g.outerJoinVerticesConserve(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
// Send Messages: ------------------------------------------------------------------------------
// Vertices that didn't receive a message above don't appear in newVerts and therefore don't
// get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
32 changes: 32 additions & 0 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -130,6 +130,19 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
*/
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

/**
* Transforms each vertex attribute in the graph using the map function. Like [[mapVertices]],
* but because the attribute type is conserved, this operator can avoid moving unchanged vertex
* attributes when updating the triplets view.
*
* @note The new graph has the same structure. As a consequence the underlying index structures
* can be reused.
*
* @param map the function from a vertex object to a new vertex value of the same type
*
*/
def mapVerticesConserve(map: (VertexId, VD) => VD): Graph[VD, ED]
Contributor:

What if we just use mapVertices, but compare the two classtags and call the right mapVerticesConserve?

Contributor Author:

Are you saying to detect whether to call mapVerticesConserve by comparing ClassTags? That's the point of the change -- comparing ClassTags can give a false positive when the type is erased (e.g., classTag[Option[Int]] == classTag[Option[String]]), so it's unsafe to rely on it. See https://issues.apache.org/jira/browse/SPARK-1552.

Contributor:

Would TypeTags solve this problem?

Contributor Author:

Yes.
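
A minimal sketch of the erasure false positive described above (illustrative only, not part of the patch):

```scala
import scala.reflect.classTag
import scala.reflect.runtime.universe.typeTag

// ClassTag equality compares only the erased runtime class (scala.Option here):
classTag[Option[Int]] == classTag[Option[String]]         // true -- false positive
// TypeTags retain the type arguments, so they can tell the two apart:
typeTag[Option[Int]].tpe =:= typeTag[Option[String]].tpe  // false
```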


/**
* Transforms each edge attribute in the graph using the map function. The map function is not
* passed the vertex value for the vertices adjacent to the edge. If vertex values are desired,
Expand Down Expand Up @@ -341,6 +354,25 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]

/**
* Joins the vertices with entries in the `other` RDD and merges the results using `mapFunc`. Like
* [[outerJoinVertices]], but because the attribute type is conserved, this operator can avoid
* moving unchanged vertex attributes when updating the triplets view.
*
* The input table should contain at most one entry for each vertex. If no entry in `other` is
* provided for a particular vertex in the graph, the map function receives `None`.
*
* @tparam U the type of entry in the table of updates
*
* @param other the table to join with the vertices in the graph.
* The table should contain at most one entry for each vertex.
* @param mapFunc the function used to compute the new vertex values. The map function is invoked
* for all vertices, even those that do not have a corresponding entry in the table. It must
* conserve the original vertex attribute type.
*/
def outerJoinVerticesConserve[U: ClassTag](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD): Graph[VD, ED]

/**
* The associated [[GraphOps]] object.
*/
graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -213,7 +213,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
case None => data
}
}
graph.outerJoinVertices(table)(uf)
graph.outerJoinVerticesConserve(table)(uf)
}

/**
4 changes: 2 additions & 2 deletions graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -119,7 +119,7 @@ object Pregel extends Logging {
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
var g = graph.mapVerticesConserve((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// compute the messages
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
@@ -131,7 +131,7 @@
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
// Update the graph with the new vertices.
prevG = g
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g = g.outerJoinVerticesConserve(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
g.cache()

val oldMessages = messages
50 changes: 25 additions & 25 deletions graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -101,18 +101,17 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
}

override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
vertices.cache()
// The map preserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// The map does not preserve type, so we must re-replicate all vertices
GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
}
// The map does not conserve type, so we must re-replicate all vertices
GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges)
}

override def mapVerticesConserve(f: (VertexId, VD) => VD): Graph[VD, ED] = {
vertices.cache()
// The map conserves type, so we can use incremental replication
val newVerts = vertices.mapVertexPartitions(_.map(f)).cache()
val changedVerts = vertices.diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
}

override def mapEdges[ED2: ClassTag](
@@ -229,19 +228,20 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def outerJoinVertices[U: ClassTag, VD2: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = {
if (classTag[VD] equals classTag[VD2]) {
vertices.cache()
// updateF preserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]]
.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
} else {
// updateF does not preserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, replicatedVertexView.edges)
}
// updateF does not conserve type, so we must re-replicate all vertices
val newVerts = vertices.leftJoin(other)(updateF)
GraphImpl(newVerts, replicatedVertexView.edges)
}

override def outerJoinVerticesConserve[U: ClassTag]
(other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD): Graph[VD, ED] = {
vertices.cache()
// updateF conserves type, so we can use incremental replication
val newVerts = vertices.leftJoin(other)(updateF).cache()
val changedVerts = vertices.diff(newVerts)
val newReplicatedVertexView = replicatedVertexView.updateVertices(changedVerts)
new GraphImpl(newVerts, newReplicatedVertexView)
}

/** Test whether the closure accesses the attribute with name `attrName`. */
graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -78,7 +78,7 @@ object SVDPlusPlus {
et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
(g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))

g = g.outerJoinVertices(t0) {
g = g.outerJoinVerticesConserve(t0) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
@@ -112,7 +112,7 @@ object SVDPlusPlus {
val t1 = g.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr._2)),
(g1: DoubleMatrix, g2: DoubleMatrix) => g1.addColumnVector(g2))
g = g.outerJoinVertices(t1) {
g = g.outerJoinVerticesConserve(t1) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[DoubleMatrix]) =>
if (msg.isDefined) (vd._1, vd._1
@@ -125,7 +125,7 @@
mapTrainF(conf, u),
(g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
(g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) {
g = g.outerJoinVerticesConserve(t2) {
(vid: VertexId,
vd: (DoubleMatrix, DoubleMatrix, Double, Double),
msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
@@ -149,7 +149,7 @@
}
g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
g = g.outerJoinVertices(t3) {
g = g.outerJoinVerticesConserve(t3) {
(vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala
@@ -48,9 +48,9 @@ object StronglyConnectedComponents {
iter += 1
do {
numVertices = sccWorkGraph.numVertices
sccWorkGraph = sccWorkGraph.outerJoinVertices(sccWorkGraph.outDegrees) {
sccWorkGraph = sccWorkGraph.outerJoinVerticesConserve(sccWorkGraph.outDegrees) {
(vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
}.outerJoinVertices(sccWorkGraph.inDegrees) {
}.outerJoinVerticesConserve(sccWorkGraph.inDegrees) {
(vid, data, degreeOpt) => if (degreeOpt.isDefined) data else (vid, true)
}.cache()

@@ -60,14 +60,16 @@
.mapValues { (vid, data) => data._1}

// write values to sccGraph
sccGraph = sccGraph.outerJoinVertices(finalVertices) {
sccGraph = sccGraph.outerJoinVerticesConserve(finalVertices) {
(vid, scc, opt) => opt.getOrElse(scc)
}
// only keep vertices that are not final
sccWorkGraph = sccWorkGraph.subgraph(vpred = (vid, data) => !data._2).cache()
} while (sccWorkGraph.numVertices < numVertices)

sccWorkGraph = sccWorkGraph.mapVertices{ case (vid, (color, isFinal)) => (vid, isFinal) }
sccWorkGraph = sccWorkGraph.mapVerticesConserve {
case (vid, (color, isFinal)) => (vid, isFinal)
}

// collect min of all my neighbor's scc values, update if it's smaller than mine
// then notify any neighbors with scc values larger than mine
52 changes: 50 additions & 2 deletions graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -150,7 +150,7 @@ class GraphSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
// mapVertices preserving type
// mapVertices conserving type
val mappedVAttrs = star.mapVertices((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
// mapVertices changing type
@@ -159,6 +159,40 @@
}
}

test("mapVertices changing type with same erased type") {
withSpark { sc =>
val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
(1L, Some(1)),
(2L, Some(2)),
(3L, Some(3))
))
val edges = sc.parallelize(Array(
Edge(1L, 2L, 0),
Edge(2L, 3L, 0),
Edge(3L, 1L, 0)
))
val graph0 = Graph(vertices, edges)
// Trigger initial vertex replication
graph0.triplets.foreach(x => {})
// Change type of replicated vertices, but conserve erased type
val graph1 = graph0.mapVertices {
case (vid, integerOpt) => integerOpt.map((x: java.lang.Integer) => (x.toDouble): java.lang.Double)
}
// Access replicated vertices, exposing the erased type
val graph2 = graph1.mapTriplets(t => t.srcAttr.get)
assert(graph2.edges.map(_.attr).collect.toSet === Set[java.lang.Double](1.0, 2.0, 3.0))
}
}

test("mapVerticesConserve") {
withSpark { sc =>
val n = 5
val star = starGraph(sc, n)
val mappedVAttrs = star.mapVerticesConserve((vid, attr) => attr + "2")
assert(mappedVAttrs.vertices.collect.toSet === (0 to n).map(x => (x: VertexId, "v2")).toSet)
}
}

test("mapEdges") {
withSpark { sc =>
val n = 3
@@ -297,14 +331,16 @@
withSpark { sc =>
val n = 5
val reverseStar = starGraph(sc, n).reverse.cache()

// outerJoinVertices changing type
val reverseStarDegrees =
reverseStar.outerJoinVertices(reverseStar.outDegrees) { (vid, a, bOpt) => bOpt.getOrElse(0) }
val neighborDegreeSums = reverseStarDegrees.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr), (et.dstId, et.srcAttr)),
(a: Int, b: Int) => a + b).collect.toSet
assert(neighborDegreeSums === Set((0: VertexId, n)) ++ (1 to n).map(x => (x: VertexId, 0)))
// outerJoinVertices preserving type

// outerJoinVertices conserving type
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
reverseStar.outerJoinVertices(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
Expand All @@ -313,6 +349,18 @@ class GraphSuite extends FunSuite with LocalSparkContext {
}
}

test("outerJoinVerticesConserve") {
withSpark { sc =>
val n = 5
val reverseStar = starGraph(sc, n).reverse.cache()
val messages = reverseStar.vertices.mapValues { (vid, attr) => vid.toString }
val newReverseStar =
reverseStar.outerJoinVerticesConserve(messages) { (vid, a, bOpt) => a + bOpt.getOrElse("") }
assert(newReverseStar.vertices.map(_._2).collect.toSet ===
(0 to n).map(x => "v%d".format(x)).toSet)
}
}

test("more edge partitions than vertex partitions") {
withSpark { sc =>
val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)