diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
index a88a315bf479..df52f9c2d549 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/FixedLengthRowBasedKeyValueBatch.java
@@ -62,7 +62,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset, klen);
-    valueRow.pointTo(base, recordOffset + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -95,7 +95,7 @@ protected UnsafeRow getValueFromKey(int rowId) {
       getKeyRow(rowId);
     }
     assert(rowId >= 0);
-    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen + 4);
+    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
     return valueRow;
   }
 
@@ -131,7 +131,7 @@ public boolean next() {
       }
 
       key.pointTo(base, offsetInPage, klen);
-      value.pointTo(base, offsetInPage + klen, vlen + 4);
+      value.pointTo(base, offsetInPage + klen, vlen);
 
       offsetInPage += recordLength;
       recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de90984ca0..655b1d0d4b41 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -167,6 +167,7 @@ public UnsafeRow() {}
    */
   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.baseObject = baseObject;
     this.baseOffset = baseOffset;
     this.sizeInBytes = sizeInBytes;
@@ -183,6 +184,7 @@ public void pointTo(byte[] buf, int sizeInBytes) {
   }
 
   public void setTotalSize(int sizeInBytes) {
+    assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
     this.sizeInBytes = sizeInBytes;
   }
 
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
index ea4f984be24e..905e6820ce6e 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/VariableLengthRowBasedKeyValueBatch.java
@@ -65,7 +65,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
 
     keyRowId = numRows;
     keyRow.pointTo(base, recordOffset + 8, klen);
-    valueRow.pointTo(base, recordOffset + 8 + klen, vlen + 4);
+    valueRow.pointTo(base, recordOffset + 8 + klen, vlen);
     numRows++;
     return valueRow;
   }
@@ -102,7 +102,7 @@ public UnsafeRow getValueFromKey(int rowId) {
     long offset = keyRow.getBaseOffset();
     int klen = keyRow.getSizeInBytes();
     int vlen = Platform.getInt(base, offset - 8) - klen - 4;
-    valueRow.pointTo(base, offset + klen, vlen + 4);
+    valueRow.pointTo(base, offset + klen, vlen);
     return valueRow;
   }
 
@@ -146,7 +146,7 @@ public boolean next() {
       currentvlen = totalLength - currentklen;
 
       key.pointTo(base, offsetInPage + 8, currentklen);
-      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen + 4);
+      value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen);
 
       offsetInPage += 8 + totalLength + 8;
       recordsInPage -= 1;
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index c29b002a998c..5d1f8136bd96 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -208,9 +208,10 @@ private static final class RowComparator extends RecordComparator {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // TODO: Why are the sizes -1?
-      row1.pointTo(baseObj1, baseOff1, -1);
-      row2.pointTo(baseObj2, baseOff2, -1);
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
+      // into the row.
+      row1.pointTo(baseObj1, baseOff1, 0);
+      row2.pointTo(baseObj2, baseOff2, 0);
       return ordering.compare(row1, row2);
     }
   }
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index ee5bcfd02c79..d8acf11a9791 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -238,10 +238,10 @@ private static final class KVComparator extends RecordComparator {
 
     @Override
     public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
-      // Note that since ordering doesn't need the total length of the record, we just pass -1
+      // Note that since ordering doesn't need the total length of the record, we just pass 0
       // into the row.
-      row1.pointTo(baseObj1, baseOff1 + 4, -1);
-      row2.pointTo(baseObj2, baseOff2 + 4, -1);
+      row1.pointTo(baseObj1, baseOff1 + 4, 0);
+      row2.pointTo(baseObj2, baseOff2 + 4, 0);
       return ordering.compare(row1, row2);
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index bae7a15165e4..5f4161bf28e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -363,7 +363,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           val valueRowBuffer = new Array[Byte](valueSize)
           ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
           val valueRow = new UnsafeRow(valueSchema.fields.length)
-          valueRow.pointTo(valueRowBuffer, valueSize)
+          // If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
+          // This is a workaround for the following: prior to Spark 2.3, we mistakenly appended
+          // 4 bytes to the value row in `RowBasedKeyValueBatch`, which gets persisted into the
+          // checkpoint data.
+          valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
           map.put(keyRow, valueRow)
         }
       }
@@ -427,7 +431,11 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           val valueRowBuffer = new Array[Byte](valueSize)
           ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
           val valueRow = new UnsafeRow(valueSchema.fields.length)
-          valueRow.pointTo(valueRowBuffer, valueSize)
+          // If valueSize in an existing file is not a multiple of 8, floor it to a multiple of 8.
+          // This is a workaround for the following: prior to Spark 2.3, we mistakenly appended
+          // 4 bytes to the value row in `RowBasedKeyValueBatch`, which gets persisted into the
+          // checkpoint data.
+          valueRow.pointTo(valueRowBuffer, (valueSize / 8) * 8)
          map.put(keyRow, valueRow)
         }
       }
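
Note on why the size changes above hang together. An UnsafeRow is 8-byte word aligned: a null-tracking bitset padded to 8-byte words, one 8-byte word per field, and variable-length data padded to word boundaries, so every valid sizeInBytes is a multiple of 8. That is the invariant the new asserts in UnsafeRow.pointTo and setTotalSize enforce, and it is why the comparators switch from the old -1 sentinel to 0: in Java, -1 % 8 == -1, which would trip the new assert, while 0 % 8 == 0 passes. Checkpoints written by older releases carry 4 stray bytes appended by RowBasedKeyValueBatch, and flooring the persisted size to a word boundary strips exactly those bytes while leaving already-correct sizes untouched. The snippet below is a standalone sketch of that flooring step, not part of the patch; floorToWordBoundary is a hypothetical helper name.

// Standalone sketch (not Spark code). Run with `java -ea AlignmentSketch`
// so the asserts fire.
public class AlignmentSketch {
  // Hypothetical helper mirroring the (valueSize / 8) * 8 workaround in
  // HDFSBackedStateStoreProvider: integer division floors the size to the
  // nearest 8-byte word boundary.
  static int floorToWordBoundary(int sizeInBytes) {
    return (sizeInBytes / 8) * 8;
  }

  public static void main(String[] args) {
    int trueRowSize = 24;                // one 8-byte null-bitset word + two 8-byte fields
    int persistedSize = trueRowSize + 4; // pre-fix checkpoints carry 4 extra bytes
    assert floorToWordBoundary(persistedSize) == trueRowSize; // stray bytes stripped
    assert floorToWordBoundary(trueRowSize) == trueRowSize;   // correct sizes pass through
    System.out.println(floorToWordBoundary(persistedSize));   // prints 24
  }
}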