-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-21046][SQL] simplify the array offset and length in ColumnVector #18260
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
d4267b7
a61ba71
368c346
1dae660
e6e60e0
fdc0870
04790bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,19 +34,15 @@ public final class OffHeapColumnVector extends ColumnVector { | |
| // The data stored in these two allocations need to maintain binary compatible. We can | ||
| // directly pass this buffer to external components. | ||
| private long nulls; | ||
| // The actually data of this column vector will be store here. If it's an array column vector, | ||
| // we will store the offsets and lengths here, and store the element data in child column vector. | ||
| private long data; | ||
|
|
||
| // Set iff the type is array. | ||
| private long lengthData; | ||
| private long offsetData; | ||
|
|
||
| protected OffHeapColumnVector(int capacity, DataType type) { | ||
| super(capacity, type, MemoryMode.OFF_HEAP); | ||
|
|
||
| nulls = 0; | ||
| data = 0; | ||
| lengthData = 0; | ||
| offsetData = 0; | ||
|
|
||
| reserveInternal(capacity); | ||
| reset(); | ||
|
|
@@ -66,12 +62,8 @@ public long nullsNativeAddress() { | |
| public void close() { | ||
| Platform.freeMemory(nulls); | ||
| Platform.freeMemory(data); | ||
| Platform.freeMemory(lengthData); | ||
| Platform.freeMemory(offsetData); | ||
| nulls = 0; | ||
| data = 0; | ||
| lengthData = 0; | ||
| offsetData = 0; | ||
| } | ||
|
|
||
| // | ||
|
|
@@ -395,35 +387,6 @@ public double getDouble(int rowId) { | |
| } | ||
| } | ||
|
|
||
| // | ||
| // APIs dealing with Arrays. | ||
| // | ||
| @Override | ||
| public void putArray(int rowId, int offset, int length) { | ||
| assert(offset >= 0 && offset + length <= childColumns[0].capacity); | ||
| Platform.putInt(null, lengthData + 4 * rowId, length); | ||
| Platform.putInt(null, offsetData + 4 * rowId, offset); | ||
| } | ||
|
|
||
| @Override | ||
| public int getArrayLength(int rowId) { | ||
| return Platform.getInt(null, lengthData + 4 * rowId); | ||
| } | ||
|
|
||
| @Override | ||
| public int getArrayOffset(int rowId) { | ||
| return Platform.getInt(null, offsetData + 4 * rowId); | ||
| } | ||
|
|
||
| // APIs dealing with ByteArrays | ||
| @Override | ||
| public int putByteArray(int rowId, byte[] value, int offset, int length) { | ||
| int result = arrayData().appendBytes(length, value, offset); | ||
| Platform.putInt(null, lengthData + 4 * rowId, length); | ||
| Platform.putInt(null, offsetData + 4 * rowId, result); | ||
| return result; | ||
| } | ||
|
|
||
| @Override | ||
| public void loadBytes(ColumnVector.Array array) { | ||
| if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length]; | ||
|
|
@@ -438,10 +401,9 @@ public void loadBytes(ColumnVector.Array array) { | |
| protected void reserveInternal(int newCapacity) { | ||
| int oldCapacity = (this.data == 0L) ? 0 : capacity; | ||
| if (this.resultArray != null) { | ||
| this.lengthData = | ||
| Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); | ||
| this.offsetData = | ||
| Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4); | ||
| // need 2 ints as offset and length for each array. | ||
|
||
| this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); | ||
| putInt(0, 0); | ||
|
||
| } else if (type instanceof ByteType || type instanceof BooleanType) { | ||
| this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); | ||
| } else if (type instanceof ShortType) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -42,15 +42,13 @@ public final class OnHeapColumnVector extends ColumnVector { | |
| // Array for each type. Only 1 is populated for any type. | ||
| private byte[] byteData; | ||
| private short[] shortData; | ||
| // This is not used used to store data for int column vector, but also can store offsets and | ||
|
||
| // lengths for array column vector. | ||
| private int[] intData; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One question I just have is, the capacity of
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Is it possible to use
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Even we pass this check, we still face problem when allocating
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @kiszk Do you meant we store a pair of offset/length together as an element in |
||
| private long[] longData; | ||
| private float[] floatData; | ||
| private double[] doubleData; | ||
|
|
||
| // Only set if type is Array. | ||
| private int[] arrayLengths; | ||
| private int[] arrayOffsets; | ||
|
|
||
| protected OnHeapColumnVector(int capacity, DataType type) { | ||
| super(capacity, type, MemoryMode.ON_HEAP); | ||
| reserveInternal(capacity); | ||
|
|
@@ -366,55 +364,23 @@ public double getDouble(int rowId) { | |
| } | ||
| } | ||
|
|
||
| // | ||
| // APIs dealing with Arrays | ||
| // | ||
|
|
||
| @Override | ||
| public int getArrayLength(int rowId) { | ||
| return arrayLengths[rowId]; | ||
| } | ||
| @Override | ||
| public int getArrayOffset(int rowId) { | ||
| return arrayOffsets[rowId]; | ||
| } | ||
|
|
||
| @Override | ||
| public void putArray(int rowId, int offset, int length) { | ||
| arrayOffsets[rowId] = offset; | ||
| arrayLengths[rowId] = length; | ||
| } | ||
|
|
||
| @Override | ||
| public void loadBytes(ColumnVector.Array array) { | ||
| array.byteArray = byteData; | ||
| array.byteArrayOffset = array.offset; | ||
| } | ||
|
|
||
| // | ||
| // APIs dealing with Byte Arrays | ||
| // | ||
|
|
||
| @Override | ||
| public int putByteArray(int rowId, byte[] value, int offset, int length) { | ||
| int result = arrayData().appendBytes(length, value, offset); | ||
| arrayOffsets[rowId] = result; | ||
| arrayLengths[rowId] = length; | ||
| return result; | ||
| } | ||
|
|
||
| // Spilt this function out since it is the slow path. | ||
| @Override | ||
| protected void reserveInternal(int newCapacity) { | ||
| if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { | ||
| int[] newLengths = new int[newCapacity]; | ||
| int[] newOffsets = new int[newCapacity]; | ||
| if (this.arrayLengths != null) { | ||
| System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity); | ||
| System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity); | ||
| // need 2 ints as offset and length for each array. | ||
|
||
| if (intData == null || intData.length < newCapacity * 2) { | ||
| int[] newData = new int[newCapacity * 2]; | ||
|
||
| if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity * 2); | ||
|
||
| intData = newData; | ||
| } | ||
| arrayLengths = newLengths; | ||
| arrayOffsets = newOffsets; | ||
| putInt(0, 0); | ||
|
||
| } else if (type instanceof BooleanType) { | ||
| if (byteData == null || byteData.length < newCapacity) { | ||
| byte[] newData = new byte[newCapacity]; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite { | |
| assert(column.arrayData().elementsAppended == 17) | ||
|
|
||
| // Put the same "ll" at offset. This should not allocate more memory in the column. | ||
| column.putArray(idx, offset, 2) | ||
| column.putArrayOffsetAndLength(idx, offset, 2) | ||
| reference += "ll" | ||
| idx += 1 | ||
| assert(column.arrayData().elementsAppended == 17) | ||
|
|
@@ -644,7 +644,7 @@ class ColumnarBatchSuite extends SparkFunSuite { | |
| assert(column.arrayData().elementsAppended == 17 + (s + s).length) | ||
|
|
||
| reference.zipWithIndex.foreach { v => | ||
| assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode) | ||
| assert(v._1.length == column.getInt(v._2 * 2 + 1), "MemoryMode=" + memMode) | ||
|
||
| assert(v._1 == column.getUTF8String(v._2).toString, | ||
| "MemoryMode" + memMode) | ||
| } | ||
|
|
@@ -659,18 +659,18 @@ class ColumnarBatchSuite extends SparkFunSuite { | |
| val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode) | ||
|
|
||
| // Fill the underlying data with all the arrays back to back. | ||
| val data = column.arrayData(); | ||
| val data = column.arrayData() | ||
| var i = 0 | ||
| while (i < 6) { | ||
| data.putInt(i, i) | ||
| i += 1 | ||
| } | ||
|
|
||
| // Populate it with arrays [0], [1, 2], [], [3, 4, 5] | ||
| column.putArray(0, 0, 1) | ||
| column.putArray(1, 1, 2) | ||
| column.putArray(2, 2, 0) | ||
| column.putArray(3, 3, 3) | ||
| column.putArrayOffsetAndLength(0, 0, 1) | ||
| column.putArrayOffsetAndLength(1, 1, 2) | ||
| column.putArrayOffsetAndLength(2, 2, 0) | ||
|
||
| column.putArrayOffsetAndLength(3, 3, 3) | ||
|
|
||
| val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] | ||
| val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] | ||
|
|
@@ -703,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite { | |
| data.reserve(array.length) | ||
| assert(data.capacity == array.length * 2) | ||
| data.putInts(0, array.length, array, 0) | ||
| column.putArray(0, 0, array.length) | ||
| column.putArrayOffsetAndLength(0, 0, array.length) | ||
| assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] | ||
| === array) | ||
| }} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
will be store->will be stored?