[SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes #20850
Test build #88342 has finished for PR 20850 at commit

Test build #88343 has finished for PR 20850 at commit

I think the failure of

cc @hvanhovell

retest this please

Test build #88347 has finished for PR 20850 at commit

retest this please

Test build #88373 has finished for PR 20850 at commit

This test fails consistently. Although I did not change the file reader, I am investigating the cause in my environment.

    final int $tmpCursor = $bufferHolder.cursor;
    ${writeStructToBuffer(ctx, input.value, t.map(_.dataType), bufferHolder)}
    $rowWriter.setOffsetAndSize($index, $tmpCursor, $bufferHolder.cursor - $tmpCursor);
    final int $tmpCursor = $rowWriter.cursor();

It seems a bit weird that we are storing state internal to the UnsafeWriter/BufferHolder here. It would be very nice if we could internalize this code into the UnsafeWriter.
Yeah, I agree with you. I will internalize the code that is frequently used.

@kiszk this is a good start! This is very performance critical code, can you please extend/update/run the existing

    int getCursor() { return cursor; }

    void addCursor(int val) { cursor += val; }

incrementCursor?

    super(writer.getBufferHolder());
    }

    public void initialize(int numElements, int elementSize) {

Should we move elementSize into the constructor? I don't think there are cases where we are reusing UnsafeArrayWriters.

     * if the fields of row are all fixed-length, as the size of result row is also fixed.
     */
    public class BufferHolder {
    public final class BufferHolder {

Why is this still public since you are making everything package private?

    }

    public void reset() {
    byte[] buffer() { return buffer; }

nit: should we add line breaks here to match the style of the other code?

    public int numBytes() {

    byte[] buffer() {
      return buffer;
    }

    int getCursor() { return cursor; }

    void addCursor(int val) { cursor += val; }

advanceCursor?

    return holder;
    }

    public final byte[] buffer() { return holder.buffer(); }

Do we need these delegator methods? How about making holder protected, as in WritableColumnVector?
spark/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
Line 130 in 4de638c

    protected Dictionary dictionary;

Even if we give holder default (package-private) access in the org.apache.spark.sql.catalyst.expressions.codegen package, it is inaccessible from the org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection class.
We also do not want to expose the BufferHolder class outside the Unsafe*Row classes.
WDYT?
No performance impact?

    this.startingOffset = cursor();
    }

    public void setTotalSize() {

How about renaming this to flip, following Java's ByteBuffer? If we call row.setTotalSize(totalSize) and reset the BufferHolder positions inside flip, can we remove the UnsafeWriter.reset call at the head?
It could be. That said, the current approach using reset() and setTotalSize() makes the generated code easy to read:
the beginning and end of the region are clear. If it is critical to remove the UnsafeWriter.reset method, I agree with renaming to flip.
WDYT?
If we'd like to make generated code blocks easy to read, we should depend on generated comments instead of API names, I think. Anyway, this decision depends on other devs' thoughts.
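
For readers unfamiliar with the `flip` naming discussed in this thread, here is a minimal sketch of what `java.nio.ByteBuffer.flip()` does. It uses only the standard `ByteBuffer` API; the class name `FlipSketch` and the helper `writeThenFlip` are made up for illustration and are not part of Spark.

```java
import java.nio.ByteBuffer;

// Illustration only: standard java.nio semantics, not Spark's UnsafeWriter API.
final class FlipSketch {
  // Writes two ints, flips the buffer, and returns the number of readable bytes.
  static int writeThenFlip(ByteBuffer buf) {
    buf.clear();            // position = 0, limit = capacity: start of the write region
    buf.putInt(7);
    buf.putInt(11);
    buf.flip();             // limit = old position, position = 0: end of the write region
    return buf.remaining(); // limit - position, i.e. the bytes just written
  }

  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(64);
    System.out.println(writeThenFlip(buf)); // number of bytes written
    System.out.println(buf.getInt());       // first value written
  }
}
```

In this analogy, `clear()` plays the role of `reset()` and `flip()` plays the role of `setTotalSize()`: one call marks the start of a record and the other fixes its final size.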

    public UnsafeRowWriter(BufferHolder holder, int numFields) {
    this.holder = holder;
    public UnsafeRowWriter(UnsafeRow row, int initialBufferSize) {

Do we really need two UnsafeRow constructors?
For the top level row writer I also think it might be nice to create row internally, and just have a constructor that takes a numFields and (optionally) size argument.

    addCursor(16);
    }

    protected final void _write(long offset, boolean value) {

Why the _write names? Just call them writeBoolean etc.

Test build #88378 has finished for PR 20850 at commit

Test build #88380 has finished for PR 20850 at commit

    cursor += val;
    }

    int pushCursor() {

Can we make this a little bit less complex? I think just storing the cursor in the UnsafeWriter is enough.
Since one BufferHolder is shared by multiple UnsafeWriters, it seems simpler to store the cursors in the BufferHolder.
WDYT?
I also feel the current code is a bit complicated. Can't we avoid the sharing?
Yeah, it is complicated. pushCursor will be dropped.

@hvanhovell btw (this is not related to this PR, though...), most of the code in

    val tmpCursor = bufferHolder.cursor
    writeArray(bufferHolder, arrayWriter, elementWriter, v.getArray(i), elementSize)
    writer.setOffsetAndSize(i, tmpCursor, bufferHolder.cursor - tmpCursor)
    arrayWriter.markCursor()

From a performance point of view, this abstraction may have more impact, since we move a temporary value on the local frame into one on the Java stack:

    arrayWriter.markCursor()
    writeArray(arrayWriter, elementWriter, v.getArray(i))
    writer.setOffsetAndSizeFromMark(i)

Does this implementation strike a good enough balance between performance and abstraction? Or is it better to do it like this?

    val mark = arrayWriter.cursor()
    writeArray(arrayWriter, elementWriter, v.getArray(i))
    writer.setOffsetAndSizeFromMark(i, mark)

@Maropo @hvanhovell WDYT?
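
The two alternatives in this comment can be compared side by side with a toy model. The sketch below is not Spark code: `CursorSketch`, `advance`, `sizeSinceMark`, and `sizeSince` are hypothetical names, and the starting cursor value is arbitrary. It only shows that both variants compute the same size, and where the remembered cursor lives in each case.

```java
// Illustration only: a toy writer, not Spark's UnsafeWriter.
final class CursorSketch {
  private int cursor = 16; // hypothetical current write position
  private int mark;        // variant A keeps the remembered position as writer state

  int cursor() { return cursor; }

  // Stands in for writing n bytes of variable-length data.
  void advance(int n) { cursor += n; }

  // Variant A: the writer stores the mark in a field.
  void markCursor() { mark = cursor; }
  int sizeSinceMark() { return cursor - mark; }

  // Variant B: the caller keeps the previous cursor in a local variable.
  int sizeSince(int previousCursor) { return cursor - previousCursor; }

  public static void main(String[] args) {
    CursorSketch a = new CursorSketch();
    a.markCursor();
    a.advance(24);
    System.out.println(a.sizeSinceMark());

    CursorSketch b = new CursorSketch();
    int previous = b.cursor();
    b.advance(24);
    System.out.println(b.sizeSince(previous));
  }
}
```

One possible drawback of variant A in this toy model is that a single `mark` field would be overwritten if a nested write called `markCursor()` again before the outer size was computed; variant B avoids that because each call site owns its own local.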

Test build #88411 has finished for PR 20850 at commit

Test build #88728 has finished for PR 20850 at commit

retest this please

Test build #88760 has finished for PR 20850 at commit

    return cursor;
    }

    void incrementCursor(int val) {

typo, should be increaseCursor

    final long offsetAndSize = (relativeOffset << 32) | (long)size;

    write(ordinal, offsetAndSize);
    _setOffsetAndSizeFromPreviousCursor(ordinal, mark);

_setOffsetAndSizeFromPreviousCursor calls setOffsetAndSize, which calls write and then calls assertIndexIsValid.
So we don't need _setOffsetAndSizeFromPreviousCursor
Ah, good catch
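
As a side note on the `offsetAndSize` expression in the snippet above, the following sketch shows how an offset and a size can be packed into one 8-byte word and read back. The packing expression is taken from the diff; the class name `OffsetAndSize` and the `offset`/`size` decoding helpers are assumptions added for illustration.

```java
// Illustration only: the packing expression mirrors the diff, the rest is hypothetical.
final class OffsetAndSize {
  // relativeOffset must be a long before shifting, otherwise the shift is done in 32 bits.
  static long pack(long relativeOffset, int size) {
    // size is assumed non-negative, so the cast does not sign-extend into the upper half.
    return (relativeOffset << 32) | (long) size;
  }

  // Upper 32 bits hold the offset.
  static int offset(long offsetAndSize) { return (int) (offsetAndSize >>> 32); }

  // Lower 32 bits hold the size.
  static int size(long offsetAndSize) { return (int) offsetAndSize; }

  public static void main(String[] args) {
    long packed = pack(16L, 24);
    System.out.println(offset(packed) + " " + size(packed));
  }
}
```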

Test build #88787 has finished for PR 20850 at commit

retest this please

Test build #88789 has finished for PR 20850 at commit

    }

    public void reset() {
    byte[] buffer() {

nit: getBuffer is more java-style

    BitSetMethods.set(buffer(), startingOffset + 8, ordinal);
    }

    public void setNull1Bytes(int ordinal) {

seems now we only need a single setNullAt method.
IIUC, UnsafeRowWriter needs only a single setNullAt method, for 8-byte-wide fields.
On the other hand, UnsafeArrayWriter needs multiple setNull?Bytes() methods for the different element sizes. Generated code also uses setNull?Bytes for array elements.
Could you elaborate on your thought?
We still need the various setNull* methods because of arrays.
nvm, I thought we could pattern match on the elementSize, but that may hurt performance a lot for the codegen version.
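
To make the conclusion of this thread concrete, here is a simplified sketch of why an array writer ends up with one `setNull?Bytes` method per element width. It is a toy model under stated assumptions: the layout (a null bitmap followed by fixed-width elements), the class `ArrayNullSketch`, and the helpers `isNull` and `elementByte` are hypothetical and do not reproduce Spark's actual `UnsafeArrayWriter` layout or its `Platform` calls.

```java
import java.util.Arrays;

// Illustration only: simplified layout, not Spark's UnsafeArrayData format.
final class ArrayNullSketch {
  private final byte[] buffer;
  private final int headerBytes; // hypothetical: null bitmap rounded up to 8-byte words
  private final int elementSize;

  ArrayNullSketch(int numElements, int elementSize) {
    this.headerBytes = ((numElements + 63) / 64) * 8;
    this.elementSize = elementSize;
    this.buffer = new byte[headerBytes + numElements * elementSize];
    // Simulate a reused buffer whose element region still holds old data.
    Arrays.fill(buffer, headerBytes, buffer.length, (byte) 0xFF);
  }

  private void setNullBit(int ordinal) {
    buffer[ordinal >> 3] |= (byte) (1 << (ordinal & 7));
  }

  private void zero(int ordinal, int width) {
    int start = headerBytes + ordinal * elementSize;
    Arrays.fill(buffer, start, start + width, (byte) 0);
  }

  // One method per width: the caller picks the right one when it is generated,
  // so no branch on elementSize is needed per element at run time.
  void setNull1Bytes(int ordinal) { setNullBit(ordinal); zero(ordinal, 1); }
  void setNull2Bytes(int ordinal) { setNullBit(ordinal); zero(ordinal, 2); }
  void setNull4Bytes(int ordinal) { setNullBit(ordinal); zero(ordinal, 4); }
  void setNull8Bytes(int ordinal) { setNullBit(ordinal); zero(ordinal, 8); }

  boolean isNull(int ordinal) {
    return (buffer[ordinal >> 3] & (1 << (ordinal & 7))) != 0;
  }

  byte elementByte(int ordinal, int i) {
    return buffer[headerBytes + ordinal * elementSize + i];
  }
}
```

A row writer, by contrast, only ever deals with 8-byte field slots, which is why a single `setNullAt` is enough there.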

    final long offset = getFieldOffset(ordinal);
    Platform.putLong(holder.buffer, offset, 0L);
    Platform.putBoolean(holder.buffer, offset, value);
    Platform.putLong(buffer(), offset, 0L);

writeLong(0)?

    final long offset = getFieldOffset(ordinal);
    Platform.putLong(holder.buffer, offset, 0L);
    Platform.putBoolean(holder.buffer, offset, value);
    Platform.putLong(buffer(), offset, 0L);

writeLong(offset, 0L)?

    public void write(int ordinal, Decimal input, int precision, int scale) {
    if (precision <= Decimal.MAX_LONG_DIGITS()) {
    // make sure Decimal object has the same scale as DecimalType
    if (input.changePrecision(precision, scale)) {

Do we need input != null here, like https://github.com/apache/spark/pull/20850/files#diff-85658ffc242280699a331c90530f54baR149
Good point. I will add input != null.
I am also curious about the differences between these two methods.

    unsafeRow.setTotalSize(bufferHolder.totalSize());
    return unsafeRow;
    rowWriter.setTotalSize();
    return rowWriter.getRow();

Is there any place where we call getRow() without calling setTotalSize() before that? If there aren't then I'd combine the two.
I think this is the only place where we call getRow() without calling setTotalSize(), namely when numVarLenFields == 0.
Should we call reset() and setTotalSize() as the interpreted version does?

Test build #88801 has finished for PR 20850 at commit

     *
     * Generally we should call `UnsafeRowWriter.setTotalSize` to update the size of the result row,
     * after writing a record to the buffer. However, we can skip this step if the fields of row are
     * all fixed-length, as the size of result row is also fixed.

Not sure if this optimization is really necessary. Maybe we can always update total size in getRow.
Got it. We will merge setTotalSize and getRow into getRow.

LGTM

@kiszk can you rerun the Otherwise LGTM.

@hvanhovell Here are results of With master With SPARK-23713

Merging to master. Thanks for your hard work and patience.

## What changes were proposed in this pull request?

This PR implemented the following cleanups related to `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

## How was this patch tested?

Tested by existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20850 from kiszk/SPARK-23713.

## What changes were proposed in this pull request?

In #20850 when writing non-null decimals, instead of zero-ing all the 16 allocated bytes, we zero-out only the padding bytes. Since we always allocate 16 bytes, if the number of bytes needed for a decimal is lower than 9, then this means that the bytes between 8 and 16 are not zero-ed.

I see 2 solutions here:
- we can zero-out all the bytes in advance as it was done before #20850 (safer solution IMHO);
- we can allocate only the needed bytes (may be a bit more efficient in terms of memory used, but I have not investigated the feasibility of this option).

Hence I propose here the first solution in order to fix the correctness issue. We can eventually switch to the second if we think is more efficient later.

## How was this patch tested?

Running the test attached in the JIRA + added UT

Closes #22602 from mgaido91/SPARK-25582.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d7ae36a)
Signed-off-by: Dongjoon Hyun <[email protected]>
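
The correctness issue described in this follow-up can be modelled in a few lines. The sketch below is a hedged toy model, not the Spark implementation: `DecimalSlotSketch`, `writePaddingOnly`, and `writeZeroAll` are hypothetical names, and a plain `byte[]` stands in for the row buffer. It assumes a 16-byte slot in a reused buffer and a decimal whose unscaled bytes need fewer than 9 bytes.

```java
import java.util.Arrays;

// Illustration only: a toy model of the 16-byte decimal slot, not Spark code.
final class DecimalSlotSketch {
  static final int SLOT = 16;

  // Models the behaviour described above: only the padding up to the next
  // 8-byte boundary is zeroed, so bytes 8..15 keep stale data for short values.
  static byte[] writePaddingOnly(byte[] dirty, byte[] unscaled) {
    byte[] slot = Arrays.copyOf(dirty, SLOT);
    System.arraycopy(unscaled, 0, slot, 0, unscaled.length);
    int rounded = (unscaled.length + 7) / 8 * 8;
    Arrays.fill(slot, unscaled.length, rounded, (byte) 0);
    return slot;
  }

  // Models the first proposed solution: zero all 16 bytes, then write the value.
  static byte[] writeZeroAll(byte[] dirty, byte[] unscaled) {
    byte[] slot = Arrays.copyOf(dirty, SLOT);
    Arrays.fill(slot, (byte) 0);
    System.arraycopy(unscaled, 0, slot, 0, unscaled.length);
    return slot;
  }

  public static void main(String[] args) {
    byte[] dirty = new byte[SLOT];
    Arrays.fill(dirty, (byte) 0x7F);      // stale bytes left over from a previous row
    byte[] unscaled = {1, 2, 3};          // needs fewer than 9 bytes
    System.out.println(Arrays.toString(writePaddingOnly(dirty, unscaled)));
    System.out.println(Arrays.toString(writeZeroAll(dirty, unscaled)));
  }
}
```

Two rows holding the same decimal could then differ byte for byte in the first variant, which is presumably why the issue is treated as a correctness problem rather than only a cosmetic one.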

This PR implemented the following cleanups related to `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

Tested by existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#20850 from kiszk/SPARK-23713.

Ref: LIHADOOP-48531

What changes were proposed in this pull request?
This PR implemented the following cleanups related to `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

How was this patch tested?
Tested by existing UTs