diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java index 3a10e9830f581..5c502c9d91be4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java @@ -240,11 +240,6 @@ public int getArrayOffset(int rowId) { return accessor.getArrayOffset(rowId); } - @Override - public void loadBytes(ColumnarArray array) { - throw new UnsupportedOperationException(); - } - // // APIs dealing with Decimals // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java index 360ed83e2af2a..940457f2e3363 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java @@ -180,11 +180,6 @@ public final ColumnarArray getArray(int rowId) { return resultArray; } - /** - * Loads the data into array.byteArray. - */ - public abstract void loadBytes(ColumnarArray array); - /** * Returns the value for rowId. */ @@ -198,7 +193,8 @@ public MapData getMap(int ordinal) { public abstract Decimal getDecimal(int rowId, int precision, int scale); /** - * Returns the UTF8String for rowId. + * Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of + * this column vector, please copy it if you want to keep it after this column vector is freed. */ public abstract UTF8String getUTF8String(int rowId); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java index 34bde3e14d378..b9da641fc66c8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarArray.java @@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData { public int length; public int offset; - // Populate if binary data is required for the Array. This is stored here as an optimization - // for string data. - public byte[] byteArray; - public int byteArrayOffset; - - // Reused staging buffer, used for loading from offheap. - protected byte[] tmpByteArray = new byte[1]; - ColumnarArray(ColumnVector data) { this.data = data; } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 6b5c783d4fa87..1cbaf08569334 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -23,6 +23,7 @@ import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; /** * Column data backed using offheap memory. @@ -75,16 +76,14 @@ public OffHeapColumnVector(int capacity, DataType type) { reset(); } + /** + * Returns the off heap pointer for the values buffer. + */ @VisibleForTesting public long valuesNativeAddress() { return data; } - @VisibleForTesting - public long nullsNativeAddress() { - return nulls; - } - @Override public void close() { super.close(); @@ -207,6 +206,11 @@ public byte[] getBytes(int rowId, int count) { return array; } + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return UTF8String.fromAddress(null, data + rowId, count); + } + // // APIs dealing with shorts // @@ -524,15 +528,6 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) { return result; } - @Override - public void loadBytes(ColumnarArray array) { - if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length]; - Platform.copyMemory( - null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length); - array.byteArray = array.tmpByteArray; - array.byteArrayOffset = 0; - } - // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java index a7b103a62b17a..85d72295ab9b8 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java @@ -22,6 +22,7 @@ import org.apache.spark.sql.types.*; import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.types.UTF8String; /** * A column backed by an in memory JVM array. This stores the NULLs as a byte per value @@ -203,6 +204,11 @@ public byte[] getBytes(int rowId, int count) { return array; } + @Override + protected UTF8String getBytesAsUTF8String(int rowId, int count) { + return UTF8String.fromBytes(byteData, rowId, count); + } + // // APIs dealing with Shorts // @@ -484,12 +490,6 @@ public void putArray(int rowId, int offset, int length) { arrayLengths[rowId] = length; } - @Override - public void loadBytes(ColumnarArray array) { - array.byteArray = byteData; - array.byteArrayOffset = array.offset; - } - // // APIs dealing with Byte Arrays // diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java index 96cfeed34f300..e7653f0c00b9a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java @@ -280,18 +280,6 @@ public final int putByteArray(int rowId, byte[] value) { return putByteArray(rowId, value, 0, value.length); } - /** - * Returns the value for rowId. - */ - private ColumnarArray getByteArray(int rowId) { - ColumnarArray array = getArray(rowId); - array.data.loadBytes(array); - return array; - } - - /** - * Returns the decimal for rowId. - */ @Override public Decimal getDecimal(int rowId, int precision, int scale) { if (precision <= Decimal.MAX_INT_DIGITS()) { @@ -318,14 +306,10 @@ public void putDecimal(int rowId, Decimal value, int precision) { } } - /** - * Returns the UTF8String for rowId. - */ @Override public UTF8String getUTF8String(int rowId) { if (dictionary == null) { - ColumnarArray a = getByteArray(rowId); - return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length); + return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId)); } else { byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); return UTF8String.fromBytes(bytes); @@ -333,15 +317,16 @@ public UTF8String getUTF8String(int rowId) { } /** - * Returns the byte array for rowId. + * Gets the values of bytes from [rowId, rowId + count), as a UTF8String. + * This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as + * UTF8String is used as a pointer. */ + protected abstract UTF8String getBytesAsUTF8String(int rowId, int count); + @Override public byte[] getBinary(int rowId) { if (dictionary == null) { - ColumnarArray array = getByteArray(rowId); - byte[] bytes = new byte[array.length]; - System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length); - return bytes; + return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId)); } else { return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId)); }