From d4267b72a1aeecd057be86711d7f4e704f9b00c2 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 10 Jun 2017 02:04:42 -0700 Subject: [PATCH 1/7] simplify the array offset and length in ColumnVector --- .../execution/vectorized/ColumnVector.java | 34 ++++++------- .../vectorized/OffHeapColumnVector.java | 46 ++---------------- .../vectorized/OnHeapColumnVector.java | 48 +++---------------- .../vectorized/ColumnarBatchSuite.scala | 16 +++---- 4 files changed, 34 insertions(+), 110 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 24260a60197f..f02cbfaade4a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.spark.sql.execution.vectorized; import java.math.BigDecimal; @@ -518,19 +519,13 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { public abstract double getDouble(int rowId); /** - * Puts a byte array that already exists in this column. - */ - public abstract void putArray(int rowId, int offset, int length); - - /** - * Returns the length of the array at rowid. + * After writing array elements to the child column vector, call this method to set the offset and + * length of the written array. */ - public abstract int getArrayLength(int rowId); - - /** - * Returns the offset of the array at rowid. - */ - public abstract int getArrayOffset(int rowId); + public void arrayWriteEnd(int rowId, int offset, int length) { + putInt(2 * rowId, offset); + putInt(2 * rowId + 1, length); + } /** * Returns a utility object to get structs. @@ -553,8 +548,8 @@ public ColumnarBatch.Row getStruct(int rowId, int size) { * Returns the array at rowid. */ public final Array getArray(int rowId) { - resultArray.length = getArrayLength(rowId); - resultArray.offset = getArrayOffset(rowId); + resultArray.offset = getInt(2 * rowId); + resultArray.length = getInt(2 * rowId + 1); return resultArray; } @@ -566,7 +561,12 @@ public final Array getArray(int rowId) { /** * Sets the value at rowId to `value`. 
*/ - public abstract int putByteArray(int rowId, byte[] value, int offset, int count); + public int putByteArray(int rowId, byte[] value, int offset, int length) { + int result = arrayData().appendBytes(length, value, offset); + arrayWriteEnd(rowId, result, length); + return result; + } + public final int putByteArray(int rowId, byte[] value) { return putByteArray(rowId, value, 0, value.length); } @@ -829,13 +829,13 @@ public final int appendDoubles(int length, double[] src, int offset) { public final int appendByteArray(byte[] value, int offset, int length) { int copiedOffset = arrayData().appendBytes(length, value, offset); reserve(elementsAppended + 1); - putArray(elementsAppended, copiedOffset, length); + arrayWriteEnd(elementsAppended, copiedOffset, length); return elementsAppended++; } public final int appendArray(int length) { reserve(elementsAppended + 1); - putArray(elementsAppended, arrayData().elementsAppended, length); + arrayWriteEnd(elementsAppended, arrayData().elementsAppended, length); return elementsAppended++; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index a7d3744d00e9..72197803aa94 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -36,17 +36,11 @@ public final class OffHeapColumnVector extends ColumnVector { private long nulls; private long data; - // Set iff the type is array. - private long lengthData; - private long offsetData; - protected OffHeapColumnVector(int capacity, DataType type) { super(capacity, type, MemoryMode.OFF_HEAP); nulls = 0; data = 0; - lengthData = 0; - offsetData = 0; reserveInternal(capacity); reset(); @@ -66,12 +60,8 @@ public long nullsNativeAddress() { public void close() { Platform.freeMemory(nulls); Platform.freeMemory(data); - Platform.freeMemory(lengthData); - Platform.freeMemory(offsetData); nulls = 0; data = 0; - lengthData = 0; - offsetData = 0; } // @@ -395,35 +385,6 @@ public double getDouble(int rowId) { } } - // - // APIs dealing with Arrays. - // - @Override - public void putArray(int rowId, int offset, int length) { - assert(offset >= 0 && offset + length <= childColumns[0].capacity); - Platform.putInt(null, lengthData + 4 * rowId, length); - Platform.putInt(null, offsetData + 4 * rowId, offset); - } - - @Override - public int getArrayLength(int rowId) { - return Platform.getInt(null, lengthData + 4 * rowId); - } - - @Override - public int getArrayOffset(int rowId) { - return Platform.getInt(null, offsetData + 4 * rowId); - } - - // APIs dealing with ByteArrays - @Override - public int putByteArray(int rowId, byte[] value, int offset, int length) { - int result = arrayData().appendBytes(length, value, offset); - Platform.putInt(null, lengthData + 4 * rowId, length); - Platform.putInt(null, offsetData + 4 * rowId, result); - return result; - } - @Override public void loadBytes(ColumnVector.Array array) { if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length]; @@ -438,10 +399,9 @@ public void loadBytes(ColumnVector.Array array) { protected void reserveInternal(int newCapacity) { int oldCapacity = (this.data == 0L) ? 
0 : capacity; if (this.resultArray != null) { - this.lengthData = - Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); - this.offsetData = - Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4); + // need 2 ints as offset and length for each array. + this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); + putInt(0, 0); } else if (type instanceof ByteType || type instanceof BooleanType) { this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); } else if (type instanceof ShortType) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 94ed32294cfa..d4cc39481d11 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -47,10 +47,6 @@ public final class OnHeapColumnVector extends ColumnVector { private float[] floatData; private double[] doubleData; - // Only set if type is Array. - private int[] arrayLengths; - private int[] arrayOffsets; - protected OnHeapColumnVector(int capacity, DataType type) { super(capacity, type, MemoryMode.ON_HEAP); reserveInternal(capacity); @@ -366,55 +362,23 @@ public double getDouble(int rowId) { } } - // - // APIs dealing with Arrays - // - - @Override - public int getArrayLength(int rowId) { - return arrayLengths[rowId]; - } - @Override - public int getArrayOffset(int rowId) { - return arrayOffsets[rowId]; - } - - @Override - public void putArray(int rowId, int offset, int length) { - arrayOffsets[rowId] = offset; - arrayLengths[rowId] = length; - } - @Override public void loadBytes(ColumnVector.Array array) { array.byteArray = byteData; array.byteArrayOffset = array.offset; } - // - // APIs dealing with Byte Arrays - // - - @Override - public int putByteArray(int rowId, byte[] value, int offset, int length) { - int result = arrayData().appendBytes(length, value, offset); - arrayOffsets[rowId] = result; - arrayLengths[rowId] = length; - return result; - } - // Spilt this function out since it is the slow path. @Override protected void reserveInternal(int newCapacity) { if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { - int[] newLengths = new int[newCapacity]; - int[] newOffsets = new int[newCapacity]; - if (this.arrayLengths != null) { - System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity); - System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity); + // need 2 ints as offset and length for each array. 
+ if (intData == null || intData.length < newCapacity * 2) { + int[] newData = new int[newCapacity * 2]; + if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity * 2); + intData = newData; } - arrayLengths = newLengths; - arrayOffsets = newOffsets; + putInt(0, 0); } else if (type instanceof BooleanType) { if (byteData == null || byteData.length < newCapacity) { byte[] newData = new byte[newCapacity]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index e48e3f640290..36c591a9bee4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 17) // Put the same "ll" at offset. This should not allocate more memory in the column. - column.putArray(idx, offset, 2) + column.arrayWriteEnd(idx, offset, 2) reference += "ll" idx += 1 assert(column.arrayData().elementsAppended == 17) @@ -644,7 +644,7 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 17 + (s + s).length) reference.zipWithIndex.foreach { v => - assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode) + assert(v._1.length == column.getInt(v._2 * 2 + 1), "MemoryMode=" + memMode) assert(v._1 == column.getUTF8String(v._2).toString, "MemoryMode" + memMode) } @@ -659,7 +659,7 @@ class ColumnarBatchSuite extends SparkFunSuite { val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode) // Fill the underlying data with all the arrays back to back. 
- val data = column.arrayData(); + val data = column.arrayData() var i = 0 while (i < 6) { data.putInt(i, i) @@ -667,10 +667,10 @@ class ColumnarBatchSuite extends SparkFunSuite { } // Populate it with arrays [0], [1, 2], [], [3, 4, 5] - column.putArray(0, 0, 1) - column.putArray(1, 1, 2) - column.putArray(2, 2, 0) - column.putArray(3, 3, 3) + column.arrayWriteEnd(0, 0, 1) + column.arrayWriteEnd(1, 1, 2) + column.arrayWriteEnd(2, 2, 0) + column.arrayWriteEnd(3, 3, 3) val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] @@ -703,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite { data.reserve(array.length) assert(data.capacity == array.length * 2) data.putInts(0, array.length, array, 0) - column.putArray(0, 0, array.length) + column.arrayWriteEnd(0, 0, array.length) assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] === array) }} From a61ba71ec6bf8245fcce423958d83cbc86b27adc Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 08:18:08 +0800 Subject: [PATCH 2/7] address comments --- .../spark/sql/execution/vectorized/ColumnVector.java | 8 ++++---- .../execution/vectorized/OffHeapColumnVector.java | 2 ++ .../sql/execution/vectorized/OnHeapColumnVector.java | 2 ++ .../execution/vectorized/ColumnarBatchSuite.scala | 12 ++++++------ 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index f02cbfaade4a..ea008cd8964b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -522,7 +522,7 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { * After writing array elements to the child column vector, call this method to set the offset and * length of the written array. 
*/ - public void arrayWriteEnd(int rowId, int offset, int length) { + public void putArrayOffsetAndLength(int rowId, int offset, int length) { putInt(2 * rowId, offset); putInt(2 * rowId + 1, length); } @@ -563,7 +563,7 @@ public final Array getArray(int rowId) { */ public int putByteArray(int rowId, byte[] value, int offset, int length) { int result = arrayData().appendBytes(length, value, offset); - arrayWriteEnd(rowId, result, length); + putArrayOffsetAndLength(rowId, result, length); return result; } @@ -829,13 +829,13 @@ public final int appendDoubles(int length, double[] src, int offset) { public final int appendByteArray(byte[] value, int offset, int length) { int copiedOffset = arrayData().appendBytes(length, value, offset); reserve(elementsAppended + 1); - arrayWriteEnd(elementsAppended, copiedOffset, length); + putArrayOffsetAndLength(elementsAppended, copiedOffset, length); return elementsAppended++; } public final int appendArray(int length) { reserve(elementsAppended + 1); - arrayWriteEnd(elementsAppended, arrayData().elementsAppended, length); + putArrayOffsetAndLength(elementsAppended, arrayData().elementsAppended, length); return elementsAppended++; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 72197803aa94..f5fd833c7ec1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -34,6 +34,8 @@ public final class OffHeapColumnVector extends ColumnVector { // The data stored in these two allocations need to maintain binary compatible. We can // directly pass this buffer to external components. private long nulls; + // The actually data of this column vector will be store here. If it's an array column vector, + // we will store the offsets and lengths here, and store the element data in child column vector. private long data; protected OffHeapColumnVector(int capacity, DataType type) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index d4cc39481d11..b926447d8968 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -42,6 +42,8 @@ public final class OnHeapColumnVector extends ColumnVector { // Array for each type. Only 1 is populated for any type. private byte[] byteData; private short[] shortData; + // This is not used used to store data for int column vector, but also can store offsets and + // lengths for array column vector. private int[] intData; private long[] longData; private float[] floatData; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 36c591a9bee4..935c55b5cffe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 17) // Put the same "ll" at offset. 
This should not allocate more memory in the column. - column.arrayWriteEnd(idx, offset, 2) + column.putArrayOffsetAndLength(idx, offset, 2) reference += "ll" idx += 1 assert(column.arrayData().elementsAppended == 17) @@ -667,10 +667,10 @@ class ColumnarBatchSuite extends SparkFunSuite { } // Populate it with arrays [0], [1, 2], [], [3, 4, 5] - column.arrayWriteEnd(0, 0, 1) - column.arrayWriteEnd(1, 1, 2) - column.arrayWriteEnd(2, 2, 0) - column.arrayWriteEnd(3, 3, 3) + column.putArrayOffsetAndLength(0, 0, 1) + column.putArrayOffsetAndLength(1, 1, 2) + column.putArrayOffsetAndLength(2, 2, 0) + column.putArrayOffsetAndLength(3, 3, 3) val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] @@ -703,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite { data.reserve(array.length) assert(data.capacity == array.length * 2) data.putInts(0, array.length, array, 0) - column.arrayWriteEnd(0, 0, array.length) + column.putArrayOffsetAndLength(0, 0, array.length) assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] === array) }} From 368c3462667bdb6822be01ecc95dfcdb04ce747a Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 09:06:51 +0800 Subject: [PATCH 3/7] more comments --- .../spark/sql/execution/vectorized/OffHeapColumnVector.java | 3 +-- .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index f5fd833c7ec1..e0ac47a80968 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -34,7 +34,7 @@ public final class OffHeapColumnVector extends ColumnVector { // The data stored in these two allocations need to maintain binary compatible. We can // directly pass this buffer to external components. private long nulls; - // The actually data of this column vector will be store here. If it's an array column vector, + // The actually data of this column vector will be stored here. If it's an array column vector, // we will store the offsets and lengths here, and store the element data in child column vector. private long data; @@ -403,7 +403,6 @@ protected void reserveInternal(int newCapacity) { if (this.resultArray != null) { // need 2 ints as offset and length for each array. this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); - putInt(0, 0); } else if (type instanceof ByteType || type instanceof BooleanType) { this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); } else if (type instanceof ShortType) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index b926447d8968..e9e123ad75d0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -42,7 +42,7 @@ public final class OnHeapColumnVector extends ColumnVector { // Array for each type. Only 1 is populated for any type. 
private byte[] byteData; private short[] shortData; - // This is not used used to store data for int column vector, but also can store offsets and + // This is not only used to store data for int column vector, but also can store offsets and // lengths for array column vector. private int[] intData; private long[] longData; @@ -377,10 +377,9 @@ protected void reserveInternal(int newCapacity) { // need 2 ints as offset and length for each array. if (intData == null || intData.length < newCapacity * 2) { int[] newData = new int[newCapacity * 2]; - if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity * 2); + if (intData != null) System.arraycopy(intData, 0, newData, 0, intData.length); intData = newData; } - putInt(0, 0); } else if (type instanceof BooleanType) { if (byteData == null || byteData.length < newCapacity) { byte[] newData = new byte[newCapacity]; From 1dae6604c0613a0b9e2a2a0dbc53a709cc232d09 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 10:02:31 +0800 Subject: [PATCH 4/7] minor --- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 935c55b5cffe..c54f4bfd4eb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -669,7 +669,7 @@ class ColumnarBatchSuite extends SparkFunSuite { // Populate it with arrays [0], [1, 2], [], [3, 4, 5] column.putArrayOffsetAndLength(0, 0, 1) column.putArrayOffsetAndLength(1, 1, 2) - column.putArrayOffsetAndLength(2, 2, 0) + column.putArrayOffsetAndLength(2, 3, 0) column.putArrayOffsetAndLength(3, 3, 3) val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] From e6e60e0905dbc8693d840bf2c5e90148cccc8a97 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 13:06:56 +0800 Subject: [PATCH 5/7] use long array --- .../execution/vectorized/ColumnVector.java | 19 ++++++++++--------- .../vectorized/OffHeapColumnVector.java | 2 +- .../vectorized/OnHeapColumnVector.java | 12 ++++++------ .../vectorized/ColumnarBatchSuite.scala | 12 ++++++------ 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index ea008cd8964b..ff488ed4385c 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -520,11 +520,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { /** * After writing array elements to the child column vector, call this method to set the offset and - * length of the written array. + * size of the written array. */ - public void putArrayOffsetAndLength(int rowId, int offset, int length) { - putInt(2 * rowId, offset); - putInt(2 * rowId + 1, length); + public void putArrayOffsetAndSize(int rowId, int offset, int size) { + long offsetAndSize = (offset << 32) | size; + putLong(rowId, offsetAndSize); } /** @@ -548,8 +548,9 @@ public ColumnarBatch.Row getStruct(int rowId, int size) { * Returns the array at rowid. 
*/ public final Array getArray(int rowId) { - resultArray.offset = getInt(2 * rowId); - resultArray.length = getInt(2 * rowId + 1); + long offsetAndSize = getLong(rowId); + resultArray.offset = (int) (offsetAndSize >> 32); + resultArray.length = (int) offsetAndSize; return resultArray; } @@ -563,7 +564,7 @@ public final Array getArray(int rowId) { */ public int putByteArray(int rowId, byte[] value, int offset, int length) { int result = arrayData().appendBytes(length, value, offset); - putArrayOffsetAndLength(rowId, result, length); + putArrayOffsetAndSize(rowId, result, length); return result; } @@ -829,13 +830,13 @@ public final int appendDoubles(int length, double[] src, int offset) { public final int appendByteArray(byte[] value, int offset, int length) { int copiedOffset = arrayData().appendBytes(length, value, offset); reserve(elementsAppended + 1); - putArrayOffsetAndLength(elementsAppended, copiedOffset, length); + putArrayOffsetAndSize(elementsAppended, copiedOffset, length); return elementsAppended++; } public final int appendArray(int length) { reserve(elementsAppended + 1); - putArrayOffsetAndLength(elementsAppended, arrayData().elementsAppended, length); + putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length); return elementsAppended++; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index e0ac47a80968..4dc4d34db37f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -401,7 +401,7 @@ public void loadBytes(ColumnVector.Array array) { protected void reserveInternal(int newCapacity) { int oldCapacity = (this.data == 0L) ? 0 : capacity; if (this.resultArray != null) { - // need 2 ints as offset and length for each array. + // need a long as offset and length for each array. this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8); } else if (type instanceof ByteType || type instanceof BooleanType) { this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index e9e123ad75d0..1eb14ae3763d 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -42,9 +42,9 @@ public final class OnHeapColumnVector extends ColumnVector { // Array for each type. Only 1 is populated for any type. private byte[] byteData; private short[] shortData; + private int[] intData; // This is not only used to store data for int column vector, but also can store offsets and // lengths for array column vector. - private int[] intData; private long[] longData; private float[] floatData; private double[] doubleData; @@ -374,11 +374,11 @@ public void loadBytes(ColumnVector.Array array) { @Override protected void reserveInternal(int newCapacity) { if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { - // need 2 ints as offset and length for each array. 
- if (intData == null || intData.length < newCapacity * 2) { - int[] newData = new int[newCapacity * 2]; - if (intData != null) System.arraycopy(intData, 0, newData, 0, intData.length); - intData = newData; + // need 1 long as offset and length for each array. + if (longData == null || longData.length < newCapacity) { + long[] newData = new long[newCapacity]; + if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity); + longData = newData; } } else if (type instanceof BooleanType) { if (byteData == null || byteData.length < newCapacity) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index c54f4bfd4eb4..f96fc9e69583 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 17) // Put the same "ll" at offset. This should not allocate more memory in the column. - column.putArrayOffsetAndLength(idx, offset, 2) + column.putArrayOffsetAndSize(idx, offset, 2) reference += "ll" idx += 1 assert(column.arrayData().elementsAppended == 17) @@ -667,10 +667,10 @@ class ColumnarBatchSuite extends SparkFunSuite { } // Populate it with arrays [0], [1, 2], [], [3, 4, 5] - column.putArrayOffsetAndLength(0, 0, 1) - column.putArrayOffsetAndLength(1, 1, 2) - column.putArrayOffsetAndLength(2, 3, 0) - column.putArrayOffsetAndLength(3, 3, 3) + column.putArrayOffsetAndSize(0, 0, 1) + column.putArrayOffsetAndSize(1, 1, 2) + column.putArrayOffsetAndSize(2, 3, 0) + column.putArrayOffsetAndSize(3, 3, 3) val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] @@ -703,7 +703,7 @@ class ColumnarBatchSuite extends SparkFunSuite { data.reserve(array.length) assert(data.capacity == array.length * 2) data.putInts(0, array.length, array, 0) - column.putArrayOffsetAndLength(0, 0, array.length) + column.putArrayOffsetAndSize(0, 0, array.length) assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] === array) }} From fdc08707e3e147b310ca928cc49c6babebb0d502 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 19:39:50 +0800 Subject: [PATCH 6/7] fix bug --- .../org/apache/spark/sql/execution/vectorized/ColumnVector.java | 2 +- .../spark/sql/execution/vectorized/OnHeapColumnVector.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index ff488ed4385c..e50799eeb27b 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -523,7 +523,7 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { * size of the written array. 
*/ public void putArrayOffsetAndSize(int rowId, int offset, int size) { - long offsetAndSize = (offset << 32) | size; + long offsetAndSize = (((long) offset) << 32) | size; putLong(rowId, offsetAndSize); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 1eb14ae3763d..4d23405dc7b1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -43,7 +43,7 @@ public final class OnHeapColumnVector extends ColumnVector { private byte[] byteData; private short[] shortData; private int[] intData; - // This is not only used to store data for int column vector, but also can store offsets and + // This is not only used to store data for long column vector, but also can store offsets and // lengths for array column vector. private long[] longData; private float[] floatData; From 04790bbc0a6d10c805731e9dc45719a2ad9211d4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 12 Jun 2017 21:32:27 +0800 Subject: [PATCH 7/7] fix test --- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index f96fc9e69583..5c4128a70dd8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -644,7 +644,8 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(column.arrayData().elementsAppended == 17 + (s + s).length) reference.zipWithIndex.foreach { v => - assert(v._1.length == column.getInt(v._2 * 2 + 1), "MemoryMode=" + memMode) + val offsetAndLength = column.getLong(v._2) + assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode) assert(v._1 == column.getUTF8String(v._2).toString, "MemoryMode" + memMode) }
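
A closing note on the encoding that patches 5 through 7 converge on, with a worked example. Each array row becomes a single long: offset in the upper 32 bits, size in the lower 32 bits. Patch 6 exists because Java masks an int shift distance to its low five bits, so `offset << 32` on an int shifts by zero and the offset gets OR'ed straight into the size bits; widening to long before the shift fixes it. Below is a minimal, self-contained sketch of the bug and the fix. The class and method names (PackDemo, pack, buggyPack, offsetOf, sizeOf) are illustrative, not Spark's, and the sketch assumes offsets and sizes are non-negative ints, as they are in ColumnVector.

public final class PackDemo {

  // Encoding after the patch 6 fix: widen to long before shifting, so the
  // offset really lands in the upper 32 bits. Assumes size >= 0, otherwise
  // sign extension of `size` would clobber the offset bits.
  static long pack(int offset, int size) {
    return (((long) offset) << 32) | size;
  }

  // Encoding as of patch 5: `offset` is an int, and Java uses only the low
  // five bits of an int shift distance, so `offset << 32` shifts by zero and
  // the offset is OR'ed directly into the low (size) bits.
  static long buggyPack(int offset, int size) {
    return (offset << 32) | size;
  }

  // Decoding, mirroring getArray(rowId): the arithmetic shift is safe
  // because offsets are non-negative.
  static int offsetOf(long offsetAndSize) {
    return (int) (offsetAndSize >> 32);
  }

  static int sizeOf(long offsetAndSize) {
    return (int) offsetAndSize;
  }

  public static void main(String[] args) {
    long good = pack(7, 3);
    System.out.println(offsetOf(good) + ", " + sizeOf(good)); // prints 7, 3

    long bad = buggyPack(7, 3);
    System.out.println(offsetOf(bad) + ", " + sizeOf(bad));   // prints 0, 7
  }
}

This packed long matches the offset-and-size word that UnsafeRow uses for variable-length fields, which is presumably why the two-ints-per-row layout of patches 1 through 4 was replaced. Patch 7 then updates the test accordingly, reading the length back as the low 32 bits via `column.getLong(rowId).toInt`.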