Skip to content

Commit 602b9ea

Browse files
ankurdave authored and rxin committed
Rebuild routing table after Graph.reverse
GraphImpl.reverse used to reverse edges in each partition of the edge RDD but preserve the routing table and replicated vertex view, since reversing should not affect partitioning. However, the old routing table would then have incorrect information for srcAttrOnly and dstAttrOnly. These RDDs should be switched. A simple fix is for Graph.reverse to rebuild the routing table and replicated vertex view. Thanks to Bogdan Ghidireac for reporting this issue on the [mailing list](http://apache-spark-user-list.1001560.n3.nabble.com/graph-reverse-amp-Pregel-API-td4338.html). Author: Ankur Dave <[email protected]> Closes #431 from ankurdave/fix-reverse-bug and squashes the following commits: 75d63cb [Ankur Dave] Rebuild routing table after Graph.reverse (cherry picked from commit 235a47c) Signed-off-by: Reynold Xin <[email protected]>
1 parent 87a7c4f commit 602b9ea

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
102102

103103
override def reverse: Graph[VD, ED] = {
104104
val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
105-
new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
105+
GraphImpl(vertices, newETable)
106106
}
107107

108108
override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
172172
}
173173
}
174174

175+
test("reverse with join elimination") {
176+
withSpark { sc =>
177+
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
178+
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
179+
val graph = Graph(vertices, edges).reverse
180+
val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
181+
assert(result.collect.toSet === Set((1L, 2)))
182+
}
183+
}
184+
175185
test("subgraph") {
176186
withSpark { sc =>
177187
// Create a star graph of 10 vertices.

0 commit comments

Comments
 (0)