-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-31425][SQL][CORE] UnsafeKVExternalSorter/VariableLengthRowBasedKeyValueBatch should also respect UnsafeAlignedOffset #28195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 10 commits
eb9e5bc
6943425
0b8ebd3
c70c5db
b1be35e
31bd15e
bda5823
5b3adba
0d0f97a
d441e53
bc93b52
0fb6024
c6f39b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,7 +53,8 @@ | |
| * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should | ||
| * probably be using sorting instead of hashing for better cache locality. | ||
| * | ||
| * The key and values under the hood are stored together, in the following format: | ||
| * The key and values under the hood are stored together, in the following format (in case of | ||
| * uaoSize = 4): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not replace the number below with
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I intended to do so, but it requires more changes to the original content and is not as clean as a concrete number, so I am not sure we should. Do you have a strong preference to change in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current way doesn't make it clear whether each number
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. I'll update later. |
||
| * Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4 | ||
| * Bytes 4 to 8: len(k) | ||
| * Bytes 8 to 8 + len(k): key data | ||
|
|
@@ -706,7 +707,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff | |
| // Here, we'll copy the data into our data pages. Because we only store a relative offset from | ||
| // the key address instead of storing the absolute address of the value, the key and value | ||
| // must be stored in the same memory page. | ||
| // (8 byte key length) (key) (value) (8 byte pointer to next value) | ||
| // (total length) (key length) (key) (value) (8 byte pointer to next value) | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| final long recordLength = (2L * uaoSize) + klen + vlen + 8; | ||
| if (currentPage == null || currentPage.size() - pageCursor < recordLength) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,12 @@ | |
| import org.apache.spark.memory.TaskMemoryManager; | ||
| import org.apache.spark.sql.types.*; | ||
| import org.apache.spark.unsafe.Platform; | ||
| import org.apache.spark.unsafe.UnsafeAlignedOffset; | ||
|
|
||
| /** | ||
| * An implementation of `RowBasedKeyValueBatch` in which key-value records have variable lengths. | ||
| * | ||
| * The format for each record looks like this: | ||
| * The format for each record looks like this (in case of uaoSize = 4): | ||
| * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen] | ||
| * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen] | ||
| * [8 bytes pointer to next] | ||
|
|
@@ -41,18 +42,20 @@ public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueB | |
| @Override | ||
| public UnsafeRow appendRow(Object kbase, long koff, int klen, | ||
| Object vbase, long voff, int vlen) { | ||
| final long recordLength = 8L + klen + vlen + 8; | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| int doubleUaoSize = 2 * uaoSize; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| final long recordLength = doubleUaoSize + klen + vlen + 8L; | ||
| // if run out of max supported rows or page size, return null | ||
| if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) { | ||
| return null; | ||
| } | ||
|
|
||
| long offset = page.getBaseOffset() + pageCursor; | ||
| final long recordOffset = offset; | ||
| Platform.putInt(base, offset, klen + vlen + 4); | ||
| Platform.putInt(base, offset + 4, klen); | ||
| UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize); | ||
| UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen); | ||
|
|
||
| offset += 8; | ||
| offset += doubleUaoSize; | ||
| Platform.copyMemory(kbase, koff, base, offset, klen); | ||
| offset += klen; | ||
| Platform.copyMemory(vbase, voff, base, offset, vlen); | ||
|
|
@@ -61,11 +64,11 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen, | |
|
|
||
| pageCursor += recordLength; | ||
|
|
||
| keyOffsets[numRows] = recordOffset + 8; | ||
| keyOffsets[numRows] = recordOffset + doubleUaoSize; | ||
|
|
||
| keyRowId = numRows; | ||
| keyRow.pointTo(base, recordOffset + 8, klen); | ||
| valueRow.pointTo(base, recordOffset + 8 + klen, vlen); | ||
| keyRow.pointTo(base, recordOffset + doubleUaoSize, klen); | ||
| valueRow.pointTo(base, recordOffset + doubleUaoSize + klen, vlen); | ||
| numRows++; | ||
| return valueRow; | ||
| } | ||
|
|
@@ -79,7 +82,7 @@ public UnsafeRow getKeyRow(int rowId) { | |
| assert(rowId < numRows); | ||
| if (keyRowId != rowId) { // if keyRowId == rowId, desired keyRow is already cached | ||
| long offset = keyOffsets[rowId]; | ||
| int klen = Platform.getInt(base, offset - 4); | ||
| int klen = UnsafeAlignedOffset.getSize(base, offset - UnsafeAlignedOffset.getUaoSize()); | ||
| keyRow.pointTo(base, offset, klen); | ||
| // set keyRowId so we can check if desired row is cached | ||
| keyRowId = rowId; | ||
|
|
@@ -99,9 +102,10 @@ public UnsafeRow getValueFromKey(int rowId) { | |
| getKeyRow(rowId); | ||
| } | ||
| assert(rowId >= 0); | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| long offset = keyRow.getBaseOffset(); | ||
| int klen = keyRow.getSizeInBytes(); | ||
| int vlen = Platform.getInt(base, offset - 8) - klen - 4; | ||
| int vlen = UnsafeAlignedOffset.getSize(base, offset - uaoSize * 2) - klen - uaoSize; | ||
| valueRow.pointTo(base, offset + klen, vlen); | ||
| return valueRow; | ||
| } | ||
|
|
@@ -141,14 +145,16 @@ public boolean next() { | |
| return false; | ||
| } | ||
|
|
||
| totalLength = Platform.getInt(base, offsetInPage) - 4; | ||
| currentklen = Platform.getInt(base, offsetInPage + 4); | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| int doubleUaoSize = 2 * uaoSize; | ||
| totalLength = UnsafeAlignedOffset.getSize(base, offsetInPage) - uaoSize; | ||
| currentklen = UnsafeAlignedOffset.getSize(base, offsetInPage + uaoSize); | ||
| currentvlen = totalLength - currentklen; | ||
|
|
||
| key.pointTo(base, offsetInPage + 8, currentklen); | ||
| value.pointTo(base, offsetInPage + 8 + currentklen, currentvlen); | ||
| key.pointTo(base, offsetInPage + doubleUaoSize, currentklen); | ||
| value.pointTo(base, offsetInPage + doubleUaoSize + currentklen, currentvlen); | ||
|
|
||
| offsetInPage += 8 + totalLength + 8; | ||
| offsetInPage += doubleUaoSize + totalLength + 8; | ||
| recordsInPage -= 1; | ||
| return true; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| import org.apache.spark.storage.BlockManager; | ||
| import org.apache.spark.unsafe.KVIterator; | ||
| import org.apache.spark.unsafe.Platform; | ||
| import org.apache.spark.unsafe.UnsafeAlignedOffset; | ||
| import org.apache.spark.unsafe.array.LongArray; | ||
| import org.apache.spark.unsafe.map.BytesToBytesMap; | ||
| import org.apache.spark.unsafe.memory.MemoryBlock; | ||
|
|
@@ -141,9 +142,10 @@ public UnsafeKVExternalSorter( | |
|
|
||
| // Get encoded memory address | ||
| // baseObject + baseOffset point to the beginning of the key data in the map, but that | ||
| // the KV-pair's length data is stored in the word immediately before that address | ||
| // the KV-pair's length data is stored at 2 * uaoSize bytes immediately before that address | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry I don't get it here why the address is related to uaoSize?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The record format is: `(total length) (key length) (key) (value) (8 byte pointer to next value)`. We now have the key offset, so we need to walk back 2 * uaoSize bytes to get the address of the record.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so you mean we need to add back 2 * uaoSize to cover the space of storing total length and key length? If that's the case then it makes sense.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. |
||
| MemoryBlock page = loc.getMemoryPage(); | ||
| long address = taskMemoryManager.encodePageNumberAndOffset(page, baseOffset - 8); | ||
| long address = taskMemoryManager.encodePageNumberAndOffset(page, | ||
| baseOffset - 2 * UnsafeAlignedOffset.getUaoSize()); | ||
|
|
||
| // Compute prefix | ||
| row.pointTo(baseObject, baseOffset, loc.getKeyLength()); | ||
|
|
@@ -262,10 +264,11 @@ public int compare( | |
| Object baseObj2, | ||
| long baseOff2, | ||
| int baseLen2) { | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| // Note that since ordering doesn't need the total length of the record, we just pass 0 | ||
| // into the row. | ||
| row1.pointTo(baseObj1, baseOff1 + 4, 0); | ||
| row2.pointTo(baseObj2, baseOff2 + 4, 0); | ||
| row1.pointTo(baseObj1, baseOff1 + uaoSize, 0); | ||
| row2.pointTo(baseObj2, baseOff2 + uaoSize, 0); | ||
| return ordering.compare(row1, row2); | ||
| } | ||
| } | ||
|
|
@@ -289,11 +292,12 @@ public boolean next() throws IOException { | |
| long recordOffset = underlying.getBaseOffset(); | ||
| int recordLen = underlying.getRecordLength(); | ||
|
|
||
| // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself) | ||
| // Note that recordLen = keyLen + valueLen + uaoSize (for the keyLen itself) | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| int keyLen = Platform.getInt(baseObj, recordOffset); | ||
| int valueLen = recordLen - keyLen - 4; | ||
| key.pointTo(baseObj, recordOffset + 4, keyLen); | ||
| value.pointTo(baseObj, recordOffset + 4 + keyLen, valueLen); | ||
| int valueLen = recordLen - keyLen - uaoSize; | ||
| key.pointTo(baseObj, recordOffset + uaoSize, keyLen); | ||
| value.pointTo(baseObj, recordOffset + uaoSize + keyLen, valueLen); | ||
|
|
||
| return true; | ||
| } else { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton | |
| import org.apache.spark.sql.internal.SQLConf | ||
| import org.apache.spark.sql.test.SQLTestUtils | ||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.unsafe.UnsafeAlignedOffset | ||
|
|
||
|
|
||
| class ScalaAggregateFunction(schema: StructType) extends UserDefinedAggregateFunction { | ||
|
|
@@ -1055,30 +1056,35 @@ class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySu | |
| Seq("true", "false").foreach { enableTwoLevelMaps => | ||
| withSQLConf(SQLConf.ENABLE_TWOLEVEL_AGG_MAP.key -> | ||
| enableTwoLevelMaps) { | ||
| (1 to 3).foreach { fallbackStartsAt => | ||
| withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> | ||
| s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") { | ||
| // Create a new df to make sure its physical operator picks up | ||
| // spark.sql.TungstenAggregate.testFallbackStartsAt. | ||
| // todo: remove it? | ||
| val newActual = Dataset.ofRows(spark, actual.logicalPlan) | ||
|
|
||
| QueryTest.getErrorMessageInCheckAnswer(newActual, expectedAnswer) match { | ||
| case Some(errorMessage) => | ||
| val newErrorMessage = | ||
| s""" | ||
| |The following aggregation query failed when using HashAggregate with | ||
| |controlled fallback (it falls back to bytes to bytes map once it has processed | ||
| |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has | ||
| |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution} | ||
| | | ||
| |$errorMessage | ||
| """.stripMargin | ||
|
|
||
| fail(newErrorMessage) | ||
| case None => // Success | ||
| Seq(4, 8).foreach { uaoSize => | ||
| UnsafeAlignedOffset.setUaoSize(uaoSize) | ||
| (1 to 3).foreach { fallbackStartsAt => | ||
| withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> | ||
| s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") { | ||
| // Create a new df to make sure its physical operator picks up | ||
| // spark.sql.TungstenAggregate.testFallbackStartsAt. | ||
| // todo: remove it? | ||
| val newActual = Dataset.ofRows(spark, actual.logicalPlan) | ||
|
|
||
| QueryTest.getErrorMessageInCheckAnswer(newActual, expectedAnswer) match { | ||
| case Some(errorMessage) => | ||
| val newErrorMessage = | ||
| s""" | ||
| |The following aggregation query failed when using HashAggregate with | ||
| |controlled fallback (it falls back to bytes to bytes map once it has | ||
| |processed ${fallbackStartsAt - 1} input rows and to sort-based aggregation | ||
| |once it has processed $fallbackStartsAt input rows). | ||
| |The query is ${actual.queryExecution} | ||
| |$errorMessage | ||
| """.stripMargin | ||
|
|
||
| fail(newErrorMessage) | ||
| case None => // Success | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not confident, but do we need to call
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, yeah. I think we have to reset it. |
||
| } | ||
| } | ||
| // reset static uaoSize to avoid affecting other tests | ||
| UnsafeAlignedOffset.setUaoSize(0) | ||
| } | ||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you can use `Utils.isTesting` instead
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other tests might not set the UAO size manually, in which case we'll get 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should handle the logic of setting/reverting the size in the test code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we need to figure out all the test cases that use `UnsafeAlignedOffset`, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to do that? The default value of TEST_UAO_SIZE should be the same as UAO_SIZE, only if you want to test other values then you need to change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I get your point. Let me update it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... unfortunately, the `unsafe` project doesn't depend on `core`, so we cannot access `Utils`.