From b939a0819ac9feb4ff779f50a19ceb101cada999 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Tue, 25 Apr 2017 01:56:48 +0900 Subject: [PATCH 1/4] Keep UnsafeArrayData for Array in ColumnVector --- .../execution/vectorized/ColumnVector.java | 60 +++++++++++++-- .../execution/vectorized/ColumnarBatch.java | 14 ++-- .../vectorized/OffHeapColumnVector.java | 11 +++ .../vectorized/OnHeapColumnVector.java | 59 ++++++++++++++- .../vectorized/ColumnarBatchSuite.scala | 73 ++++++++++++++++--- 5 files changed, 190 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index ad267ab0c9c4..4a27e82f5e71 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -25,10 +25,12 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -64,10 +66,14 @@ public abstract class ColumnVector implements AutoCloseable { * in number of elements, not number of bytes. */ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) { + return allocate(capacity, type, mode, false); + } + + public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode, boolean useUnsafeArrayData) { if (mode == MemoryMode.OFF_HEAP) { return new OffHeapColumnVector(capacity, type); } else { - return new OnHeapColumnVector(capacity, type); + return new OnHeapColumnVector(capacity, type, useUnsafeArrayData); } } @@ -556,10 +562,32 @@ public ColumnarBatch.Row getStruct(int rowId, int size) { /** * Returns the array at rowid. */ - public final Array getArray(int rowId) { - resultArray.length = getArrayLength(rowId); - resultArray.offset = getArrayOffset(rowId); - return resultArray; + public abstract UnsafeArrayData getUnsafeArray(int rowId); + + public final ArrayData getArray(int rowId) { + if (!useUnsafeArrayData) { + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + return resultArray; + } else { + return getUnsafeArray(rowId); + } + } + + public final int putArray(int rowId, ArrayData array) { + if (!useUnsafeArrayData) { + throw new UnsupportedOperationException(); + } + UnsafeArrayData unsafeArray = (UnsafeArrayData)array; + Object baseObjects = unsafeArray.getBaseObject(); + long offset = unsafeArray.getBaseOffset(); + int length = unsafeArray.getSizeInBytes(); + if (offset > Integer.MAX_VALUE) { + throw new UnsupportedOperationException( + "Cannot put this array to ColumnVector as it's too big."); + } + putArray(rowId, baseObjects, (int) offset, length); + return length; } /** @@ -570,6 +598,7 @@ public final Array getArray(int rowId) { /** * Sets the value at rowId to `value`. 
*/ + public abstract void putArray(int rowId, Object value, int offset, int count); public abstract int putByteArray(int rowId, byte[] value, int offset, int count); public final int putByteArray(int rowId, byte[] value) { return putByteArray(rowId, value, 0, value.length); @@ -579,7 +608,9 @@ public final int putByteArray(int rowId, byte[] value) { * Returns the value for rowId. */ private Array getByteArray(int rowId) { - Array array = getArray(rowId); + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + Array array = resultArray; array.data.loadBytes(array); return array; } @@ -884,7 +915,9 @@ public final int appendStruct(boolean isNull) { /** * Returns true if this column is an array. */ - public final boolean isArray() { return resultArray != null; } + public final boolean isArray() { + return (resultArray != null) || (type instanceof ArrayType); + } /** * Marks this column as being constant. @@ -902,6 +935,8 @@ public final int appendStruct(boolean isNull) { @VisibleForTesting protected int MAX_CAPACITY = Integer.MAX_VALUE; + protected boolean useUnsafeArrayData; + /** * Data type for this column. */ @@ -999,10 +1034,19 @@ public ColumnVector getDictionaryIds() { * type. */ protected ColumnVector(int capacity, DataType type, MemoryMode memMode) { + this(capacity, type, memMode, false); + } + + protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean useUnsafeArrayData) { this.capacity = capacity; this.type = type; + this.useUnsafeArrayData = useUnsafeArrayData; - if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType + if (useUnsafeArrayData && type instanceof ArrayType) { + this.childColumns = null; + this.resultArray = null; + this.resultStruct = null; + } else if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType || DecimalType.isByteArrayDecimalType(type)) { DataType childType; int childCapacity = capacity; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index a6ce4c2edc23..d074719055b7 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -65,15 +65,19 @@ public final class ColumnarBatch { final Row row; public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) { - return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode); + return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode, false); } public static ColumnarBatch allocate(StructType type) { - return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); + return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE, false); } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { - return new ColumnarBatch(schema, maxRows, memMode); + return allocate(schema, memMode, maxRows, false); + } + + public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows, boolean useUnsafeArrayData) { + return new ColumnarBatch(schema, maxRows, memMode, useUnsafeArrayData); } /** @@ -466,7 +470,7 @@ public void filterNullsInColumn(int ordinal) { nullFilteredColumns.add(ordinal); } - private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) { + private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode, 
boolean useUnsafeArrayData) { this.schema = schema; this.capacity = maxRows; this.columns = new ColumnVector[schema.size()]; @@ -475,7 +479,7 @@ private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) { for (int i = 0; i < schema.fields().length; ++i) { StructField field = schema.fields()[i]; - columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode); + columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode, useUnsafeArrayData); } this.row = new Row(this); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index a7d3744d00e9..3e58e80055ef 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -20,6 +20,7 @@ import java.nio.ByteOrder; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; @@ -405,6 +406,16 @@ public void putArray(int rowId, int offset, int length) { Platform.putInt(null, offsetData + 4 * rowId, offset); } + @Override + public void putArray(int rowId, Object src, int offset, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public UnsafeArrayData getUnsafeArray(int rowId) { + throw new UnsupportedOperationException(); + } + @Override public int getArrayLength(int rowId) { return Platform.getInt(null, lengthData + 4 * rowId); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 94ed32294cfa..530203380639 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -21,6 +21,7 @@ import java.util.Arrays; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; @@ -48,15 +49,25 @@ public final class OnHeapColumnVector extends ColumnVector { private double[] doubleData; // Only set if type is Array. 
+ /** + * When `useUnsafeArrayData` is true, data[] keeps the bytes of each row's UnsafeArrayData; + * the offset and length for each row are stored into arrayOffsets[] and arrayLengths[] + */ + private byte[] data; + private int dataOffset; private int[] arrayLengths; private int[] arrayOffsets; - protected OnHeapColumnVector(int capacity, DataType type) { - super(capacity, type, MemoryMode.ON_HEAP); + protected OnHeapColumnVector(int capacity, DataType type, boolean useUnsafeArrayData) { + super(capacity, type, MemoryMode.ON_HEAP, useUnsafeArrayData); reserveInternal(capacity); reset(); } + protected OnHeapColumnVector(int capacity, DataType type) { + this(capacity, type, false); + } + @Override public long valuesNativeAddress() { throw new RuntimeException("Cannot get native address for on heap column"); @@ -385,6 +396,39 @@ public void putArray(int rowId, int offset, int length) { arrayLengths[rowId] = length; } + @Override + public void putArray(int rowId, Object src, int offset, int length) { + if (arrayLengths.length < rowId) { + int newCapacity = Math.min(MAX_CAPACITY, arrayLengths.length * 2); + int[] newLengths = new int[newCapacity]; + int[] newOffsets = new int[newCapacity]; + if (arrayLengths != null) { + System.arraycopy(arrayLengths, 0, newLengths, 0, arrayLengths.length); + System.arraycopy(arrayOffsets, 0, newOffsets, 0, arrayLengths.length); + arrayLengths = newLengths; + arrayOffsets = newOffsets; + } + } + putArray(rowId, dataOffset, length); + if (data.length < dataOffset + length) { + int newCapacity = (int) Math.min(MAX_CAPACITY, (dataOffset + length) * 2L); + byte[] newData = new byte[newCapacity]; + System.arraycopy(data, 0, newData, 0, dataOffset); + data = newData; + } + Platform.copyMemory(src, offset, data, Platform.BOOLEAN_ARRAY_OFFSET + dataOffset, length); + dataOffset += length; + } + + @Override + public UnsafeArrayData getUnsafeArray(int rowId) { + int offset = getArrayOffset(rowId); + int length = getArrayLength(rowId); + UnsafeArrayData array = new UnsafeArrayData(); + array.pointTo(data, Platform.BYTE_ARRAY_OFFSET + offset, length); + return array; + } + @Override public void loadBytes(ColumnVector.Array array) { array.byteArray = byteData; @@ -406,7 +450,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) { // Spilt this function out since it is the slow path. 
@Override protected void reserveInternal(int newCapacity) { - if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { + if (isArray() || DecimalType.isByteArrayDecimalType(type)) { int[] newLengths = new int[newCapacity]; int[] newOffsets = new int[newCapacity]; if (this.arrayLengths != null) { @@ -415,6 +459,15 @@ protected void reserveInternal(int newCapacity) { } arrayLengths = newLengths; arrayOffsets = newOffsets; + if (useUnsafeArrayData) { + DataType et = ((ArrayType)type).elementType(); + if (data == null || data.length < newCapacity) { + int length = newCapacity * et.defaultSize() + UnsafeArrayData.calculateHeaderPortionInBytes(newCapacity); + byte[] newData = new byte[length]; + if (data != null) System.arraycopy(data, 0, newData, 0, dataOffset); + data = newData; + } + } } else if (type instanceof BooleanType) { if (byteData == null || byteData.length < newCapacity) { byte[] newData = new byte[newCapacity]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index e48e3f640290..2dddfbbd55dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -24,11 +24,11 @@ import java.nio.ByteOrder import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random - import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform import org.apache.spark.unsafe.types.CalendarInterval @@ -672,26 +672,30 @@ class ColumnarBatchSuite extends SparkFunSuite { column.putArray(2, 2, 0) column.putArray(3, 3, 3) - val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] - val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] - val a3 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(2)).asInstanceOf[Array[Int]] - val a4 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(3)).asInstanceOf[Array[Int]] + val a1 = ColumnVectorUtils.toPrimitiveJavaArray( + column.getArray(0).asInstanceOf[ColumnVector.Array]).asInstanceOf[Array[Int]] + val a2 = ColumnVectorUtils.toPrimitiveJavaArray( + column.getArray(1).asInstanceOf[ColumnVector.Array]).asInstanceOf[Array[Int]] + val a3 = ColumnVectorUtils.toPrimitiveJavaArray( + column.getArray(2).asInstanceOf[ColumnVector.Array]).asInstanceOf[Array[Int]] + val a4 = ColumnVectorUtils.toPrimitiveJavaArray( + column.getArray(3).asInstanceOf[ColumnVector.Array]).asInstanceOf[Array[Int]] assert(a1 === Array(0)) assert(a2 === Array(1, 2)) assert(a3 === Array.empty[Int]) assert(a4 === Array(3, 4, 5)) // Verify the ArrayData APIs - assert(column.getArray(0).length == 1) + assert(column.getArray(0).numElements() == 1) assert(column.getArray(0).getInt(0) == 0) - assert(column.getArray(1).length == 2) + assert(column.getArray(1).numElements() == 2) assert(column.getArray(1).getInt(0) == 1) assert(column.getArray(1).getInt(1) == 2) - assert(column.getArray(2).length == 0) + assert(column.getArray(2).numElements() == 0) - assert(column.getArray(3).length == 3) + assert(column.getArray(3).numElements() == 
3) assert(column.getArray(3).getInt(0) == 3) assert(column.getArray(3).getInt(1) == 4) assert(column.getArray(3).getInt(2) == 5) @@ -704,8 +708,55 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(data.capacity == array.length * 2) data.putInts(0, array.length, array, 0) column.putArray(0, 0, array.length) - assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] - === array) + assert(ColumnVectorUtils.toPrimitiveJavaArray( + column.getArray(0).asInstanceOf[ColumnVector.Array]).asInstanceOf[Array[Int]] === array) + }} + } + + test("Int UnsafeArray") { + (MemoryMode.ON_HEAP :: Nil).foreach { memMode => { + val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode, true) + + // Populate it with arrays [0], [1, 2], [], [3, 4, 5] + val len1 = column.putArray(0, UnsafeArrayData.fromPrimitiveArray(Array(0))) + val len2 = column.putArray(1, UnsafeArrayData.fromPrimitiveArray(Array(1, 2))) + val len3 = column.putArray(2, UnsafeArrayData.fromPrimitiveArray(Array.empty[Int])) + val len4 = column.putArray(3, UnsafeArrayData.fromPrimitiveArray(Array(3, 4, 5))) + // since UnsafeArrayData.fromPrimitiveArray allocates long[], size should be ceiled by 8 + assert(len1 == ((UnsafeArrayData.calculateHeaderPortionInBytes(1) + 1 * 4 + 7) / 8) * 8) + assert(len2 == ((UnsafeArrayData.calculateHeaderPortionInBytes(2) + 2 * 4 + 7) / 8) * 8) + assert(len3 == ((UnsafeArrayData.calculateHeaderPortionInBytes(0) + 0 * 4 + 7) / 8) * 8) + assert(len4 == ((UnsafeArrayData.calculateHeaderPortionInBytes(3) + 3 * 4 + 7) / 8) * 8) + + val a1 = column.getArray(0).asInstanceOf[UnsafeArrayData].toIntArray + val a2 = column.getArray(1).asInstanceOf[UnsafeArrayData].toIntArray + val a3 = column.getArray(2).asInstanceOf[UnsafeArrayData].toIntArray + val a4 = column.getArray(3).asInstanceOf[UnsafeArrayData].toIntArray + assert(a1 === Array(0)) + assert(a2 === Array(1, 2)) + assert(a3 === Array.empty[Int]) + assert(a4 === Array(3, 4, 5)) + + // Verify the ArrayData APIs + assert(column.getArray(0).numElements() == 1) + assert(column.getArray(0).getInt(0) == 0) + + assert(column.getArray(1).numElements() == 2) + assert(column.getArray(1).getInt(0) == 1) + assert(column.getArray(1).getInt(1) == 2) + + assert(column.getArray(2).numElements() == 0) + + assert(column.getArray(3).numElements() == 3) + assert(column.getArray(3).getInt(0) == 3) + assert(column.getArray(3).getInt(1) == 4) + assert(column.getArray(3).getInt(2) == 5) + + // Add a longer array which requires resizing + column.reset + val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) + column.putArray(0, UnsafeArrayData.fromPrimitiveArray(array)) + assert(column.getArray(0).asInstanceOf[UnsafeArrayData].toIntArray === array) }} } From 3f726ba8ff6d37c5b9b25590b5d76a1376352aa3 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 18 May 2017 02:04:04 +0900 Subject: [PATCH 2/4] fix scala style --- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 2dddfbbd55dd..35a6e91cb3b7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -24,6 +24,7 @@ import java.nio.ByteOrder import 
scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random + import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.{RandomDataGenerator, Row} From bf6ab2060550671c59e013a3c56accdd698df9ee Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 26 May 2017 04:44:59 +0900 Subject: [PATCH 3/4] use ColumnVector.Array --- .../execution/vectorized/ColumnVector.java | 173 +++++++++++++----- .../execution/vectorized/ColumnarBatch.java | 14 +- .../vectorized/OffHeapColumnVector.java | 28 ++- .../vectorized/OnHeapColumnVector.java | 119 ++++++------ .../vectorized/ColumnarBatchSuite.scala | 14 +- 5 files changed, 229 insertions(+), 119 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 4a27e82f5e71..2c05dc89ed52 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.*; -import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -66,14 +65,10 @@ public abstract class ColumnVector implements AutoCloseable { * in number of elements, not number of bytes. */ public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) { - return allocate(capacity, type, mode, false); - } - - public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode, boolean useUnsafeArrayData) { if (mode == MemoryMode.OFF_HEAP) { return new OffHeapColumnVector(capacity, type); } else { - return new OnHeapColumnVector(capacity, type, useUnsafeArrayData); + return new OnHeapColumnVector(capacity, type); } } @@ -88,6 +83,15 @@ public static final class Array extends ArrayData { public int length; public int offset; + // reused buffer to return a primitive array + protected boolean[] reuseBooleanArray; + protected byte[] reuseByteArray; + protected short[] reuseShortArray; + protected int[] reuseIntArray; + protected long[] reuseLongArray; + protected float[] reuseFloatArray; + protected double[] reuseDoubleArray; + // Populate if binary data is required for the Array. This is stored here as an optimization // for string data. 
public byte[] byteArray; @@ -108,6 +112,69 @@ public ArrayData copy() { throw new UnsupportedOperationException(); } + @Override + public boolean[] toBooleanArray() { + if (reuseBooleanArray == null || reuseBooleanArray.length != length) { + reuseBooleanArray = new boolean[length]; + } + data.getBooleanArray(offset, length, reuseBooleanArray); + return reuseBooleanArray; + } + + @Override + public byte[] toByteArray() { + if (reuseByteArray == null || reuseByteArray.length != length) { + reuseByteArray = new byte[length]; + } + data.getByteArray(offset, length, reuseByteArray); + return reuseByteArray; + } + + @Override + public short[] toShortArray() { + if (reuseShortArray == null || reuseShortArray.length != length) { + reuseShortArray = new short[length]; + } + data.getShortArray(offset, length, reuseShortArray); + return reuseShortArray; + } + + @Override + public int[] toIntArray() { + if (reuseIntArray == null || reuseIntArray.length != length) { + reuseIntArray = new int[length]; + } + data.getIntArray(offset, length, reuseIntArray); + return reuseIntArray; + } + + @Override + public long[] toLongArray() { + if (reuseLongArray == null || reuseLongArray.length != length) { + reuseLongArray = new long[length]; + } + data.getLongArray(offset, length, reuseLongArray); + return reuseLongArray; + } + + @Override + public float[] toFloatArray() { + if (reuseFloatArray == null || reuseFloatArray.length != length) { + reuseFloatArray = new float[length]; + } + data.getFloatArray(offset, length, reuseFloatArray); + return reuseFloatArray; + } + + @Override + public double[] toDoubleArray() { + if (reuseDoubleArray == null || reuseDoubleArray.length != length) { + reuseDoubleArray = new double[length]; + } + data.getDoubleArray(offset, length, reuseDoubleArray); + return reuseDoubleArray; + } + // TODO: this is extremely expensive. @Override public Object[] array() { @@ -374,6 +441,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract boolean getBoolean(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getBooleanArray(int rowId, int count, boolean[] array); + /** * Sets the value at rowId to `value`. */ @@ -394,6 +466,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract byte getByte(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getByteArray(int rowId, int count, byte[] array); + /** * Sets the value at rowId to `value`. */ @@ -409,6 +486,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract void putShorts(int rowId, int count, short[] src, int srcIndex); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getShortArray(int rowId, int count, short[] array); + /** * Returns the value for rowId. */ @@ -440,6 +522,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract int getInt(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getIntArray(int rowId, int count, int[] array); + /** * Returns the dictionary Id for rowId. * This should only be called when the ColumnVector is dictionaryIds. @@ -473,6 +560,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract long getLong(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. 
+ */ + public abstract void getLongArray(int rowId, int count, long[] array); + /** * Sets the value at rowId to `value`. */ @@ -500,6 +592,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract float getFloat(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getFloatArray(int rowId, int count, float[] array); + /** * Sets the value at rowId to `value`. */ @@ -527,6 +624,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { */ public abstract double getDouble(int rowId); + /** + * Copies `count` primitive values starting at `rowId` into `array`. + */ + public abstract void getDoubleArray(int rowId, int count, double[] array); + /** * Puts a byte array that already exists in this column. */ @@ -562,31 +664,29 @@ public ColumnarBatch.Row getStruct(int rowId, int size) { /** * Returns the array at rowid. */ - public abstract UnsafeArrayData getUnsafeArray(int rowId); - - public final ArrayData getArray(int rowId) { - if (!useUnsafeArrayData) { - resultArray.length = getArrayLength(rowId); - resultArray.offset = getArrayOffset(rowId); - return resultArray; - } else { - return getUnsafeArray(rowId); - } + public final Array getArray(int rowId) { + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + return resultArray; } public final int putArray(int rowId, ArrayData array) { - if (!useUnsafeArrayData) { - throw new UnsupportedOperationException(); - } UnsafeArrayData unsafeArray = (UnsafeArrayData)array; Object baseObjects = unsafeArray.getBaseObject(); - long offset = unsafeArray.getBaseOffset(); int length = unsafeArray.getSizeInBytes(); - if (offset > Integer.MAX_VALUE) { - throw new UnsupportedOperationException( - "Cannot put this array to ColumnVector as it's too big."); + int numElements = unsafeArray.numElements(); + long elementOffset = unsafeArray.getBaseOffset() + UnsafeArrayData.calculateHeaderPortionInBytes(numElements); + childColumns[0].putArray(rowId, baseObjects, (int) elementOffset, elementsAppended, numElements); + putArray(rowId, elementsAppended, numElements); + + if (((ArrayType)type).containsNull()) { + for (int i = 0; i < numElements; i++) { + if (unsafeArray.isNullAt(i)) { + childColumns[0].putNull(elementsAppended + i); + } + } } + elementsAppended += numElements; - putArray(rowId, baseObjects, (int) offset, length); return length; } @@ -598,7 +698,7 @@ public final int putArray(int rowId, ArrayData array) { /** * Sets the value at rowId to `value`. */ - public abstract void putArray(int rowId, Object value, int offset, int count); + public abstract void putArray(int rowId, Object value, int srcOffset, int dstOffset, int numElements); public abstract int putByteArray(int rowId, byte[] value, int offset, int count); public final int putByteArray(int rowId, byte[] value) { return putByteArray(rowId, value, 0, value.length); @@ -608,9 +708,7 @@ public final int putByteArray(int rowId, byte[] value) { * Returns the value for rowId. */ private Array getByteArray(int rowId) { - resultArray.length = getArrayLength(rowId); - resultArray.offset = getArrayOffset(rowId); - Array array = resultArray; + Array array = getArray(rowId); array.data.loadBytes(array); return array; } @@ -915,9 +1013,7 @@ public final int appendStruct(boolean isNull) { /** * Returns true if this column is an array. 
*/ - public final boolean isArray() { - return (resultArray != null) || (type instanceof ArrayType); - } + public final boolean isArray() { return resultArray != null; } /** * Marks this column as being constant. @@ -935,8 +1031,6 @@ public final boolean isArray() { @VisibleForTesting protected int MAX_CAPACITY = Integer.MAX_VALUE; - protected boolean useUnsafeArrayData; - /** * Data type for this column. */ @@ -965,7 +1059,7 @@ public final boolean isArray() { protected static final int DEFAULT_ARRAY_LENGTH = 4; /** - * Current write cursor (row index) when appending data. + * Current write cursor (row index) when appending or putting data. */ protected int elementsAppended; @@ -1034,19 +1128,10 @@ public ColumnVector getDictionaryIds() { * type. */ protected ColumnVector(int capacity, DataType type, MemoryMode memMode) { - this(capacity, type, memMode, false); - } - - protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean useUnsafeArrayData) { this.capacity = capacity; this.type = type; - this.useUnsafeArrayData = useUnsafeArrayData; - if (useUnsafeArrayData && type instanceof ArrayType) { - this.childColumns = null; - this.resultArray = null; - this.resultStruct = null; - } else if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType + if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType || DecimalType.isByteArrayDecimalType(type)) { DataType childType; int childCapacity = capacity; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java index d074719055b7..a6ce4c2edc23 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java @@ -65,19 +65,15 @@ public final class ColumnarBatch { final Row row; public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) { - return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode, false); + return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode); } public static ColumnarBatch allocate(StructType type) { - return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE, false); + return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE); } public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) { - return allocate(schema, memMode, maxRows, false); - } - - public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows, boolean useUnsafeArrayData) { - return new ColumnarBatch(schema, maxRows, memMode, useUnsafeArrayData); + return new ColumnarBatch(schema, maxRows, memMode); } /** @@ -470,7 +466,7 @@ public void filterNullsInColumn(int ordinal) { nullFilteredColumns.add(ordinal); } - private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode, boolean useUnsafeArrayData) { + private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) { this.schema = schema; this.capacity = maxRows; this.columns = new ColumnVector[schema.size()]; @@ -479,7 +475,7 @@ private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode, boolea for (int i = 0; i < schema.fields().length; ++i) { StructField field = schema.fields()[i]; - columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode, useUnsafeArrayData); + columns[i] = ColumnVector.allocate(maxRows, field.dataType(), 
memMode); } this.row = new Row(this); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 3e58e80055ef..02cc415561cc 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -20,7 +20,6 @@ import java.nio.ByteOrder; import org.apache.spark.memory.MemoryMode; -import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; @@ -135,6 +134,9 @@ public void putBooleans(int rowId, int count, boolean value) { @Override public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; } + @Override + public void getBooleanArray(int offset, int length, boolean[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with Bytes // @@ -166,6 +168,9 @@ public byte getByte(int rowId) { } } + @Override + public void getByteArray(int offset, int length, byte[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with shorts // @@ -198,6 +203,9 @@ public short getShort(int rowId) { } } + @Override + public void getShortArray(int offset, int length, short[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with ints // @@ -256,6 +264,9 @@ public int getDictId(int rowId) { return Platform.getInt(null, data + 4 * rowId); } + @Override + public void getIntArray(int offset, int length, int[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with Longs // @@ -303,6 +314,9 @@ public long getLong(int rowId) { } } + @Override + public void getLongArray(int offset, int length, long[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with floats // @@ -349,6 +363,8 @@ public float getFloat(int rowId) { } } + @Override + public void getFloatArray(int offset, int length, float[] array) { throw new UnsupportedOperationException(); } // // APIs dealing with doubles @@ -396,6 +412,9 @@ public double getDouble(int rowId) { } } + @Override + public void getDoubleArray(int offset, int length, double[] array) { throw new UnsupportedOperationException(); } + // // APIs dealing with Arrays. 
// @@ -407,12 +426,7 @@ public void putArray(int rowId, int offset, int length) { } @Override - public void putArray(int rowId, Object src, int offset, int length) { - throw new UnsupportedOperationException(); - } - - @Override - public UnsafeArrayData getUnsafeArray(int rowId) { + public void putArray(int rowId, Object src, int srcOffset, int dstOffset, int length) { throw new UnsupportedOperationException(); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index 530203380639..4979a18beb71 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -21,7 +21,6 @@ import java.util.Arrays; import org.apache.spark.memory.MemoryMode; -import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; @@ -49,25 +48,15 @@ public final class OnHeapColumnVector extends ColumnVector { private double[] doubleData; // Only set if type is Array. - /** - * When `useUnsafeArrayData` is true, data[] keeps the bytes of each row's UnsafeArrayData; - * the offset and length for each row are stored into arrayOffsets[] and arrayLengths[] - */ - private byte[] data; - private int dataOffset; private int[] arrayLengths; private int[] arrayOffsets; - protected OnHeapColumnVector(int capacity, DataType type, boolean useUnsafeArrayData) { - super(capacity, type, MemoryMode.ON_HEAP, useUnsafeArrayData); + protected OnHeapColumnVector(int capacity, DataType type) { + super(capacity, type, MemoryMode.ON_HEAP); reserveInternal(capacity); reset(); } - protected OnHeapColumnVector(int capacity, DataType type) { - this(capacity, type, false); - } - @Override public long valuesNativeAddress() { throw new RuntimeException("Cannot get native address for on heap column"); @@ -141,6 +130,12 @@ public boolean getBoolean(int rowId) { return byteData[rowId] == 1; } + @Override + public void getBooleanArray(int offset, int length, boolean[] array) { + // booleans are stored one per byte, so a bulk copy from byte[] to boolean[] is safe + Platform.copyMemory(byteData, Platform.BYTE_ARRAY_OFFSET + offset, array, Platform.BOOLEAN_ARRAY_OFFSET, length); + } + // // @@ -173,6 +168,11 @@ public byte getByte(int rowId) { } } + @Override + public void getByteArray(int offset, int length, byte[] array) { + Platform.copyMemory(byteData, Platform.BYTE_ARRAY_OFFSET + offset, array, Platform.BYTE_ARRAY_OFFSET, length); + } + // // APIs dealing with Shorts // @@ -203,6 +203,10 @@ public short getShort(int rowId) { } } + @Override + public void getShortArray(int offset, int length, short[] array) { + Platform.copyMemory(shortData, Platform.SHORT_ARRAY_OFFSET + offset * 2, array, Platform.SHORT_ARRAY_OFFSET, length * 2); + } // // APIs dealing with Ints // @@ -245,6 +249,12 @@ public int getInt(int rowId) { } } + @Override + public void getIntArray(int offset, int length, int[] array) { + assert(dictionary == null); + Platform.copyMemory(intData, Platform.INT_ARRAY_OFFSET + offset * 4, array, Platform.INT_ARRAY_OFFSET, length * 4); + } + /** * Returns the dictionary Id for rowId. * This should only be called when the ColumnVector is dictionaryIds. 
@@ -297,6 +307,12 @@ public long getLong(int rowId) { } } + @Override + public void getLongArray(int offset, int length, long[] array) { + assert(dictionary == null); + Platform.copyMemory(longData, Platform.LONG_ARRAY_OFFSET + offset * 8, array, Platform.LONG_ARRAY_OFFSET, length * 8); + } + // // APIs dealing with floats // @@ -336,6 +352,12 @@ public float getFloat(int rowId) { } } + @Override + public void getFloatArray(int offset, int length, float[] array) { + assert(dictionary == null); + Platform.copyMemory(floatData, Platform.FLOAT_ARRAY_OFFSET + offset * 4, array, Platform.FLOAT_ARRAY_OFFSET, length * 4); + } + // // APIs dealing with doubles // @@ -377,6 +399,12 @@ public double getDouble(int rowId) { } } + @Override + public void getDoubleArray(int offset, int length, double[] array) { + assert(dictionary == null); + Platform.copyMemory(doubleData, Platform.DOUBLE_ARRAY_OFFSET + offset * 8, array, Platform.DOUBLE_ARRAY_OFFSET, length * 8); + } + // // APIs dealing with Arrays // @@ -397,36 +425,32 @@ public void putArray(int rowId, int offset, int length) { } @Override - public void putArray(int rowId, Object src, int offset, int length) { - if (arrayLengths.length < rowId) { - int newCapacity = Math.min(MAX_CAPACITY, arrayLengths.length * 2); - int[] newLengths = new int[newCapacity]; - int[] newOffsets = new int[newCapacity]; - if (arrayLengths != null) { - System.arraycopy(arrayLengths, 0, newLengths, 0, arrayLengths.length); - System.arraycopy(arrayOffsets, 0, newOffsets, 0, arrayLengths.length); - arrayLengths = newLengths; - arrayOffsets = newOffsets; - } - } - putArray(rowId, dataOffset, length); - if (data.length < dataOffset + length) { - int newCapacity = (int) Math.min(MAX_CAPACITY, (dataOffset + length) * 2L); - byte[] newData = new byte[newCapacity]; - System.arraycopy(data, 0, newData, 0, dataOffset); - data = newData; + public void putArray(int rowId, Object src, int srcOffset, int dstOffset, int numElements) { + DataType et = type; + reserve(dstOffset + numElements); + if (et == DataTypes.BooleanType || et == DataTypes.ByteType) { + Platform.copyMemory( + src, srcOffset, byteData, Platform.BYTE_ARRAY_OFFSET + dstOffset, numElements); + } else if (et == DataTypes.ShortType) { + Platform.copyMemory( + src, srcOffset, shortData, Platform.SHORT_ARRAY_OFFSET + dstOffset * 2, numElements * 2); + } else if (et == DataTypes.IntegerType || et == DataTypes.DateType || + DecimalType.is32BitDecimalType(type)) { + Platform.copyMemory( + src, srcOffset, intData, Platform.INT_ARRAY_OFFSET + dstOffset * 4, numElements * 4); + } else if (et == DataTypes.LongType || et == DataTypes.TimestampType || + DecimalType.is64BitDecimalType(type)) { + Platform.copyMemory( + src, srcOffset, longData, Platform.LONG_ARRAY_OFFSET + dstOffset * 8, numElements * 8); + } else if (et == DataTypes.FloatType) { + Platform.copyMemory( + src, srcOffset, floatData, Platform.FLOAT_ARRAY_OFFSET + dstOffset * 4, numElements * 4); + } else if (et == DataTypes.DoubleType) { + Platform.copyMemory( + src, srcOffset, doubleData, Platform.DOUBLE_ARRAY_OFFSET + dstOffset * 8, numElements * 8); + } else { + throw new RuntimeException("Unhandled " + type); } - Platform.copyMemory(src, offset, data, Platform.BOOLEAN_ARRAY_OFFSET + dataOffset, length); - dataOffset += length; - } - - @Override - public UnsafeArrayData getUnsafeArray(int rowId) { - int offset = getArrayOffset(rowId); - int length = getArrayLength(rowId); - UnsafeArrayData array = new UnsafeArrayData(); - 
array.pointTo(data, Platform.BYTE_ARRAY_OFFSET + offset, length); - return array; } @Override @@ -450,7 +474,7 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) { // Spilt this function out since it is the slow path. @Override protected void reserveInternal(int newCapacity) { - if (isArray() || DecimalType.isByteArrayDecimalType(type)) { + if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { int[] newLengths = new int[newCapacity]; int[] newOffsets = new int[newCapacity]; if (this.arrayLengths != null) { @@ -459,15 +483,6 @@ protected void reserveInternal(int newCapacity) { } arrayLengths = newLengths; arrayOffsets = newOffsets; - if (useUnsafeArrayData) { - DataType et = ((ArrayType)type).elementType(); - if (data == null || data.length < newCapacity) { - int length = newCapacity * et.defaultSize() + UnsafeArrayData.calculateHeaderPortionInBytes(newCapacity); - byte[] newData = new byte[length]; - if (data != null) System.arraycopy(data, 0, newData, 0, dataOffset); - data = newData; - } - } } else if (type instanceof BooleanType) { if (byteData == null || byteData.length < newCapacity) { byte[] newData = new byte[newCapacity]; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 35a6e91cb3b7..63ee66fe16f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -24,11 +24,11 @@ import java.nio.ByteOrder import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random - import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform @@ -716,7 +716,7 @@ class ColumnarBatchSuite extends SparkFunSuite { test("Int UnsafeArray") { (MemoryMode.ON_HEAP :: Nil).foreach { memMode => { - val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode, true) + val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode) // Populate it with arrays [0], [1, 2], [], [3, 4, 5] val len1 = column.putArray(0, UnsafeArrayData.fromPrimitiveArray(Array(0))) @@ -729,10 +729,10 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(len3 == ((UnsafeArrayData.calculateHeaderPortionInBytes(0) + 0 * 4 + 7) / 8) * 8) assert(len4 == ((UnsafeArrayData.calculateHeaderPortionInBytes(3) + 3 * 4 + 7) / 8) * 8) - val a1 = column.getArray(0).asInstanceOf[UnsafeArrayData].toIntArray - val a2 = column.getArray(1).asInstanceOf[UnsafeArrayData].toIntArray - val a3 = column.getArray(2).asInstanceOf[UnsafeArrayData].toIntArray - val a4 = column.getArray(3).asInstanceOf[UnsafeArrayData].toIntArray + val a1 = column.getArray(0).toIntArray + val a2 = column.getArray(1).toIntArray + val a3 = column.getArray(2).toIntArray + val a4 = column.getArray(3).toIntArray assert(a1 === Array(0)) assert(a2 === Array(1, 2)) assert(a3 === Array.empty[Int]) @@ -757,7 +757,7 @@ class ColumnarBatchSuite extends SparkFunSuite { column.reset val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 
19, 20) column.putArray(0, UnsafeArrayData.fromPrimitiveArray(array)) - assert(column.getArray(0).asInstanceOf[UnsafeArrayData].toIntArray === array) + assert(column.getArray(0).toIntArray === array) }} } From 9954d6b20d3ed5743100d342b8b8102f60f8f9f2 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 26 May 2017 04:53:25 +0900 Subject: [PATCH 4/4] fix scala style error --- .../spark/sql/execution/vectorized/ColumnarBatchSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 63ee66fe16f8..963352920777 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -24,11 +24,11 @@ import java.nio.ByteOrder import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random + import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform
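
Note: a minimal usage sketch of the API this series settles on (patches 3-4), written in the style of ColumnarBatchSuite. It assumes an ON_HEAP vector, since the OffHeapColumnVector overrides of the five-argument putArray and of the bulk getXxxArray methods above still throw UnsupportedOperationException:

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
    import org.apache.spark.sql.execution.vectorized.ColumnVector
    import org.apache.spark.sql.types._

    // Allocate an on-heap vector of array<int>; the elements live in a child int column.
    val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), MemoryMode.ON_HEAP)

    // putArray(rowId, ArrayData) bulk-copies the UnsafeArrayData elements into the
    // child column, records (offset, length) for the row, and returns the size in bytes.
    val sizeInBytes = column.putArray(0, UnsafeArrayData.fromPrimitiveArray(Array(1, 2, 3)))

    // getArray returns a ColumnVector.Array (an ArrayData); toIntArray now goes through
    // the bulk getIntArray path instead of an element-by-element copy.
    val values = column.getArray(0).toIntArray
    require(values.sameElements(Array(1, 2, 3)))

    column.close()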