Skip to content

Commit c49d156

Browse files
Brennon York authored and srowen committed
[SPARK-5790][GraphX]: VertexRDD's won't zip properly for diff capability (added tests)
Added tests that maropu [created](https://github.com/maropu/spark/blob/1f64794b2ce33e64f340e383d4e8a60639a7eb4b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala) for vertices with differing partition counts. I wanted to make sure his work got captured/merged, as it's not in the master branch and I don't believe there's a PR out for it already. Author: Brennon York <[email protected]> Closes #5023 from brennonyork/SPARK-5790 and squashes the following commits: 83bbd29 [Brennon York] added maropu's tests for vertices with differing partition counts
1 parent 127268b commit c49d156

File tree

1 file changed

+37
-1
lines changed

1 file changed

+37
-1
lines changed

graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.graphx
1919

2020
import org.scalatest.FunSuite
2121

22-
import org.apache.spark.SparkContext
22+
import org.apache.spark.{HashPartitioner, SparkContext}
2323
import org.apache.spark.storage.StorageLevel
2424

2525
class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +58,16 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
5858
}
5959
}
6060

61+
test("diff vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // The two vertex sets are parallelized with different slice counts (3 and 2)
    // so that diff is exercised on inputs whose partitions do not line up.
    val wideVerts = VertexRDD(sc.parallelize(0 until 24, 3).map(id => (id.toLong, 0)))
    val narrowVerts = VertexRDD(sc.parallelize(8 until 16, 2).map(id => (id.toLong, 1)))
    assert(wideVerts.partitions.size != narrowVerts.partitions.size)

    // Ids 8..15 appear in both sets with different attributes (0 vs. 1);
    // the test expects exactly those ids to come back from diff.
    val differingIds = wideVerts.diff(narrowVerts).map(_._1).collect.toSet
    assert(differingIds === (8 until 16).toSet)
  }
}
70+
6171
test("leftJoin") {
6272
withSpark { sc =>
6373
val n = 100
@@ -73,6 +83,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
7383
}
7484
}
7585

86+
test("leftJoin vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // vertexA: ids 0..99, every attribute 1, parallelized into 2 slices.
    val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
    // vertexB: the even-id subset of vertexA, repartitioned with a 3-way
    // HashPartitioner so the two sides of the join differ in partition count.
    val vertexB = VertexRDD(
      vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
    assert(vertexA.partitions.size != vertexB.partitions.size)

    // Ids present in vertexB yield 1 - 1 = 0; ids missing from vertexB fall
    // back to getOrElse(0) and yield 1 - 0 = 1.
    val vertexC = vertexA.leftJoin(vertexB) { (vid, old, newOpt) =>
      old - newOpt.getOrElse(0)
    }

    // Only the odd ids (absent from vertexB) should keep a non-zero attribute.
    // Uses === to match the other assertions in this suite.
    val unmatchedIds = vertexC.filter(v => v._2 != 0).map(_._1).collect.toSet
    assert(unmatchedIds === (1 to 99 by 2).toSet)
  }
}
98+
7699
test("innerJoin") {
77100
withSpark { sc =>
78101
val n = 100
@@ -87,6 +110,19 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext {
87110
(0 to n by 2).map(x => (x.toLong, 0)).toSet) }
88111
}
89112

113+
test("innerJoin vertices with the non-equal number of partitions") {
  withSpark { sc =>
    // vertexA: ids 0..99, every attribute 1, parallelized into 2 slices.
    val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => (i.toLong, 1)))
    // vertexB: the even-id subset of vertexA, repartitioned with a 3-way
    // HashPartitioner so the two sides of the join differ in partition count.
    val vertexB = VertexRDD(
      vertexA.filter(v => v._1 % 2 == 0).partitionBy(new HashPartitioner(3)))
    assert(vertexA.partitions.size != vertexB.partitions.size)

    // Both sides carry attribute 1, so each joined vertex yields 1 - 1 = 0.
    val vertexC = vertexA.innerJoin(vertexB) { (vid, old, newVal) =>
      old - newVal
    }

    // The zero-valued result should cover exactly the even ids 0..98.
    // Uses === to match the other assertions in this suite.
    val matchedIds = vertexC.filter(v => v._2 == 0).map(_._1).collect.toSet
    assert(matchedIds === (0 to 98 by 2).toSet)
  }
}
125+
90126
test("aggregateUsingIndex") {
91127
withSpark { sc =>
92128
val n = 100

0 commit comments

Comments
 (0)