Skip to content

Commit aac1810

Browse files
author
Brennon York
committed
Updated diff(RDD[(VertexId, VD)]) to use aggregateUsingIndex and added a test to ensure that the method works properly
1 parent 2af0b88 commit aac1810

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class VertexRDDImpl[VD] private[graphx] (
104104
this.mapVertexPartitions(_.map(f))
105105

106106
override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
107-
diff(VertexRDD(other))
107+
diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
108108
}
109109

110110
override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.graphx
1919

20+
import org.apache.spark.rdd.RDD
2021
import org.scalatest.FunSuite
2122

2223
import org.apache.spark.SparkContext
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
5859
}
5960
}
6061

62+
test("diff with RDD[(VertexId, VD)]") {
63+
withSpark { sc =>
64+
val n = 100
65+
val verts = vertices(sc, n).cache()
66+
val flipEvens: RDD[(VertexId, Int)] =
67+
sc.parallelize(0L to 100L)
68+
.map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache()
69+
// diff should keep only the changed vertices
70+
assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet)
71+
}
72+
}
73+
6174
test("leftJoin") {
6275
withSpark { sc =>
6376
val n = 100

0 commit comments

Comments
 (0)