Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

import java.nio.ByteOrder;

public final class RecordBinaryComparator extends RecordComparator {

private static final boolean LITTLE_ENDIAN =
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);

@Override
public int compare(
Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
Expand All @@ -38,32 +43,39 @@ public int compare(
// check if stars align and we can get both offsets to be aligned
if ((leftOff % 8) == (rightOff % 8)) {
while ((leftOff + i) % 8 != 0 && i < leftLen) {
final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
final int v1 = Platform.getByte(leftObj, leftOff + i);
final int v2 = Platform.getByte(rightObj, rightOff + i);
if (v1 != v2) {
return v1 > v2 ? 1 : -1;
return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
}
i += 1;
}
}
// for architectures that support unaligned accesses, chew it up 8 bytes at a time
if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
while (i <= leftLen - 8) {
final long v1 = Platform.getLong(leftObj, leftOff + i);
final long v2 = Platform.getLong(rightObj, rightOff + i);
long v1 = Platform.getLong(leftObj, leftOff + i);
long v2 = Platform.getLong(rightObj, rightOff + i);
if (v1 != v2) {
return v1 > v2 ? 1 : -1;
if (LITTLE_ENDIAN) {
// if read as little-endian, we have to reverse bytes so that the long comparison result
// is equivalent to byte-by-byte comparison result.
// See discussion in https://github.com/apache/spark/pull/26548#issuecomment-554645859
v1 = Long.reverseBytes(v1);
v2 = Long.reverseBytes(v2);
}
return Long.compareUnsigned(v1, v2);
}
i += 8;
}
}
// this will finish off the unaligned comparisons, or do the entire aligned comparison
// whichever is needed.
while (i < leftLen) {
final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
final int v1 = Platform.getByte(leftObj, leftOff + i);
final int v2 = Platform.getByte(rightObj, rightOff + i);
if (v1 != v2) {
return v1 > v2 ? 1 : -1;
return (v1 & 0xff) > (v2 & 0xff) ? 1 : -1;
}
i += 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.spark.util.collection.unsafe.sort.*;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -273,7 +274,7 @@ public void testBinaryComparatorWhenSubtractionIsDivisibleByMaxIntValue() throws
insertRow(row1);
insertRow(row2);

assert(compare(0, 1) < 0);
assert(compare(0, 1) > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, do you mean this is wrong before this PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change definitely changes the ordering, as bytes are compared in a different order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, do you mean this is wrong before this PR?

RecordBinaryComparator is used as a local sort comparator in RoundRobinPartition to make sure the order of each records are the same after rerun. But the relative order is not important.

}

@Test
Expand Down Expand Up @@ -321,4 +322,48 @@ public void testBinaryComparatorWhenOnlyTheLastColumnDiffers() throws Exception

assert(compare(0, 1) < 0);
}

@Test
public void testCompareLongsAsLittleEndian() {
long arrayOffset = 12;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test cases in this PR look problematic: the long arrayOffset = 12; is hard coded with implicit assumption on the underlying JVM's object layout in memory.
The object layout isn't guaranteed to be the same among different JVM, and in fact can be different even with the same JVM on the same machine but just different VM options.

It's preferable to use Platform.LONG_ARRAY_OFFSET as your mental model of where "0-offset" is. You should never write anything into the offset range [0, Platform.LONG_ARRAY_OFFSET[ because there's no guarantee what's there.
e.g. on the HotSpot JVM, that's where the object header lives.

For example, on the HotSpot JVM, the object header size for long[] is:

  • 32-bit: 16 bytes = 4 mark work + 4 klassptr + 4 array length + 4 padding
  • 64-bit with Compressed Class pointer: 16 bytes = 8 mark word + 4 klassptr + 4 array length
  • 64-bit without Compressed Class pointer: 24 bytes = 8 mark work + 8 klassptr + 4 array length + 4 padding.
    12 is always a bad offset to use in this test case on HotSpot, regardless of which mode above it's in -- writing to this offset will corrupt the header of the long[] and if you happen to go into a GC right after the corruption, you'll find the VM doing weird stuff (including crashing with a SIGSEGV).


long[] arr1 = new long[2];
Platform.putLong(arr1, arrayOffset, 0x0100000000000000L);
long[] arr2 = new long[2];
Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000001L);
// leftBaseOffset is not aligned while rightBaseOffset is aligned,
// it will start by comparing long
int result1 = binaryComparator.compare(arr1, arrayOffset, 8, arr2, arrayOffset + 4, 8);

long[] arr3 = new long[2];
Platform.putLong(arr3, arrayOffset, 0x0100000000000000L);
long[] arr4 = new long[2];
Platform.putLong(arr4, arrayOffset, 0x0000000000000001L);
// both left and right offset is not aligned, it will start with byte-by-byte comparison
int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to consider the uaoSize here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, good question. I don't think it comes up here as it's just messing with an long[] that was allocated normally? rather than data we wrote into an arbitrary location. But I may not be fully aware of the issue you have in mind.


Assert.assertEquals(result1, result2);
}

@Test
public void testCompareLongsAsUnsigned() {
long arrayOffset = 12;

long[] arr1 = new long[2];
Platform.putLong(arr1, arrayOffset + 4, 0xa000000000000000L);
long[] arr2 = new long[2];
Platform.putLong(arr2, arrayOffset + 4, 0x0000000000000000L);
// both leftBaseOffset and rightBaseOffset are aligned, so it will start by comparing long
int result1 = binaryComparator.compare(arr1, arrayOffset + 4, 8, arr2, arrayOffset + 4, 8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think several of these lines are too long (> 100 chars)


long[] arr3 = new long[2];
Platform.putLong(arr3, arrayOffset, 0xa000000000000000L);
long[] arr4 = new long[2];
Platform.putLong(arr4, arrayOffset, 0x0000000000000000L);
// both leftBaseOffset and rightBaseOffset are not aligned,
// so it will start with byte-by-byte comparison
int result2 = binaryComparator.compare(arr3, arrayOffset, 8, arr4, arrayOffset, 8);

Assert.assertEquals(result1, result2);
}
}