-
Notifications
You must be signed in to change notification settings - Fork 29.3k
[SPARK-31425][SQL][CORE] UnsafeKVExternalSorter/VariableLengthRowBasedKeyValueBatch should also respect UnsafeAlignedOffset #28195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
eb9e5bc
6943425
0b8ebd3
c70c5db
b1be35e
31bd15e
bda5823
5b3adba
0d0f97a
d441e53
bc93b52
0fb6024
c6f39b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,7 +53,7 @@ | |
| * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should | ||
| * probably be using sorting instead of hashing for better cache locality. | ||
| * | ||
| * The key and values under the hood are stored together, in the following format: | ||
| * The key and values under the hood are stored together, in the following format(uaoSize = 4): | ||
| * Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4 | ||
| * Bytes 4 to 8: len(k) | ||
| * Bytes 8 to 8 + len(k): key data | ||
|
|
@@ -706,7 +706,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff | |
| // Here, we'll copy the data into our data pages. Because we only store a relative offset from | ||
| // the key address instead of storing the absolute address of the value, the key and value | ||
| // must be stored in the same memory page. | ||
| // (8 byte key length) (key) (value) (8 byte pointer to next value) | ||
| // (uao: klen + vlen + uaolen) (uao: klen) (key) (value) (8 byte pointer to next value) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what does
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, if
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about using the similar description at lines 56-61?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will be a little bit redundant? Maybe just,
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM |
||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| final long recordLength = (2L * uaoSize) + klen + vlen + 8; | ||
| if (currentPage == null || currentPage.size() - pageCursor < recordLength) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ | |
| import org.apache.spark.storage.BlockManager; | ||
| import org.apache.spark.unsafe.KVIterator; | ||
| import org.apache.spark.unsafe.Platform; | ||
| import org.apache.spark.unsafe.UnsafeAlignedOffset; | ||
| import org.apache.spark.unsafe.array.LongArray; | ||
| import org.apache.spark.unsafe.map.BytesToBytesMap; | ||
| import org.apache.spark.unsafe.memory.MemoryBlock; | ||
|
|
@@ -141,9 +142,10 @@ public UnsafeKVExternalSorter( | |
|
|
||
| // Get encoded memory address | ||
| // baseObject + baseOffset point to the beginning of the key data in the map, but that | ||
| // the KV-pair's length data is stored in the word immediately before that address | ||
| // the KV-pair's length data is stored at 2 * uaoSize bytes immediately before that address | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sorry I don't get it here why the address is related to uaoSize?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The record format is: And we now get keyOffset, so we need back walk for 2 usao sizes to get the address of the record.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so you mean we need to add back 2 * uaoSize to cover the space of storing total length and key length? If that's the case then it makes sense.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. |
||
| MemoryBlock page = loc.getMemoryPage(); | ||
| long address = taskMemoryManager.encodePageNumberAndOffset(page, baseOffset - 8); | ||
| long address = taskMemoryManager.encodePageNumberAndOffset(page, | ||
| baseOffset - 2 * UnsafeAlignedOffset.getUaoSize()); | ||
|
|
||
| // Compute prefix | ||
| row.pointTo(baseObject, baseOffset, loc.getKeyLength()); | ||
|
|
@@ -262,10 +264,11 @@ public int compare( | |
| Object baseObj2, | ||
| long baseOff2, | ||
| int baseLen2) { | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| // Note that since ordering doesn't need the total length of the record, we just pass 0 | ||
| // into the row. | ||
| row1.pointTo(baseObj1, baseOff1 + 4, 0); | ||
| row2.pointTo(baseObj2, baseOff2 + 4, 0); | ||
| row1.pointTo(baseObj1, baseOff1 + uaoSize, 0); | ||
| row2.pointTo(baseObj2, baseOff2 + uaoSize, 0); | ||
| return ordering.compare(row1, row2); | ||
| } | ||
| } | ||
|
|
@@ -289,11 +292,12 @@ public boolean next() throws IOException { | |
| long recordOffset = underlying.getBaseOffset(); | ||
| int recordLen = underlying.getRecordLength(); | ||
|
|
||
| // Note that recordLen = keyLen + valueLen + 4 bytes (for the keyLen itself) | ||
| // Note that recordLen = keyLen + valueLen + uaoSize (for the keyLen itself) | ||
| int uaoSize = UnsafeAlignedOffset.getUaoSize(); | ||
| int keyLen = Platform.getInt(baseObj, recordOffset); | ||
| int valueLen = recordLen - keyLen - 4; | ||
| key.pointTo(baseObj, recordOffset + 4, keyLen); | ||
| value.pointTo(baseObj, recordOffset + 4 + keyLen, valueLen); | ||
| int valueLen = recordLen - keyLen - uaoSize; | ||
| key.pointTo(baseObj, recordOffset + uaoSize, keyLen); | ||
| value.pointTo(baseObj, recordOffset + uaoSize + keyLen, valueLen); | ||
|
|
||
| return true; | ||
| } else { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in case of uaoSize = 4