Skip to content

Commit 9f603fc

Browse files
Brennon Yorkankurdave
authored andcommitted
[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD. Author: Brennon York <[email protected]> Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits: 0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs
1 parent a777c65 commit 9f603fc

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
104104
this.mapVertexPartitions(_.map(f))
105105

106106
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
107+
val otherPartition = other match {
108+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
109+
other.partitionsRDD
110+
case _ =>
111+
VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
112+
}
107113
val newPartitionsRDD = partitionsRDD.zipPartitions(
108-
other.partitionsRDD, preservesPartitioning = true
114+
otherPartition, preservesPartitioning = true
109115
) { (thisIter, otherIter) =>
110116
val thisPart = thisIter.next()
111117
val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
133139
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
134140
// If the other set is a VertexRDD then we use the much more efficient leftZipJoin
135141
other match {
136-
case other: VertexRDD[_] =>
142+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
137143
leftZipJoin(other)(f)
138144
case _ =>
139145
this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
162168
// Test if the other vertex is a VertexRDD to choose the optimal join strategy.
163169
// If the other set is a VertexRDD then we use the much more efficient innerZipJoin
164170
other match {
165-
case other: VertexRDD[_] =>
171+
case other: VertexRDD[_] if this.partitioner == other.partitioner =>
166172
innerZipJoin(other)(f)
167173
case _ =>
168174
this.withPartitionsRDD(

0 commit comments

Comments
 (0)