Skip to content

Commit 87e721b

Browse files
committed
Renaming and comments
1 parent d3cc310 commit 87e721b

File tree

4 files changed

+64
-30
lines changed

4 files changed

+64
-30
lines changed

core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSortDataFormat.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,35 +17,34 @@
1717

1818
package org.apache.spark.unsafe.sort;
1919

20-
import static org.apache.spark.unsafe.sort.UnsafeSorter.KeyPointerAndPrefix;
20+
import static org.apache.spark.unsafe.sort.UnsafeSorter.RecordPointerAndKeyPrefix;
2121
import org.apache.spark.util.collection.SortDataFormat;
2222

2323
/**
24-
* TODO: finish writing this description
24+
* Supports sorting an array of (record pointer, key prefix) pairs. Used in {@link UnsafeSorter}.
2525
*
2626
* Within each long[] buffer, position {@code 2 * i} holds a pointer pointer to the record at
2727
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
2828
*/
29-
final class UnsafeSortDataFormat
30-
extends SortDataFormat<KeyPointerAndPrefix, long[]> {
29+
final class UnsafeSortDataFormat extends SortDataFormat<RecordPointerAndKeyPrefix, long[]> {
3130

3231
public static final UnsafeSortDataFormat INSTANCE = new UnsafeSortDataFormat();
3332

3433
private UnsafeSortDataFormat() { }
3534

3635
@Override
37-
public KeyPointerAndPrefix getKey(long[] data, int pos) {
36+
public RecordPointerAndKeyPrefix getKey(long[] data, int pos) {
3837
// Since we re-use keys, this method shouldn't be called.
3938
throw new UnsupportedOperationException();
4039
}
4140

4241
@Override
43-
public KeyPointerAndPrefix newKey() {
44-
return new KeyPointerAndPrefix();
42+
public RecordPointerAndKeyPrefix newKey() {
43+
return new RecordPointerAndKeyPrefix();
4544
}
4645

4746
@Override
48-
public KeyPointerAndPrefix getKey(long[] data, int pos, KeyPointerAndPrefix reuse) {
47+
public RecordPointerAndKeyPrefix getKey(long[] data, int pos, RecordPointerAndKeyPrefix reuse) {
4948
reuse.recordPointer = data[pos * 2];
5049
reuse.keyPrefix = data[pos * 2 + 1];
5150
return reuse;

core/src/main/java/org/apache/spark/unsafe/sort/UnsafeSorter.java

Lines changed: 50 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,16 @@
2323
import org.apache.spark.util.collection.Sorter;
2424
import org.apache.spark.unsafe.memory.TaskMemoryManager;
2525

26+
/**
27+
* Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
28+
* alongside a user-defined prefix of the record's sorting key. When the underlying sort algorithm
29+
* compares records, it will first compare the stored key prefixes; if the prefixes are not equal,
30+
* then we do not need to traverse the record pointers to compare the actual records. Avoiding these
31+
* random memory accesses improves cache hit rates.
32+
*/
2633
public final class UnsafeSorter {
2734

28-
public static final class KeyPointerAndPrefix {
35+
public static final class RecordPointerAndKeyPrefix {
2936
/**
3037
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
3138
* description of how these addresses are encoded.
@@ -37,6 +44,7 @@ public static final class KeyPointerAndPrefix {
3744
*/
3845
public long keyPrefix;
3946

47+
// TODO: this was a carryover from test code; may want to remove this
4048
@Override
4149
public int hashCode() {
4250
throw new UnsupportedOperationException();
@@ -48,39 +56,56 @@ public boolean equals(Object obj) {
4856
}
4957
}
5058

59+
/**
60+
* Compares records for ordering. In cases where the entire sorting key can fit in the 8-byte
61+
* prefix, this may simply return 0.
62+
*/
5163
public static abstract class RecordComparator {
64+
/**
65+
* Compare two records for order.
66+
*
67+
* @return a negative integer, zero, or a positive integer as the first record is less than,
68+
* equal to, or greater than the second.
69+
*/
5270
public abstract int compare(
5371
Object leftBaseObject,
5472
long leftBaseOffset,
5573
Object rightBaseObject,
5674
long rightBaseOffset);
5775
}
5876

77+
/**
78+
* Given a pointer to a record, computes a prefix.
79+
*/
5980
public static abstract class PrefixComputer {
6081
public abstract long computePrefix(Object baseObject, long baseOffset);
6182
}
6283

6384
/**
64-
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific comparisons,
65-
* such as lexicographic comparison for strings.
85+
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific
86+
* comparisons, such as lexicographic comparison for strings.
6687
*/
6788
public static abstract class PrefixComparator {
6889
public abstract int compare(long prefix1, long prefix2);
6990
}
7091

7192
private final TaskMemoryManager memoryManager;
7293
private final PrefixComputer prefixComputer;
73-
private final Sorter<KeyPointerAndPrefix, long[]> sorter;
74-
private final Comparator<KeyPointerAndPrefix> sortComparator;
94+
private final Sorter<RecordPointerAndKeyPrefix, long[]> sorter;
95+
private final Comparator<RecordPointerAndKeyPrefix> sortComparator;
7596

7697
/**
7798
* Within this buffer, position {@code 2 * i} holds a pointer pointer to the record at
7899
* index {@code i}, while position {@code 2 * i + 1} in the array holds an 8-byte key prefix.
79100
*/
80101
private long[] sortBuffer;
81102

103+
/**
104+
* The position in the sort buffer where new records can be inserted.
105+
*/
82106
private int sortBufferInsertPosition = 0;
83107

108+
84109
private void expandSortBuffer(int newSize) {
85110
assert (newSize > sortBuffer.length);
86111
final long[] oldBuffer = sortBuffer;
@@ -99,24 +124,31 @@ public UnsafeSorter(
99124
this.memoryManager = memoryManager;
100125
this.prefixComputer = prefixComputer;
101126
this.sorter =
102-
new Sorter<KeyPointerAndPrefix, long[]>(UnsafeSortDataFormat.INSTANCE);
103-
this.sortComparator = new Comparator<KeyPointerAndPrefix>() {
127+
new Sorter<RecordPointerAndKeyPrefix, long[]>(UnsafeSortDataFormat.INSTANCE);
128+
this.sortComparator = new Comparator<RecordPointerAndKeyPrefix>() {
104129
@Override
105-
public int compare(KeyPointerAndPrefix left, KeyPointerAndPrefix right) {
106-
if (left.keyPrefix == right.keyPrefix) {
130+
public int compare(RecordPointerAndKeyPrefix left, RecordPointerAndKeyPrefix right) {
131+
final int prefixComparisonResult =
132+
prefixComparator.compare(left.keyPrefix, right.keyPrefix);
133+
if (prefixComparisonResult == 0) {
107134
final Object leftBaseObject = memoryManager.getPage(left.recordPointer);
108135
final long leftBaseOffset = memoryManager.getOffsetInPage(left.recordPointer);
109136
final Object rightBaseObject = memoryManager.getPage(right.recordPointer);
110137
final long rightBaseOffset = memoryManager.getOffsetInPage(right.recordPointer);
111138
return recordComparator.compare(
112139
leftBaseObject, leftBaseOffset, rightBaseObject, rightBaseOffset);
113140
} else {
114-
return prefixComparator.compare(left.keyPrefix, right.keyPrefix);
141+
return prefixComparisonResult;
115142
}
116143
}
117144
};
118145
}
119146

147+
/**
148+
* Insert a record into the sort buffer.
149+
*
150+
* @param objectAddress pointer to a record in a data page, encoded by {@link TaskMemoryManager}.
151+
*/
120152
public void insertRecord(long objectAddress) {
121153
if (sortBufferInsertPosition + 2 == sortBuffer.length) {
122154
expandSortBuffer(sortBuffer.length * 2);
@@ -130,19 +162,23 @@ public void insertRecord(long objectAddress) {
130162
sortBufferInsertPosition++;
131163
}
132164

133-
public Iterator<KeyPointerAndPrefix> getSortedIterator() {
165+
/**
166+
* Return an iterator over record pointers in sorted order. For efficiency, all calls to
167+
* {@code next()} will return the same mutable object.
168+
*/
169+
public Iterator<RecordPointerAndKeyPrefix> getSortedIterator() {
134170
sorter.sort(sortBuffer, 0, sortBufferInsertPosition / 2, sortComparator);
135-
return new Iterator<KeyPointerAndPrefix>() {
171+
return new Iterator<RecordPointerAndKeyPrefix>() {
136172
private int position = 0;
137-
private final KeyPointerAndPrefix keyPointerAndPrefix = new KeyPointerAndPrefix();
173+
private final RecordPointerAndKeyPrefix keyPointerAndPrefix = new RecordPointerAndKeyPrefix();
138174

139175
@Override
140176
public boolean hasNext() {
141177
return position < sortBufferInsertPosition;
142178
}
143179

144180
@Override
145-
public KeyPointerAndPrefix next() {
181+
public RecordPointerAndKeyPrefix next() {
146182
keyPointerAndPrefix.recordPointer = sortBuffer[position];
147183
keyPointerAndPrefix.keyPrefix = sortBuffer[position + 1];
148184
position += 2;
@@ -155,5 +191,4 @@ public void remove() {
155191
}
156192
};
157193
}
158-
159194
}

core/src/main/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleManager.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import org.apache.spark.storage.{BlockObjectWriter, ShuffleBlockId}
3232
import org.apache.spark.unsafe.PlatformDependent
3333
import org.apache.spark.unsafe.memory.{MemoryBlock, TaskMemoryManager}
3434
import org.apache.spark.unsafe.sort.UnsafeSorter
35-
import org.apache.spark.unsafe.sort.UnsafeSorter.{KeyPointerAndPrefix, PrefixComparator, PrefixComputer, RecordComparator}
35+
import org.apache.spark.unsafe.sort.UnsafeSorter.{RecordPointerAndKeyPrefix, PrefixComparator, PrefixComputer, RecordComparator}
3636

3737
private class UnsafeShuffleHandle[K, V](
3838
shuffleId: Int,
@@ -122,7 +122,7 @@ private class UnsafeShuffleWriter[K, V](
122122
private[this] val serializer = Serializer.getSerializer(dep.serializer).newInstance()
123123

124124
private def sortRecords(
125-
records: Iterator[_ <: Product2[K, V]]): java.util.Iterator[KeyPointerAndPrefix] = {
125+
records: Iterator[_ <: Product2[K, V]]): java.util.Iterator[RecordPointerAndKeyPrefix] = {
126126
val sorter = new UnsafeSorter(
127127
context.taskMemoryManager(),
128128
DummyRecordComparator,
@@ -194,7 +194,7 @@ private class UnsafeShuffleWriter[K, V](
194194
}
195195

196196
private def writeSortedRecordsToFile(
197-
sortedRecords: java.util.Iterator[KeyPointerAndPrefix]): Array[Long] = {
197+
sortedRecords: java.util.Iterator[RecordPointerAndKeyPrefix]): Array[Long] = {
198198
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
199199
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
200200
val partitionLengths = new Array[Long](partitioner.numPartitions)
@@ -223,7 +223,7 @@ private class UnsafeShuffleWriter[K, V](
223223
}
224224

225225
while (sortedRecords.hasNext) {
226-
val keyPointerAndPrefix: KeyPointerAndPrefix = sortedRecords.next()
226+
val keyPointerAndPrefix: RecordPointerAndKeyPrefix = sortedRecords.next()
227227
val partition = keyPointerAndPrefix.keyPrefix.toInt
228228
if (partition != currentPartition) {
229229
switchToPartition(partition)

core/src/test/java/org/apache/spark/unsafe/sort/UnsafeSorterSuite.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void testSortingEmptyInput() {
5252
mock(UnsafeSorter.PrefixComputer.class),
5353
mock(UnsafeSorter.PrefixComparator.class),
5454
100);
55-
final Iterator<UnsafeSorter.KeyPointerAndPrefix> iter = sorter.getSortedIterator();
55+
final Iterator<UnsafeSorter.RecordPointerAndKeyPrefix> iter = sorter.getSortedIterator();
5656
assert(!iter.hasNext());
5757
}
5858

@@ -130,12 +130,12 @@ public int compare(long prefix1, long prefix2) {
130130
sorter.insertRecord(address);
131131
position += 8 + recordLength;
132132
}
133-
final Iterator<UnsafeSorter.KeyPointerAndPrefix> iter = sorter.getSortedIterator();
133+
final Iterator<UnsafeSorter.RecordPointerAndKeyPrefix> iter = sorter.getSortedIterator();
134134
int iterLength = 0;
135135
long prevPrefix = -1;
136136
Arrays.sort(dataToSort);
137137
while (iter.hasNext()) {
138-
final UnsafeSorter.KeyPointerAndPrefix pointerAndPrefix = iter.next();
138+
final UnsafeSorter.RecordPointerAndKeyPrefix pointerAndPrefix = iter.next();
139139
final Object recordBaseObject = memoryManager.getPage(pointerAndPrefix.recordPointer);
140140
final long recordBaseOffset = memoryManager.getOffsetInPage(pointerAndPrefix.recordPointer);
141141
final String str = getStringFromDataPage(recordBaseObject, recordBaseOffset);

0 commit comments

Comments
 (0)