Skip to content

Commit e995d1a

Browse files
committed
Introduce MAX_SHUFFLE_OUTPUT_PARTITIONS.
1 parent e58a6b4 commit e995d1a

File tree

3 files changed

+12
-3
lines changed

3 files changed

+12
-3
lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ final class PackedRecordPointer {
3737

3838
static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27; // 128 megabytes
3939

40+
/**
41+
* The maximum partition identifier that can be encoded. Note that partition ids start from 0.
42+
*/
4043
static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1; // 16777215
4144

4245
/** Bit mask for the lower 40 bits of a long. */

core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ private class UnsafeShuffleHandle[K, V](
3333
}
3434

3535
private[spark] object UnsafeShuffleManager extends Logging {
36+
37+
/**
38+
* The maximum number of shuffle output partitions that UnsafeShuffleManager supports.
39+
*/
40+
val MAX_SHUFFLE_OUTPUT_PARTITIONS = PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
41+
3642
/**
3743
* Helper method for determining whether a shuffle should use the optimized unsafe shuffle
3844
* path or whether it should fall back to the original sort-based shuffle.
@@ -50,9 +56,9 @@ private[spark] object UnsafeShuffleManager extends Logging {
5056
} else if (dependency.keyOrdering.isDefined) {
5157
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because a key ordering is defined")
5258
false
53-
} else if (dependency.partitioner.numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
59+
} else if (dependency.partitioner.numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS) {
5460
log.debug(s"Can't use UnsafeShuffle for shuffle $shufId because it has more than " +
55-
s"${PackedRecordPointer.MAXIMUM_PARTITION_ID} partitions")
61+
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS partitions")
5662
false
5763
} else {
5864
log.debug(s"Can use UnsafeShuffle for shuffle $shufId")

core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ class UnsafeShuffleManagerSuite extends FunSuite with Matchers {
9393

9494
// We do not support shuffles with more than 16 million output partitions
9595
assert(!canUseUnsafeShuffle(shuffleDep(
96-
partitioner = new HashPartitioner(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1),
96+
partitioner = new HashPartitioner(UnsafeShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS + 1),
9797
serializer = kryo,
9898
keyOrdering = None,
9999
aggregator = None,

0 commit comments

Comments (0)