3 files changed: +22 -2 lines changed

core/src/main/java/org/apache/spark/shuffle/unsafe/PackedRecordPointer.java
@@ -37,6 +37,8 @@ final class PackedRecordPointer {
 
   static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
 
+  static final int MAXIMUM_PARTITION_ID = 1 << 24;  // 16777216
+
   /** Bit mask for the lower 40 bits of a long. */
   private static final long MASK_LONG_LOWER_40_BITS = 0xFFFFFFFFFFL;
 
@@ -62,6 +64,7 @@ final class PackedRecordPointer {
    * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
    */
   public static long packPointer(long recordPointer, int partitionId) {
+    assert (partitionId <= MAXIMUM_PARTITION_ID);
     // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
     // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
     final int pageNumber = (int) ((recordPointer & MASK_LONG_UPPER_13_BITS) >>> 51);
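For context, here is a minimal standalone sketch of the 64-bit layout these constants imply: 24 bits of partition id, 13 bits of page number, and 27 bits of in-page offset. The class and method names below are hypothetical illustrations, not Spark's actual PackedRecordPointer implementation.

// Hypothetical sketch: pack and unpack a (partitionId, pageNumber, offsetInPage)
// triple into one long, mirroring the 24 + 13 + 27 = 64-bit budget suggested by
// MAXIMUM_PARTITION_ID (1 << 24) and MAXIMUM_PAGE_SIZE_BYTES (1 << 27) above.
public final class PackedPointerSketch {

  private static final int OFFSET_BITS = 27;   // addresses up to 128 megabytes per page
  private static final int PAGE_BITS = 13;     // up to 8192 pages

  /** Packs the three fields into a single 64-bit value. */
  static long pack(int partitionId, int pageNumber, int offsetInPage) {
    assert partitionId <= (1 << 24);
    assert pageNumber < (1 << PAGE_BITS);
    assert offsetInPage < (1 << OFFSET_BITS);
    return (((long) partitionId) << (PAGE_BITS + OFFSET_BITS))
        | (((long) pageNumber) << OFFSET_BITS)
        | (long) offsetInPage;
  }

  /** Recovers the partition id from the upper 24 bits. */
  static int partitionId(long packed) {
    return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
  }

  public static void main(String[] args) {
    long packed = pack(7, 3, 1024);
    System.out.println(partitionId(packed));   // prints 7
  }
}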
core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleSorter.java
@@ -17,8 +17,10 @@
 
 package org.apache.spark.shuffle.unsafe;
 
+import java.io.IOException;
 import java.util.Comparator;
 
+import org.apache.spark.unsafe.memory.MemoryBlock;
 import org.apache.spark.util.collection.Sorter;
 
 final class UnsafeShuffleSorter {
@@ -59,8 +61,17 @@ public long getMemoryUsage() {
     return sortBuffer.length * 8L;
   }
 
-  // TODO: clairify assumption that pointer points to record length.
-  public void insertRecord(long recordPointer, int partitionId) {
+  /**
+   * Inserts a record to be sorted.
+   *
+   * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
+   *                      certain pointer compression techniques used by the sorter, the sort can
+   *                      only operate on pointers that point to locations in the first
+   *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
+   * @param partitionId the partition id, which must be less than or equal to
+   *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
+   */
+  public void insertRecord(long recordPointer, int partitionId) throws IOException {
     if (!hasSpaceForAnotherRecord()) {
       expandSortBuffer();
     }
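The new Javadoc spells out the two preconditions on insertRecord: the record pointer must fall within the first 2^27 bytes of its page, and the partition id must fit in 24 bits. Below is a hypothetical, simplified sketch of the insertion path it describes, in which each record becomes one packed long in a growable long[] buffer, which is why getMemoryUsage above reports sortBuffer.length * 8 bytes. The names mirror the diff, but the growth policy and packing are illustrative only; the real sorter acquires its memory through Spark's shuffle memory management.

// Hypothetical sketch of the sort-buffer insertion path. Assumes a 40-bit
// compressed record pointer and a 24-bit partition id, as in the file above.
final class SortBufferSketch {

  private long[] sortBuffer = new long[1024];
  private int position = 0;

  private boolean hasSpaceForAnotherRecord() {
    return position < sortBuffer.length;
  }

  private void expandSortBuffer() {
    // Illustrative growth policy; the real sorter would have to obtain this
    // memory from the shuffle memory manager rather than silently doubling.
    sortBuffer = java.util.Arrays.copyOf(sortBuffer, sortBuffer.length * 2);
  }

  /** Stores one record as a single packed long: [24-bit partition | 40-bit pointer]. */
  public void insertRecord(long recordPointer, int partitionId) {
    if (!hasSpaceForAnotherRecord()) {
      expandSortBuffer();
    }
    sortBuffer[position++] =
        (((long) partitionId) << 40) | (recordPointer & 0xFFFFFFFFFFL);
  }

  /** Memory used by the pointer array, matching getMemoryUsage in the diff. */
  public long getMemoryUsage() {
    return sortBuffer.length * 8L;
  }
}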
core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -100,6 +100,12 @@ public UnsafeShuffleWriter(
       int mapId,
       TaskContext taskContext,
       SparkConf sparkConf) {
+    final int numPartitions = handle.dependency().partitioner().numPartitions();
+    if (numPartitions > PackedRecordPointer.MAXIMUM_PARTITION_ID) {
+      throw new IllegalArgumentException(
+        "UnsafeShuffleWriter can only be used for shuffles with at most " +
+        PackedRecordPointer.MAXIMUM_PARTITION_ID + " reduce partitions");
+    }
     this.blockManager = blockManager;
     this.shuffleBlockResolver = shuffleBlockResolver;
     this.memoryManager = memoryManager;
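Because only 24 bits of the packed pointer are reserved for the partition id, the writer now rejects shuffles with more reduce partitions than that at construction time, rather than relying on the assert inside packPointer. A hypothetical, self-contained illustration of hitting that guard is shown below; the helper and class names are not part of the patch.

// Hypothetical illustration of the guard added above: the check fails fast when
// the writer is constructed instead of surfacing later during pointer packing.
public final class PartitionLimitSketch {

  static final int MAXIMUM_PARTITION_ID = 1 << 24;  // same limit as PackedRecordPointer

  static void checkNumPartitions(int numPartitions) {
    if (numPartitions > MAXIMUM_PARTITION_ID) {
      throw new IllegalArgumentException(
        "UnsafeShuffleWriter can only be used for shuffles with at most " +
        MAXIMUM_PARTITION_ID + " reduce partitions");
    }
  }

  public static void main(String[] args) {
    checkNumPartitions(200);               // typical shuffle: passes
    checkNumPartitions((1 << 24) + 1);     // throws IllegalArgumentException
  }
}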