Skip to content

Commit b4ea3d9

Browse files
ankurdaverxin
authored andcommitted
SPARK-1329: Create pid2vid with correct number of partitions
Each vertex partition is co-located with a pid2vid array created in RoutingTable.scala. This array maps edge partition IDs to the list of vertices in the current vertex partition that are mentioned by edges in that partition. Therefore the pid2vid array should have one entry per edge partition. GraphX currently creates one entry per *vertex* partition, which is a bug that leads to an ArrayIndexOutOfBoundsException when there are more edge partitions than vertex partitions. This commit fixes the bug and adds a test for this case. Resolves SPARK-1329. Thanks to Daniel Darabos for reporting this bug. Author: Ankur Dave <[email protected]> Closes #368 from ankurdave/fix-pid2vid-size and squashes the following commits: 5a5c52a [Ankur Dave] SPARK-1329: Create pid2vid with correct number of partitions (cherry picked from commit 17d3234) Signed-off-by: Reynold Xin <[email protected]>
1 parent 602b9ea commit b4ea3d9

File tree

2 files changed

+14
-2
lines changed

2 files changed

+14
-2
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) {
6969
vSet.iterator.map { vid => (vid, pid) }
7070
}
7171

72-
val numPartitions = vertices.partitions.size
72+
val numEdgePartitions = edges.partitions.size
7373
vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter =>
74-
val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId])
74+
val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId])
7575
for ((vid, pid) <- iter) {
7676
pid2vid(pid) += vid
7777
}

graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
297297
}
298298
}
299299

300+
test("more edge partitions than vertex partitions") {
301+
withSpark { sc =>
302+
val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1)
303+
val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2)
304+
val graph = Graph(verts, edges)
305+
val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr))
306+
.collect.toSet
307+
assert(triplets ===
308+
Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a")))
309+
}
310+
}
311+
300312
}

0 commit comments

Comments
 (0)