[SPARK-20783][SQL] Enhance ColumnVector to keep UnsafeArrayData for array #18014
Conversation
Jenkins, test this please

1 similar comment

Jenkins, test this please

Test build #77017 has finished for PR 18014 at commit

Test build #77023 has finished for PR 18014 at commit

Test build #77024 has finished for PR 18014 at commit
@hvanhovell @sameeragarwal would it be possible to look at this?
I may be missing something, but can we just treat the array type as binary type and put it in …
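To make that suggestion concrete, here is a minimal sketch of the "array as binary" idea, assuming a Spark 2.2-era binary-typed ColumnVector; the `binaryVector` and `rowId` names are illustrative, not from this PR. The whole UnsafeArrayData is serialized to bytes on write and re-pointed on read:

```java
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.unsafe.Platform;

// Illustrative binary-typed vector; rowId is an arbitrary slot.
ColumnVector binaryVector =
    ColumnVector.allocate(8, DataTypes.BinaryType, MemoryMode.ON_HEAP);
int rowId = 0;

// Serialize the whole UnsafeArrayData into a plain byte[].
UnsafeArrayData array = UnsafeArrayData.fromPrimitiveArray(new int[]{0, 1});
byte[] bytes = new byte[array.getSizeInBytes()];
array.writeToMemory(bytes, Platform.BYTE_ARRAY_OFFSET);

// Store it in the vector as if it were an ordinary binary value.
binaryVector.putByteArray(rowId, bytes);

// Reading it back re-points an UnsafeArrayData at the bytes; no per-element copy.
UnsafeArrayData restored = new UnsafeArrayData();
restored.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
```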
I thought that idea was for Apache Arrow. Is it better to use the existing code?
@cloud-fan What do you think?
@cloud-fan Could you please let us know your thoughts?
I think there is a gap between the columnar format and the unsafe row format. The current … While changing it to binary makes it faster if we need to read an array from …
@cloud-fan Thank you for your comments. Let me confirm your ideas.
We can map the current …
The issue is a conversion of … On the other hand, my current approach stores the whole … What do you think?
I took a look at …
Yes. Let me implement a new …
@cloud-fan When I think about the use case of … The following is an example program and the current generated code. In the generated code, we will replace … At projection, if the type of … I think that there are three options.
What do you think? Or are there any other ideas?

```scala
val df = sparkContext.parallelize(Seq(Array(0, 1), Array(1, 2)), 1).toDF("a").cache
df.count
df.filter("a[0] > 0").show
```

```java
/* 031 */ protected void processNext() throws java.io.IOException {
/* 032 */ while (inputadapter_input.hasNext() && !stopEarly()) {
/* 033 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 034 */ boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 035 */ ArrayData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getArray(0));
/* 037 */ if (!(!(inputadapter_isNull))) continue;
/* 039 */ boolean filter_isNull2 = true;
/* 040 */ boolean filter_value2 = false;
/* 042 */ boolean filter_isNull3 = true;
/* 043 */ int filter_value3 = -1;
/* 045 */ filter_isNull3 = false;
/* 047 */ final int filter_index = (int) 0;
/* 048 */ if (filter_index >= inputadapter_value.numElements() || filter_index < 0 || inputadapter_value.isNullAt(filter_index)) {
/* 049 */ filter_isNull3 = true;
/* 050 */ } else {
/* 051 */ filter_value3 = inputadapter_value.getInt(filter_index);
/* 052 */ }
/* 053 */ if (!filter_isNull3) {
/* 054 */ filter_isNull2 = false;
/* 055 */ filter_value2 = filter_value3 > 0;
/* 057 */ }
/* 058 */ if (filter_isNull2 || !filter_value2) continue;
/* 060 */ filter_numOutputRows.add(1);
/* 062 */ filter_holder.reset();
/* 066 */ final int filter_tmpCursor = filter_holder.cursor;
/* 068 */ if (inputadapter_value instanceof UnsafeArrayData) {
/* 069 */ final int filter_sizeInBytes = ((UnsafeArrayData) inputadapter_value).getSizeInBytes();
/* 071 */ filter_holder.grow(filter_sizeInBytes);
/* 072 */ ((UnsafeArrayData) inputadapter_value).writeToMemory(filter_holder.buffer, filter_holder.cursor);
/* 073 */ filter_holder.cursor += filter_sizeInBytes;
/* 075 */ } else {
/* 076 */ final int filter_numElements = inputadapter_value.numElements();
/* 077 */ filter_arrayWriter.initialize(filter_holder, filter_numElements, 4);
/* 079 */ for (int filter_index1 = 0; filter_index1 < filter_numElements; filter_index1++) {
/* 080 */ if (inputadapter_value.isNullAt(filter_index1)) {
/* 081 */ filter_arrayWriter.setNullInt(filter_index1);
/* 082 */ } else {
/* 083 */ final int filter_element = inputadapter_value.getInt(filter_index1);
/* 084 */ filter_arrayWriter.write(filter_index1, filter_element);
/* 085 */ }
/* 086 */ }
/* 087 */ }
/* 089 */ filter_rowWriter.setOffsetAndSize(0, filter_tmpCursor, filter_holder.cursor - filter_tmpCursor);
/* 090 */ filter_result.setTotalSize(filter_holder.totalSize());
/* 091 */ append(filter_result);
/* 092 */ if (shouldStop()) return;
/* 093 */ }
/* 094 */ }
```
I think option 2 is better. But if we do need to copy the array in the final projection, we should also speed it up with a bulk copy. For the write path, …
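For reference, a self-contained sketch of what "bulk copy" buys over the per-element loop in the generated code above, using only Platform.copyMemory (a primitive Spark already provides); the arrays here are toy data, not PR code:

```java
import org.apache.spark.unsafe.Platform;

int[] src = {0, 1, 2, 3};
int[] dst = new int[src.length];

// Per-element copy, as in the generated filter loop above.
for (int i = 0; i < src.length; i++) {
  dst[i] = src[i];
}

// Bulk copy of the same region in a single call: 4 bytes per int element.
Platform.copyMemory(
    src, Platform.INT_ARRAY_OFFSET,
    dst, Platform.INT_ARRAY_OFFSET,
    src.length * 4L);
```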
Test build #77384 has finished for PR 18014 at commit

Test build #77385 has finished for PR 18014 at commit
@cloud-fan Here is an implementation based on option 2, only for simple data types (e.g. boolean, int, double, and so on). I used a bulk copy for the array body in …
```java
} else if (et == DataTypes.DoubleType) {
  Platform.copyMemory(
    src, srcOffset, doubleData, Platform.DOUBLE_ARRAY_OFFSET + dstOffset * 8, numElements * 8);
} else {
```
If we want to support nested arrays, do we need to do multiple bulk copies?
I think we may need to update UnsafeArrayData to put the leaf elements together, like ColumnVector does; then we can use just one bulk copy to write a nested array.
You are right; the current implementation will require multiple bulk copies for a nested array.
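To spell that out, here is a sketch of the copy under the current layout; `outer`, `target`, and `targetOffset` are assumed inputs, not code from this PR. Because each inner array carries its own header and null bits, each one needs its own bulk copy:

```java
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.unsafe.Platform;

// One bulk copy per inner array: the inner bodies are separate regions.
for (int i = 0; i < outer.numElements(); i++) {
  UnsafeArrayData inner = outer.getArray(i);
  Platform.copyMemory(
      inner.getBaseObject(), inner.getBaseOffset(),
      target, targetOffset,
      inner.getSizeInBytes());
  targetOffset += inner.getSizeInBytes();
}
```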
IIUC, UnsafeArrayData puts all of the elements together in a contiguous memory region even when it is a nested array.
On the other hand, the current ColumnVector puts the elements of each dimension in a different ColumnVector.childColumns[0]; thus, multiple bulk copies are required.
If there is an array Array(Array(1), Array(2)), then in ColumnVector, Array(1) and Array(2) are stored in different intData in ColumnVector.childColumns[0].
Am I correct?
No, I think it's the opposite. UnsafeArrayData blindly puts elements together; if the element is also of array type, each element carries its own null bits, so the leaf elements are not stored together. For ColumnVector, you can take a look at the String APIs in ColumnarBatchSuite; the leaf elements are stored together.
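A hedged sketch against the Spark 2.2-era ColumnVector API (allocate / arrayData / putArray, as exercised in ColumnarBatchSuite) of how Array(Array(1), Array(2)) would be laid out, with the leaf ints contiguous in the grandchild vector:

```java
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

DataType nested =
    DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.IntegerType));
ColumnVector col = ColumnVector.allocate(4, nested, MemoryMode.ON_HEAP);

ColumnVector inner = col.arrayData();    // child vector of type array<int>
ColumnVector leaves = inner.arrayData(); // grandchild vector of type int

leaves.putInt(0, 1);                     // leaf elements sit next to each other
leaves.putInt(1, 2);
inner.putArray(0, 0, 1);                 // Array(1) -> leaves[0, 1)
inner.putArray(1, 1, 1);                 // Array(2) -> leaves[1, 2)
col.putArray(0, 0, 2);                   // outer array spans inner[0, 2)
```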
ping @cloud-fan
I don't think so; ColumnVector.Array.getArray actually returns the same instance.
Let me think about this over the weekend, since I am busy preparing slides for Spark Summit.
@cloud-fan I agree that ColumnVector.Array.getArray returns the same instance in ColumnVector.resultArray. This is good for a one-dimensional array.
In my understanding, we have to get a ColumnVector from this instance to support nested arrays. However, the current ColumnVector.Array does not have a getter for its ColumnVector, so it is hard to implement nested arrays in the current ColumnVector implementation. Am I missing something? Or do you have any ideas for enhancing ColumnVector?
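For discussion, a hypothetical sketch of the missing accessor (not an existing API; it assumes ColumnVector.Array keeps a reference, here called `data`, to the vector backing its elements):

```java
// Hypothetical addition to ColumnVector.Array, sketched for discussion only.
public ColumnVector getVector() {
  return data;  // the assumed backing-vector reference
}

// With such a getter, nested access could descend level by level, e.g.:
//   ColumnVector leafVector = array.getVector().arrayData();
```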
ping @cloud-fan
What is the latest status of this PR?
This is an older one; let me close it. I will submit another PR very soon that does the same thing in a different way.
What changes were proposed in this pull request?

This PR enhances ColumnVector to keep UnsafeArrayData for arrays, so that ColumnVector can be used for the table cache (e.g. CACHE TABLE, DataFrame.cache). Other complex types such as Map and struct will be addressed by another PR if that is OK.

The current ColumnVector accepts only a primitive-type Java array as input for an array. That is good for keeping data from Parquet.

This PR changed or added the following APIs (a hedged usage sketch follows the list):

- ColumnVector ColumnVector.allocate(int capacity, DataType type, MemoryMode mode, boolean useUnsafeArrayData): if useUnsafeArrayData is true, the ColumnVector can keep UnsafeArrayData; if it is false, the ColumnVector cannot keep UnsafeArrayData.
- int ColumnVector.putArray(int rowId, ArrayData array): if the ColumnVector was generated with useUnsafeArrayData=true, this method stores UnsafeArrayData into the ColumnVector; otherwise, it throws an exception.
- ArrayData ColumnVector.getArray(int rowId): if the ColumnVector was generated with useUnsafeArrayData=true, this method returns UnsafeArrayData.
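The usage sketch: the signatures are taken from the list above and the behavior from this description, not verified against the final patch.

```java
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.DataTypes;

// Allocate a vector that can hold UnsafeArrayData directly.
ColumnVector vector = ColumnVector.allocate(
    1024,
    DataTypes.createArrayType(DataTypes.IntegerType),
    MemoryMode.ON_HEAP,
    true /* useUnsafeArrayData */);

// Store a whole UnsafeArrayData into row 0 ...
UnsafeArrayData array = UnsafeArrayData.fromPrimitiveArray(new int[]{0, 1});
vector.putArray(0, array);

// ... and read it back; per the description, this returns an UnsafeArrayData.
ArrayData back = vector.getArray(0);
```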
How was this patch tested?

Updated the existing test suite.