Skip to content

Commit 5d6109d

Browse files
committed
Fix inconsistent handling / encoding of record lengths.
1 parent 87b6ed9 commit 5d6109d

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ public boolean hasNext() {
156156

157157
@Override
158158
public void loadNext() {
159+
// This pointer points to a 4-byte record length, followed by the record's bytes
159160
final long recordPointer = sortBuffer[position];
160161
baseObject = memoryManager.getPage(recordPointer);
161162
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length

core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ private void writeIntToBuffer(int v, int offset) throws IOException {
9191
writeBuffer[offset + 3] = (byte)(v >>> 0);
9292
}
9393

94+
/**
95+
* Write a record to a spill file.
96+
*
97+
* @param baseObject the base object / memory page containing the record
98+
* @param baseOffset the base offset which points directly to the record data.
99+
* @param recordLength the length of the record.
100+
* @param keyPrefix a sort key prefix
101+
*/
94102
public void write(
95103
Object baseObject,
96104
long baseOffset,
@@ -105,8 +113,8 @@ public void write(
105113
writeIntToBuffer(recordLength, 0);
106114
writeLongToBuffer(keyPrefix, 4);
107115
int dataRemaining = recordLength;
108-
int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8;
109-
long recordReadPosition = baseOffset + 4; // skip over record length
116+
int freeSpaceInWriteBuffer = DISK_WRITE_BUFFER_SIZE - 4 - 8; // space used by prefix + len
117+
long recordReadPosition = baseOffset;
110118
while (dataRemaining > 0) {
111119
final int toTransfer = Math.min(freeSpaceInWriteBuffer, dataRemaining);
112120
PlatformDependent.copyMemory(

core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,13 @@
3434

3535
public class UnsafeInMemorySorterSuite {
3636

37-
private static String getStringFromDataPage(Object baseObject, long baseOffset) {
38-
final int strLength = PlatformDependent.UNSAFE.getInt(baseObject, baseOffset);
39-
final byte[] strBytes = new byte[strLength];
37+
private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) {
38+
final byte[] strBytes = new byte[length];
4039
PlatformDependent.copyMemory(
4140
baseObject,
42-
baseOffset + 4,
41+
baseOffset,
4342
strBytes,
44-
PlatformDependent.BYTE_ARRAY_OFFSET, strLength);
43+
PlatformDependent.BYTE_ARRAY_OFFSET, length);
4544
return new String(strBytes);
4645
}
4746

@@ -116,7 +115,7 @@ public int compare(long prefix1, long prefix2) {
116115
// position now points to the start of a record (which holds its length).
117116
final int recordLength = PlatformDependent.UNSAFE.getInt(baseObject, position);
118117
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);
119-
final String str = getStringFromDataPage(baseObject, position);
118+
final String str = getStringFromDataPage(baseObject, position + 4, recordLength);
120119
final int partitionId = hashPartitioner.getPartition(str);
121120
sorter.insertRecord(address, partitionId);
122121
position += 4 + recordLength;
@@ -127,9 +126,8 @@ public int compare(long prefix1, long prefix2) {
127126
Arrays.sort(dataToSort);
128127
while (iter.hasNext()) {
129128
iter.loadNext();
130-
// TODO: the logic for how we manipulate record length offsets here is confusing; clean
131-
// this up and clarify it in comments.
132-
final String str = getStringFromDataPage(iter.getBaseObject(), iter.getBaseOffset() - 4);
129+
final String str =
130+
getStringFromDataPage(iter.getBaseObject(), iter.getBaseOffset(), iter.getRecordLength());
133131
final long keyPrefix = iter.getKeyPrefix();
134132
assertThat(str, isIn(Arrays.asList(dataToSort)));
135133
assertThat(keyPrefix, greaterThanOrEqualTo(prevPrefix));

0 commit comments

Comments
 (0)