Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ abstract class RDD[T: ClassTag](
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
var position = (new Random(index)).nextInt(numPartitions)
var position = (new Random()).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
Expand Down
6 changes: 4 additions & 2 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
val partitions = repartitioned.glom().collect()
// assert all elements are present
assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
// assert no bucket is overloaded
// assert no bucket is overloaded or empty
for (partition <- partitions) {
val avg = input.size / finalPartitions
val maxPossible = avg + initialPartitions
assert(partition.length <= maxPossible)
assert(partition.length <= maxPossible)
assert(!partition.isEmpty)
}
}

testSplitPartitions(Array.fill(100)(1), 10, 20)
testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
testSplitPartitions(Array.fill(1000)(1), 250, 128)
}

test("coalesced RDDs") {
Expand Down