From de17650e7f9e9d4784acfd7a192f510d43f84247 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 14 Oct 2016 08:53:57 +0900 Subject: [PATCH 1/2] add OnHeapUnsafeColumnVector --- .../org/apache/spark/memory/MemoryMode.java | 3 +- .../execution/vectorized/ColumnVector.java | 165 +++++- .../vectorized/ColumnVectorUtils.java | 30 +- .../vectorized/OnHeapUnsafeColumnVector.java | 520 ++++++++++++++++++ .../apache/spark/sql/internal/SQLConf.scala | 10 +- .../vectorized/ColumnarBatchSuite.scala | 173 +++--- 6 files changed, 800 insertions(+), 101 deletions(-) create mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapUnsafeColumnVector.java diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java b/core/src/main/java/org/apache/spark/memory/MemoryMode.java index 3a5e72d8aaec..ca26a77ee9f6 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryMode.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java @@ -22,5 +22,6 @@ @Private public enum MemoryMode { ON_HEAP, - OFF_HEAP + OFF_HEAP, + ON_HEAP_UNSAFE } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 354c878aca00..954b08a29b89 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -25,10 +25,14 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; +import org.apache.spark.sql.catalyst.expressions.UnsafeMapData; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -58,6 +62,8 @@ * ColumnVectors are intended to be reused. */ public abstract class ColumnVector implements AutoCloseable { + ColumnVector() { } + /** * Allocates a column to store elements of `type` on or off heap. * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is @@ -66,6 +72,8 @@ public abstract class ColumnVector implements AutoCloseable { public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) { if (mode == MemoryMode.OFF_HEAP) { return new OffHeapColumnVector(capacity, type); + } else if (mode == MemoryMode.ON_HEAP_UNSAFE) { + return new OnHeapUnsafeColumnVector(capacity, type); } else { return new OnHeapColumnVector(capacity, type); } @@ -548,18 +556,69 @@ public ColumnarBatch.Row getStruct(int rowId) { * Returns a utility object to get structs. 
* provided to keep API compatibility with InternalRow for code generation */ - public ColumnarBatch.Row getStruct(int rowId, int size) { - resultStruct.rowId = rowId; - return resultStruct; + public InternalRow getStruct(int rowId, int size) { + if (!unsafeDirectCopy) { + resultStruct.rowId = rowId; + return resultStruct; + } + resultArray.data.loadBytes(resultArray); + int offset = getArrayOffset(rowId); + int length = getArrayLength(rowId); + UnsafeRow map = new UnsafeRow(size); + map.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length); + return map; + } + + public int putStruct(int rowId, InternalRow row) { + if (!unsafeDirectCopy) { + throw new UnsupportedOperationException(); + } + assert (row instanceof UnsafeRow); + UnsafeRow unsafeRow = (UnsafeRow) row; + Object base = unsafeRow.getBaseObject(); + long offset = unsafeRow.getBaseOffset(); + int length = unsafeRow.getSizeInBytes(); + if (offset > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Cannot put this map to ColumnVector as " + + "it's too big."); + } + putUnsafeArray(rowId, base, offset, length); + return length; } /** * Returns the array at rowid. */ - public final Array getArray(int rowId) { - resultArray.length = getArrayLength(rowId); - resultArray.offset = getArrayOffset(rowId); - return resultArray; + public final ArrayData getArray(int rowId) { + if (!unsafeDirectCopy) { + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + return resultArray; + } else { + resultArray.data.loadBytes(resultArray); // update resultArray.byteData + int offset = getArrayOffset(rowId); + int length = getArrayLength(rowId); + UnsafeArrayData array = new UnsafeArrayData(); + array.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length); + return array; + } + } + + public final int putArray(int rowId, ArrayData array) { + if (!unsafeDirectCopy) { + throw new UnsupportedOperationException(); + } + assert(array instanceof UnsafeArrayData); + UnsafeArrayData unsafeArray = (UnsafeArrayData)array; + Object base = unsafeArray.getBaseObject(); + long offset = unsafeArray.getBaseOffset(); + int length = unsafeArray.getSizeInBytes(); + if (offset > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Cannot put this array to ColumnVector as " + + "it's too big."); + } + putUnsafeArray(rowId, base, offset, length); + return length; } /** @@ -579,7 +638,9 @@ public final int putByteArray(int rowId, byte[] value) { * Returns the value for rowId. */ private Array getByteArray(int rowId) { - Array array = getArray(rowId); + resultArray.length = getArrayLength(rowId); + resultArray.offset = getArrayOffset(rowId); + Array array = resultArray; array.data.loadBytes(array); return array; } @@ -587,8 +648,33 @@ private Array getByteArray(int rowId) { /** * Returns the value for rowId. 
*/ - public MapData getMap(int ordinal) { - throw new UnsupportedOperationException(); + public MapData getMap(int rowId) { + if (!unsafeDirectCopy) { + throw new UnsupportedOperationException(); + } + resultArray.data.loadBytes(resultArray); + int offset = getArrayOffset(rowId); + int length = getArrayLength(rowId); + UnsafeMapData map = new UnsafeMapData(); + map.pointTo(resultArray.byteArray, Platform.BYTE_ARRAY_OFFSET + offset, length); + return map; + } + + public int putMap(int rowId, MapData map) { + if (!unsafeDirectCopy) { + throw new UnsupportedOperationException(); + } + assert(map instanceof UnsafeMapData); + UnsafeMapData unsafeMap = (UnsafeMapData)map; + byte[] value = (byte[])unsafeMap.getBaseObject(); + long offset = unsafeMap.getBaseOffset() - Platform.BYTE_ARRAY_OFFSET; + int length = unsafeMap.getSizeInBytes(); + if (offset > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Cannot put this map to ColumnVector as " + + "it's too big."); + } + putByteArray(rowId, value, (int)offset, length); + return length; } /** @@ -609,14 +695,18 @@ public final Decimal getDecimal(int rowId, int precision, int scale) { } - public final void putDecimal(int rowId, Decimal value, int precision) { + public final int putDecimal(int rowId, Decimal value, int precision) { if (precision <= Decimal.MAX_INT_DIGITS()) { putInt(rowId, (int) value.toUnscaledLong()); + return 4; } else if (precision <= Decimal.MAX_LONG_DIGITS()) { putLong(rowId, value.toUnscaledLong()); + return 8; } else { BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue(); - putByteArray(rowId, bigInteger.toByteArray()); + byte[] array = bigInteger.toByteArray(); + putByteArray(rowId, array); + return array.length; } } @@ -633,6 +723,13 @@ public final UTF8String getUTF8String(int rowId) { } } + public final int putUTF8String(int rowId, UTF8String string) { + assert(dictionary == null); + byte[] array = string.getBytes(); + putByteArray(rowId, array); + return array.length; + } + /** * Returns the byte array for rowId. */ @@ -648,6 +745,22 @@ public final byte[] getBinary(int rowId) { } } + public final int putBinary(int rowId, byte[] bytes) { + putByteArray(rowId, bytes); + return bytes.length; + } + + /** + * APIs dealing with Unsafe Data + */ + public long putUnsafeArray(int rowId, Object base, long offset, int length) { + throw new UnsupportedOperationException(); + } + + public void putUnsafeData(int rowId, int count, Object src, long offset) { + throw new UnsupportedOperationException(); + } + /** * Append APIs. These APIs all behave similarly and will append data to the current vector. It * is not valid to mix the put and append APIs. The append APIs are slower and should only be @@ -858,6 +971,14 @@ public final int appendStruct(boolean isNull) { return elementsAppended; } + public final int appendUnsafeData(int length, Object base, long offset) { + reserve(elementsAppended + length); + int result = elementsAppended; + putUnsafeData(elementsAppended, length, base, offset); + elementsAppended += length; + return result; + } + /** * Returns the data for the underlying array. */ @@ -894,10 +1015,12 @@ public final int appendStruct(boolean isNull) { @VisibleForTesting protected int MAX_CAPACITY = Integer.MAX_VALUE; + protected boolean unsafeDirectCopy; + /** * Data type for this column. */ - protected final DataType type; + protected DataType type; /** * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks. 
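
A minimal sketch of how the unsafe direct-copy path above is meant to be used (illustrative only, not part of the diff; it relies on the ON_HEAP_UNSAFE mode and the putArray(int, ArrayData)/getArray(int) overloads added in this patch, while the example class name and literal values are hypothetical):

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;

public class UnsafeDirectCopyExample {
  public static void main(String[] args) {
    // Illustrative sketch only. Allocate an array-typed column in the new ON_HEAP_UNSAFE mode.
    ColumnVector col = ColumnVector.allocate(
        16, new ArrayType(DataTypes.IntegerType, true), MemoryMode.ON_HEAP_UNSAFE);

    // putArray(int, ArrayData) copies the UnsafeArrayData bytes into the child byte column
    // via putUnsafeArray(), recording the offset and length for this row.
    col.putArray(0, UnsafeArrayData.fromPrimitiveArray(new int[] {1, 2, 3}));

    // getArray(int) re-materializes an UnsafeArrayData pointing at the copied bytes, so the
    // result can be consumed by code generated against the InternalRow/ArrayData interfaces.
    ArrayData out = col.getArray(0);
    assert out.numElements() == 3 && out.getInt(2) == 3;

    col.close();
  }
}
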
@@ -929,17 +1052,17 @@ public final int appendStruct(boolean isNull) { /** * If this is a nested type (array or struct), the column for the child data. */ - protected final ColumnVector[] childColumns; + protected ColumnVector[] childColumns; /** * Reusable Array holder for getArray(). */ - protected final Array resultArray; + protected Array resultArray; /** * Reusable Struct holder for getStruct(). */ - protected final ColumnarBatch.Row resultStruct; + protected ColumnarBatch.Row resultStruct; /** * The Dictionary for this column. @@ -991,14 +1114,20 @@ public ColumnVector getDictionaryIds() { * type. */ protected ColumnVector(int capacity, DataType type, MemoryMode memMode) { + this(capacity, type, memMode, false); + } + + protected ColumnVector(int capacity, DataType type, MemoryMode memMode, boolean unsafeDirectCopy) { this.capacity = capacity; this.type = type; + this.unsafeDirectCopy = unsafeDirectCopy; if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType - || DecimalType.isByteArrayDecimalType(type)) { + || DecimalType.isByteArrayDecimalType(type) + || unsafeDirectCopy && (type instanceof MapType || type instanceof StructType)) { DataType childType; int childCapacity = capacity; - if (type instanceof ArrayType) { + if (!unsafeDirectCopy && type instanceof ArrayType) { childType = ((ArrayType)type).elementType(); } else { childType = DataTypes.ByteType; diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java index 900d7c431e72..b047ab133a11 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java @@ -26,6 +26,7 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.util.ArrayData; import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.types.CalendarInterval; @@ -98,20 +99,25 @@ public static void populate(ColumnVector col, InternalRow row, int fieldIdx) { * For example, an array of IntegerType will return an int[]. * Throws exceptions for unhandled schemas. 
*/ - public static Object toPrimitiveJavaArray(ColumnVector.Array array) { - DataType dt = array.data.dataType(); - if (dt instanceof IntegerType) { - int[] result = new int[array.length]; - ColumnVector data = array.data; - for (int i = 0; i < result.length; i++) { - if (data.isNullAt(array.offset + i)) { - throw new RuntimeException("Cannot handle NULL values."); + public static Object toPrimitiveJavaArray(ArrayData a) { + if (!(a instanceof ColumnVector.Array)) { + return a.toIntArray(); + } else { + ColumnVector.Array array = (ColumnVector.Array)a; + DataType dt = array.data.dataType(); + if (dt instanceof IntegerType) { + int[] result = new int[array.length]; + ColumnVector data = array.data; + for (int i = 0; i < result.length; i++) { + if (data.isNullAt(array.offset + i)) { + throw new RuntimeException("Cannot handle NULL values."); + } + result[i] = data.getInt(array.offset + i); } - result[i] = data.getInt(array.offset + i); + return result; + } else { + throw new UnsupportedOperationException(); } - return result; - } else { - throw new UnsupportedOperationException(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapUnsafeColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapUnsafeColumnVector.java new file mode 100644 index 000000000000..e9c85f3ec8f4 --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapUnsafeColumnVector.java @@ -0,0 +1,520 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.vectorized; + +import java.io.*; +import java.nio.ByteOrder; + +import org.apache.commons.io.IOUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.io.CompressionCodec; +import org.apache.spark.io.CompressionCodec$; +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.*; +import org.apache.spark.unsafe.Platform; + +/** + * A column backed by an in memory JVM array. But, all of data types are stored into byte[]. + * This stores the NULLs as a byte per value and a java array for the values. + */ +public final class OnHeapUnsafeColumnVector extends ColumnVector implements Serializable { + + // The data stored in these arrays need to maintain binary compatible. We can + // directly pass this buffer to external components. + + // This is faster than a boolean array and we optimize this over memory footprint. + private byte[] nulls; + private byte[] compressedNulls; + + // Array for all types + private byte[] data; + private byte[] compressedData; + + // Only set if type is Array. 
+ private int[] arrayLengths; + private int[] arrayOffsets; + + private boolean compressed; + private transient CompressionCodec codec = null; + + OnHeapUnsafeColumnVector() { } + + protected OnHeapUnsafeColumnVector(int capacity, DataType type) { + super(capacity, type, MemoryMode.ON_HEAP_UNSAFE, true); + reserveInternal(capacity); + reset(); + } + + @Override + public long valuesNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + @Override + public long nullsNativeAddress() { + throw new RuntimeException("Cannot get native address for on heap column"); + } + + @Override + public void close() { + } + + public void compress(SparkConf conf) { + if (compressed) return; + if (codec == null) { + String codecName = conf.get(SQLConf.CACHE_COMPRESSION_CODEC()); + codec = CompressionCodec$.MODULE$.createCodec(conf, codecName); + } + ByteArrayOutputStream bos; + OutputStream out; + + if (data != null) { + bos = new ByteArrayOutputStream(); + out = codec.compressedOutputStream(bos); + try { + try { + out.write(data); + } finally { + out.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (bos.size() < data.length) { + compressedData = bos.toByteArray(); + data = null; + } + } + + if (nulls != null) { + bos = new ByteArrayOutputStream(); + out = codec.compressedOutputStream(bos); + try { + try { + out.write(nulls); + } finally { + out.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (bos.size() < nulls.length) { + compressedNulls = bos.toByteArray(); + nulls = null; + } + } + compressed = (compressedData != null) || (compressedNulls != null); + } + + public void decompress(SparkConf conf) { + if (!compressed) return; + if (codec == null) { + String codecName = conf.get(SQLConf.CACHE_COMPRESSION_CODEC()); + codec = CompressionCodec$.MODULE$.createCodec(conf, codecName); + } + ByteArrayInputStream bis; + InputStream in; + + if (compressedData != null) { + bis = new ByteArrayInputStream(compressedData); + in = codec.compressedInputStream(bis); + try { + try { + data = IOUtils.toByteArray(in); + } finally { + in.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + compressedData = null; + } + + if (compressedNulls != null) { + bis = new ByteArrayInputStream(compressedNulls); + in = codec.compressedInputStream(bis); + try { + try { + nulls = IOUtils.toByteArray(in); + } finally { + in.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + compressedNulls = null; + } + compressed = false; + } + + // + // APIs dealing with nulls + // + + @Override + public void putNotNull(int rowId) { + Platform.putByte(nulls, Platform.BYTE_ARRAY_OFFSET + rowId, (byte)0); + } + + @Override + public void putNull(int rowId) { + Platform.putByte(nulls, Platform.BYTE_ARRAY_OFFSET + rowId, (byte)1); + ++numNulls; + anyNullsSet = true; + } + + @Override + public void putNulls(int rowId, int count) { + for (int i = 0; i < count; i++) { + Platform.putByte(nulls, Platform.BYTE_ARRAY_OFFSET + rowId + i, (byte)1); + } + numNulls += count; + anyNullsSet = true; + } + + @Override + public void putNotNulls(int rowId, int count) { + for (int i = 0; i < count; i++) { + Platform.putByte(nulls, Platform.BYTE_ARRAY_OFFSET + rowId + i, (byte)0); + } + } + + @Override + public boolean isNullAt(int rowId) { + return Platform.getByte(nulls, Platform.BYTE_ARRAY_OFFSET + rowId) == 1; + } + + // + // APIs dealing with Booleans + // + + @Override + public void putBoolean(int rowId, 
boolean value) { + Platform.putBoolean(data, Platform.BYTE_ARRAY_OFFSET + rowId, value); + } + + @Override + public void putBooleans(int rowId, int count, boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int rowId) { + return Platform.getBoolean(data, Platform.BYTE_ARRAY_OFFSET + rowId); + } + + // + + // + // APIs dealing with Bytes + // + + @Override + public void putByte(int rowId, byte value) { + Platform.putByte(data, Platform.BYTE_ARRAY_OFFSET + rowId, value); + } + + @Override + public void putBytes(int rowId, int count, byte value) { + for (int i = 0; i < count; ++i) { + Platform.putByte(data, Platform.BYTE_ARRAY_OFFSET + rowId + i, value); + } + } + + @Override + public void putBytes(int rowId, int count, byte[] src, int srcIndex) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, + data, Platform.BYTE_ARRAY_OFFSET + rowId, count); + } + + @Override + public byte getByte(int rowId) { + return Platform.getByte(data, Platform.BYTE_ARRAY_OFFSET + rowId); + } + + // + // APIs dealing with Shorts + // + + @Override + public void putShort(int rowId, short value) { + Platform.putShort(data, Platform.BYTE_ARRAY_OFFSET + rowId * 2, value); + } + + @Override + public void putShorts(int rowId, int count, short value) { + for (int i = 0; i < count; i++) { + Platform.putShort(data, Platform.BYTE_ARRAY_OFFSET + (rowId + i) * 2, value); + } + } + + @Override + public void putShorts(int rowId, int count, short[] src, int srcIndex) { + Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 2, count * 2); + } + + @Override + public short getShort(int rowId) { + return Platform.getShort(data, Platform.BYTE_ARRAY_OFFSET + rowId * 2); + } + + + // + // APIs dealing with Ints + // + + @Override + public void putInt(int rowId, int value) { + Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET + rowId * 4, value); + } + + @Override + public void putInts(int rowId, int count, int value) { + for (int i = 0; i < count; i++) { + Platform.putInt(data, Platform.BYTE_ARRAY_OFFSET + (rowId + i) * 4, value); + } + } + + @Override + public void putInts(int rowId, int count, int[] src, int srcIndex) { + Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 4, count * 4); + } + + @Override + public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int rowId) { + return Platform.getInt(data, Platform.BYTE_ARRAY_OFFSET + rowId * 4); + } + + /** + * Returns the dictionary Id for rowId. + * This should only be called when the ColumnVector is dictionaryIds. + * We have this separate method for dictionaryIds as per SPARK-16928. 
+ */ + public int getDictId(int rowId) { throw new UnsupportedOperationException(); } + + // + // APIs dealing with Longs + // + + @Override + public void putLong(int rowId, long value) { + Platform.putLong(data, Platform.BYTE_ARRAY_OFFSET + rowId * 8, value); + } + + @Override + public void putLongs(int rowId, int count, long value) { + for (int i = 0; i < count; i++) { + Platform.putLong(data, Platform.BYTE_ARRAY_OFFSET + (rowId + i) * 8, value); + } + } + + @Override + public void putLongs(int rowId, int count, long[] src, int srcIndex) { + Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 8, count * 8); + } + + @Override + public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int rowId) { + return Platform.getLong(data, Platform.BYTE_ARRAY_OFFSET + rowId * 8); + } + + // + // APIs dealing with floats + // + + @Override + public void putFloat(int rowId, float value) { + Platform.putFloat(data, Platform.BYTE_ARRAY_OFFSET + rowId * 4, value); + } + + @Override + public void putFloats(int rowId, int count, float value) { + for (int i = 0; i < count; i++) { + Platform.putFloat(data, Platform.BYTE_ARRAY_OFFSET + (rowId + i) * 8, value); + } + } + + @Override + public void putFloats(int rowId, int count, float[] src, int srcIndex) { + Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 4, count * 4); + } + + @Override + public void putFloats(int rowId, int count, byte[] src, int srcIndex) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex * 4, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 4, count * 4); + } + + @Override + public float getFloat(int rowId) { + return Platform.getFloat(data, Platform.BYTE_ARRAY_OFFSET + rowId * 4); + } + + // + // APIs dealing with doubles + // + + @Override + public void putDouble(int rowId, double value) { + Platform.putDouble(data, Platform.BYTE_ARRAY_OFFSET + rowId * 8, value); + } + + @Override + public void putDoubles(int rowId, int count, double value) { + for (int i = 0; i < count; i++) { + Platform.putDouble(data, Platform.BYTE_ARRAY_OFFSET + (rowId + i) * 8, value); + } + } + + @Override + public void putDoubles(int rowId, int count, double[] src, int srcIndex) { + Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 8, count * 8); + } + + @Override + public void putDoubles(int rowId, int count, byte[] src, int srcIndex) { + Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex * 8, + data, Platform.BYTE_ARRAY_OFFSET + rowId * 8, count * 8); + } + + @Override + public double getDouble(int rowId) { + return Platform.getDouble(data, Platform.BYTE_ARRAY_OFFSET + rowId * 8); + } + + // + // APIs dealing with Arrays + // + + @Override + public int getArrayLength(int rowId) { + return arrayLengths[rowId]; + } + @Override + public int getArrayOffset(int rowId) { + return arrayOffsets[rowId]; + } + + @Override + public void putArray(int rowId, int offset, int length) { + arrayOffsets[rowId] = offset; + arrayLengths[rowId] = length; + } + + @Override + public void loadBytes(ColumnVector.Array array) { + array.byteArray = data; + array.byteArrayOffset = array.offset; + } + + // + // APIs dealing with Byte Arrays + // + + @Override + public int putByteArray(int rowId, byte[] value, int offset, int length) { + int result = 
arrayData().appendBytes(length, value, offset); + arrayOffsets[rowId] = result; + arrayLengths[rowId] = length; + return result; + } + + // + // APIs dealing with Unsafe Data + // + + @Override + public long putUnsafeArray(int rowId, Object base, long offset, int length) { + int result = arrayData().appendUnsafeData(length, base, offset); + arrayOffsets[rowId] = result; + arrayLengths[rowId] = length; + return result; + } + + @Override + public void putUnsafeData(int rowId, int count, Object src, long offset) { + Platform.copyMemory(src, offset, data, Platform.BYTE_ARRAY_OFFSET + rowId, count); + } + + // Spilt this function out since it is the slow path. + @Override + protected void reserveInternal(int newCapacity) { + int factor = 0; + if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) { + int[] newLengths = new int[newCapacity]; + int[] newOffsets = new int[newCapacity]; + if (this.arrayLengths != null) { + System.arraycopy(this.arrayLengths, 0, newLengths, 0, elementsAppended); + System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, elementsAppended); + } + arrayLengths = newLengths; + arrayOffsets = newOffsets; + factor = -1; + } else if (resultStruct != null || type instanceof NullType) { + // Nothing to store. + factor = -1; + } else if (type instanceof BooleanType) { + factor = 1; + } else if (type instanceof ByteType) { + factor = 1; + } else if (type instanceof ShortType) { + factor = 2; + } else if (type instanceof IntegerType || type instanceof DateType || + DecimalType.is32BitDecimalType(type)) { + factor = 4; + } else if (type instanceof LongType || type instanceof TimestampType || + DecimalType.is64BitDecimalType(type)) { + factor = 8; + } else if (type instanceof FloatType) { + factor = 4; + } else if (type instanceof DoubleType) { + factor = 8; + } + if (factor > 0) { + if (data == null || capacity < newCapacity) { + byte[] newData = new byte[newCapacity * factor]; + if (data != null) + Platform.copyMemory(data, Platform.BYTE_ARRAY_OFFSET, + newData, Platform.BYTE_ARRAY_OFFSET, elementsAppended * factor); + data = newData; + } + } else if (factor == 0) { + throw new RuntimeException("Unhandled " + type); + } + + byte[] newNulls = new byte[newCapacity]; + if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, elementsAppended); + nulls = newNulls; + + capacity = newCapacity; + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dc0f13040693..776f1cc0a5dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -101,7 +101,15 @@ object SQLConf { .booleanConf .createWithDefault(true) - val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin") + val CACHE_COMPRESSION_CODEC = + SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compression.codec") + .internal() + .doc("Sets the compression codec use when columnar caching is compressed.") + .stringConf + .transform(_.toLowerCase()) + .createWithDefault("lz4") + + val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin") .internal() .doc("When true, prefer sort merge join over shuffle hash join.") .booleanConf diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala index 8184d7d909f4..0f27fc060afd 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala @@ -25,9 +25,10 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Random -import org.apache.spark.SparkFunSuite import org.apache.spark.memory.MemoryMode +import org.apache.spark.SparkFunSuite import org.apache.spark.sql.{RandomDataGenerator, Row} +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types._ import org.apache.spark.unsafe.Platform @@ -35,7 +36,8 @@ import org.apache.spark.unsafe.types.CalendarInterval class ColumnarBatchSuite extends SparkFunSuite { test("Null Apis") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val reference = mutable.ArrayBuffer.empty[Boolean] val column = ColumnVector.allocate(1024, IntegerType, memMode) @@ -81,7 +83,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Byte Apis") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val reference = mutable.ArrayBuffer.empty[Byte] val column = ColumnVector.allocate(1024, ByteType, memMode) @@ -183,7 +186,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Int Apis") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val seed = System.currentTimeMillis() val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Int] @@ -209,16 +213,18 @@ class ColumnarBatchSuite extends SparkFunSuite { littleEndian(4) = 6 littleEndian(6) = 1 - column.putIntsLittleEndian(idx, 1, littleEndian, 4) - column.putIntsLittleEndian(idx + 1, 1, littleEndian, 0) - reference += 6 + (1 << 16) - reference += 7 + (1 << 8) - idx += 2 - - column.putIntsLittleEndian(idx, 2, littleEndian, 0) - reference += 7 + (1 << 8) - reference += 6 + (1 << 16) - idx += 2 + if (memMode == MemoryMode.ON_HEAP || memMode == MemoryMode.OFF_HEAP) { + column.putIntsLittleEndian(idx, 1, littleEndian, 4) + column.putIntsLittleEndian(idx + 1, 1, littleEndian, 0) + reference += 6 + (1 << 16) + reference += 7 + (1 << 8) + idx += 2 + + column.putIntsLittleEndian(idx, 2, littleEndian, 0) + reference += 7 + (1 << 8) + reference += 6 + (1 << 16) + idx += 2 + } while (idx < column.capacity) { val single = random.nextBoolean() @@ -251,7 +257,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Long Apis") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val seed = System.currentTimeMillis() val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Long] @@ -277,16 +284,18 @@ class ColumnarBatchSuite extends SparkFunSuite { littleEndian(8) = 6 littleEndian(10) = 1 - column.putLongsLittleEndian(idx, 1, littleEndian, 8) - column.putLongsLittleEndian(idx + 1, 1, littleEndian, 0) - reference += 6 + (1 << 16) - reference += 7 + (1 << 8) - idx += 2 - - column.putLongsLittleEndian(idx, 2, littleEndian, 0) - reference += 7 + (1 << 8) - reference += 6 + (1 << 16) - idx += 2 
+ if (memMode == MemoryMode.ON_HEAP || memMode == MemoryMode.OFF_HEAP) { + column.putLongsLittleEndian(idx, 1, littleEndian, 8) + column.putLongsLittleEndian(idx + 1, 1, littleEndian, 0) + reference += 6 + (1 << 16) + reference += 7 + (1 << 8) + idx += 2 + + column.putLongsLittleEndian(idx, 2, littleEndian, 0) + reference += 7 + (1 << 8) + reference += 6 + (1 << 16) + idx += 2 + } while (idx < column.capacity) { val single = random.nextBoolean() @@ -321,7 +330,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Double APIs") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val seed = System.currentTimeMillis() val random = new Random(seed) val reference = mutable.ArrayBuffer.empty[Double] @@ -345,23 +355,25 @@ class ColumnarBatchSuite extends SparkFunSuite { Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234) Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123) - if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { - // Ensure array contains Liitle Endian doubles - var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) - Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0)) - Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8)) - } + if (memMode == MemoryMode.ON_HEAP || memMode == MemoryMode.OFF_HEAP) { + if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { + // Ensure array contains Liitle Endian doubles + var bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN) + Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0)) + Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, bb.getDouble(8)) + } - column.putDoubles(idx, 1, buffer, 8) - column.putDoubles(idx + 1, 1, buffer, 0) - reference += 1.123 - reference += 2.234 - idx += 2 + column.putDoubles(idx, 1, buffer, 8) + column.putDoubles(idx + 1, 1, buffer, 0) + reference += 1.123 + reference += 2.234 + idx += 2 - column.putDoubles(idx, 2, buffer, 0) - reference += 2.234 - reference += 1.123 - idx += 2 + column.putDoubles(idx, 2, buffer, 0) + reference += 2.234 + reference += 1.123 + idx += 2 + } while (idx < column.capacity) { val single = random.nextBoolean() @@ -395,7 +407,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("String APIs") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val reference = mutable.ArrayBuffer.empty[String] val column = ColumnVector.allocate(6, BinaryType, memMode) @@ -447,22 +460,32 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Int Array") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode) // Fill the underlying data with all the arrays back to back. 
- val data = column.arrayData(); - var i = 0 - while (i < 6) { - data.putInt(i, i) - i += 1 - } + val data = if (memMode == MemoryMode.ON_HEAP || memMode == MemoryMode.OFF_HEAP) { + val data = column.arrayData() + var i = 0 + while (i < 6) { + data.putInt(i, i) + i += 1 + } + column.putArray(0, 0, 1) + column.putArray(1, 1, 2) + column.putArray(2, 2, 0) + column.putArray(3, 3, 3) - // Populate it with arrays [0], [1, 2], [], [3, 4, 5] - column.putArray(0, 0, 1) - column.putArray(1, 1, 2) - column.putArray(2, 2, 0) - column.putArray(3, 3, 3) + data + } else { + // Populate it with arrays [0], [1, 2], [], [3, 4, 5] + column.putArray(0, UnsafeArrayData.fromPrimitiveArray(Array(0))) + column.putArray(1, UnsafeArrayData.fromPrimitiveArray(Array(1, 2))) + column.putArray(2, UnsafeArrayData.fromPrimitiveArray(Array.empty[Int])) + column.putArray(3, UnsafeArrayData.fromPrimitiveArray(Array(3, 4, 5))) + null + } val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]] @@ -474,16 +497,16 @@ class ColumnarBatchSuite extends SparkFunSuite { assert(a4 === Array(3, 4, 5)) // Verify the ArrayData APIs - assert(column.getArray(0).length == 1) + assert(column.getArray(0).numElements() == 1) assert(column.getArray(0).getInt(0) == 0) - assert(column.getArray(1).length == 2) + assert(column.getArray(1).numElements() == 2) assert(column.getArray(1).getInt(0) == 1) assert(column.getArray(1).getInt(1) == 2) - assert(column.getArray(2).length == 0) + assert(column.getArray(2).numElements() == 0) - assert(column.getArray(3).length == 3) + assert(column.getArray(3).numElements() == 3) assert(column.getArray(3).getInt(0) == 3) assert(column.getArray(3).getInt(1) == 4) assert(column.getArray(3).getInt(2) == 5) @@ -491,10 +514,14 @@ class ColumnarBatchSuite extends SparkFunSuite { // Add a longer array which requires resizing column.reset val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12) - assert(data.capacity == 10) - data.reserve(array.length) - assert(data.capacity == array.length * 2) - data.putInts(0, array.length, array, 0) + if (memMode == MemoryMode.ON_HEAP || memMode == MemoryMode.OFF_HEAP) { + assert(data.capacity == 10) + data.reserve(array.length) + assert(data.capacity == array.length * 2) + data.putInts(0, array.length, array, 0) + } else { + column.putArray(0, UnsafeArrayData.fromPrimitiveArray(array)) + } column.putArray(0, 0, array.length) assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]] === array) @@ -502,7 +529,9 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Struct Column") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + // skip test for ON_HEAP_UNSAFE since it is complicated to construct a struct on Unsafe + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil) + .foreach { memMode => { val schema = new StructType().add("int", IntegerType).add("double", DoubleType) val column = ColumnVector.allocate(1024, schema, memMode) @@ -532,7 +561,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("ColumnarBatch basic") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val schema = new StructType() .add("intCol", IntegerType) .add("doubleCol", DoubleType) @@ -723,7 +753,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("Convert rows") { - 
(MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val rows = Row(1, 2L, "a", 1.2, 'b'.toByte) :: Row(4, 5L, "cd", 2.3, 'a'.toByte) :: Nil val schema = new StructType() .add("i1", IntegerType) @@ -774,7 +805,8 @@ class ColumnarBatchSuite extends SparkFunSuite { rows += row j += 1 } - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil) + .foreach { memMode => { val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava) assert(batch.numRows() == NUM_ROWS) @@ -810,7 +842,8 @@ class ColumnarBatchSuite extends SparkFunSuite { val row = if (i < numNulls) Row.fromSeq(Seq(i, null)) else Row.fromSeq(Seq(i, i.toString)) rows += row } - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => { + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => { val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava) batch.filterNullsInColumn(1) batch.setNumRows(NUM_ROWS) @@ -841,7 +874,8 @@ class ColumnarBatchSuite extends SparkFunSuite { val oldRow = RandomDataGenerator.randomRow(random, schema) val newRow = RandomDataGenerator.randomRow(random, schema) - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => val batch = ColumnVectorUtils.toBatch(schema, memMode, (oldRow :: Nil).iterator.asJava) val columnarBatchRow = batch.getRow(0) newRow.toSeq.zipWithIndex.foreach(i => columnarBatchRow.update(i._2, i._1)) @@ -852,7 +886,8 @@ class ColumnarBatchSuite extends SparkFunSuite { } test("exceeding maximum capacity should throw an error") { - (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => + (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: MemoryMode.ON_HEAP_UNSAFE :: Nil) + .foreach { memMode => val column = ColumnVector.allocate(1, ByteType, memMode) column.MAX_CAPACITY = 15 column.appendBytes(5, 0.toByte) From 80a502277bed9388b04ceb9c7f46649f5345632f Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 15 Feb 2017 03:15:48 +0900 Subject: [PATCH 2/2] rebase --- .../org/apache/spark/sql/internal/SQLConf.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 776f1cc0a5dd..e2f07ba2d721 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -102,14 +102,14 @@ object SQLConf { .createWithDefault(true) val CACHE_COMPRESSION_CODEC = - SQLConfigBuilder("spark.sql.inMemoryColumnarStorage.compression.codec") - .internal() - .doc("Sets the compression codec use when columnar caching is compressed.") - .stringConf - .transform(_.toLowerCase()) - .createWithDefault("lz4") - - val PREFER_SORTMERGEJOIN = SQLConfigBuilder("spark.sql.join.preferSortMergeJoin") + buildConf("spark.sql.inMemoryColumnarStorage.compression.codec") + .internal() + .doc("Sets the compression codec use when columnar caching is compressed.") + .stringConf + .transform(_.toLowerCase()) + .createWithDefault("lz4") + + val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin") .internal() .doc("When true, prefer 
sort merge join over shuffle hash join.") .booleanConf
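
Finally, a rough sketch of how the new compression hooks and the spark.sql.inMemoryColumnarStorage.compression.codec conf are expected to interact (again illustrative, not part of the patch; it assumes compress()/decompress() as defined in OnHeapUnsafeColumnVector above, and the example class name and values are hypothetical):

import org.apache.spark.SparkConf;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.OnHeapUnsafeColumnVector;
import org.apache.spark.sql.types.DataTypes;

public class CacheCompressionExample {
  public static void main(String[] args) {
    // Illustrative sketch only. The new conf key defaults to lz4; compress()/decompress()
    // resolve the codec lazily from the SparkConf passed in.
    SparkConf conf = new SparkConf();

    // allocate() returns an OnHeapUnsafeColumnVector for the ON_HEAP_UNSAFE mode.
    OnHeapUnsafeColumnVector col = (OnHeapUnsafeColumnVector) ColumnVector.allocate(
        1024, DataTypes.LongType, MemoryMode.ON_HEAP_UNSAFE);
    for (int i = 0; i < 1024; i++) {
      col.putLong(i, 42L);  // highly repetitive data, so it should compress well
    }

    // compress() swaps data/nulls for compressed copies only when that actually saves space;
    // decompress() restores the raw byte[] buffers before the column is read again.
    col.compress(conf);
    col.decompress(conf);
    assert col.getLong(7) == 42L;

    col.close();
  }
}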