Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.vectorized;

import java.math.BigDecimal;
Expand Down Expand Up @@ -518,19 +519,13 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
public abstract double getDouble(int rowId);

/**
* Puts a byte array that already exists in this column.
*/
public abstract void putArray(int rowId, int offset, int length);

/**
* Returns the length of the array at rowid.
* After writing array elements to the child column vector, call this method to set the offset and
* size of the written array.
*/
public abstract int getArrayLength(int rowId);

/**
* Returns the offset of the array at rowid.
*/
public abstract int getArrayOffset(int rowId);
public void putArrayOffsetAndSize(int rowId, int offset, int size) {
long offsetAndSize = (((long) offset) << 32) | size;
putLong(rowId, offsetAndSize);
}

/**
* Returns a utility object to get structs.
Expand All @@ -553,8 +548,9 @@ public ColumnarBatch.Row getStruct(int rowId, int size) {
* Returns the array at rowid.
*/
public final Array getArray(int rowId) {
resultArray.length = getArrayLength(rowId);
resultArray.offset = getArrayOffset(rowId);
long offsetAndSize = getLong(rowId);
resultArray.offset = (int) (offsetAndSize >> 32);
resultArray.length = (int) offsetAndSize;
return resultArray;
}

Expand All @@ -566,7 +562,12 @@ public final Array getArray(int rowId) {
/**
* Sets the value at rowId to `value`.
*/
public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
public int putByteArray(int rowId, byte[] value, int offset, int length) {
int result = arrayData().appendBytes(length, value, offset);
putArrayOffsetAndSize(rowId, result, length);
return result;
}

public final int putByteArray(int rowId, byte[] value) {
return putByteArray(rowId, value, 0, value.length);
}
Expand Down Expand Up @@ -829,13 +830,13 @@ public final int appendDoubles(int length, double[] src, int offset) {
public final int appendByteArray(byte[] value, int offset, int length) {
int copiedOffset = arrayData().appendBytes(length, value, offset);
reserve(elementsAppended + 1);
putArray(elementsAppended, copiedOffset, length);
putArrayOffsetAndSize(elementsAppended, copiedOffset, length);
return elementsAppended++;
}

public final int appendArray(int length) {
reserve(elementsAppended + 1);
putArray(elementsAppended, arrayData().elementsAppended, length);
putArrayOffsetAndSize(elementsAppended, arrayData().elementsAppended, length);
return elementsAppended++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,15 @@ public final class OffHeapColumnVector extends ColumnVector {
// The data stored in these two allocations need to maintain binary compatible. We can
// directly pass this buffer to external components.
private long nulls;
// The actually data of this column vector will be stored here. If it's an array column vector,
// we will store the offsets and lengths here, and store the element data in child column vector.
private long data;

// Set iff the type is array.
private long lengthData;
private long offsetData;

protected OffHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.OFF_HEAP);

nulls = 0;
data = 0;
lengthData = 0;
offsetData = 0;

reserveInternal(capacity);
reset();
Expand All @@ -66,12 +62,8 @@ public long nullsNativeAddress() {
public void close() {
Platform.freeMemory(nulls);
Platform.freeMemory(data);
Platform.freeMemory(lengthData);
Platform.freeMemory(offsetData);
nulls = 0;
data = 0;
lengthData = 0;
offsetData = 0;
}

//
Expand Down Expand Up @@ -395,35 +387,6 @@ public double getDouble(int rowId) {
}
}

//
// APIs dealing with Arrays.
//
@Override
public void putArray(int rowId, int offset, int length) {
assert(offset >= 0 && offset + length <= childColumns[0].capacity);
Platform.putInt(null, lengthData + 4 * rowId, length);
Platform.putInt(null, offsetData + 4 * rowId, offset);
}

@Override
public int getArrayLength(int rowId) {
return Platform.getInt(null, lengthData + 4 * rowId);
}

@Override
public int getArrayOffset(int rowId) {
return Platform.getInt(null, offsetData + 4 * rowId);
}

// APIs dealing with ByteArrays
@Override
public int putByteArray(int rowId, byte[] value, int offset, int length) {
int result = arrayData().appendBytes(length, value, offset);
Platform.putInt(null, lengthData + 4 * rowId, length);
Platform.putInt(null, offsetData + 4 * rowId, result);
return result;
}

@Override
public void loadBytes(ColumnVector.Array array) {
if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
Expand All @@ -438,10 +401,8 @@ public void loadBytes(ColumnVector.Array array) {
protected void reserveInternal(int newCapacity) {
int oldCapacity = (this.data == 0L) ? 0 : capacity;
if (this.resultArray != null) {
this.lengthData =
Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4);
this.offsetData =
Platform.reallocateMemory(offsetData, oldCapacity * 4, newCapacity * 4);
// need a long as offset and length for each array.
this.data = Platform.reallocateMemory(data, oldCapacity * 8, newCapacity * 8);
} else if (type instanceof ByteType || type instanceof BooleanType) {
this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
} else if (type instanceof ShortType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,12 @@ public final class OnHeapColumnVector extends ColumnVector {
private byte[] byteData;
private short[] shortData;
private int[] intData;
// This is not only used to store data for long column vector, but also can store offsets and
// lengths for array column vector.
private long[] longData;
private float[] floatData;
private double[] doubleData;

// Only set if type is Array.
private int[] arrayLengths;
private int[] arrayOffsets;

protected OnHeapColumnVector(int capacity, DataType type) {
super(capacity, type, MemoryMode.ON_HEAP);
reserveInternal(capacity);
Expand Down Expand Up @@ -366,55 +364,22 @@ public double getDouble(int rowId) {
}
}

//
// APIs dealing with Arrays
//

@Override
public int getArrayLength(int rowId) {
return arrayLengths[rowId];
}
@Override
public int getArrayOffset(int rowId) {
return arrayOffsets[rowId];
}

@Override
public void putArray(int rowId, int offset, int length) {
arrayOffsets[rowId] = offset;
arrayLengths[rowId] = length;
}

@Override
public void loadBytes(ColumnVector.Array array) {
array.byteArray = byteData;
array.byteArrayOffset = array.offset;
}

//
// APIs dealing with Byte Arrays
//

@Override
public int putByteArray(int rowId, byte[] value, int offset, int length) {
int result = arrayData().appendBytes(length, value, offset);
arrayOffsets[rowId] = result;
arrayLengths[rowId] = length;
return result;
}

// Spilt this function out since it is the slow path.
@Override
protected void reserveInternal(int newCapacity) {
if (this.resultArray != null || DecimalType.isByteArrayDecimalType(type)) {
int[] newLengths = new int[newCapacity];
int[] newOffsets = new int[newCapacity];
if (this.arrayLengths != null) {
System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
// need 1 long as offset and length for each array.
if (longData == null || longData.length < newCapacity) {
long[] newData = new long[newCapacity];
if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
longData = newData;
}
arrayLengths = newLengths;
arrayOffsets = newOffsets;
} else if (type instanceof BooleanType) {
if (byteData == null || byteData.length < newCapacity) {
byte[] newData = new byte[newCapacity];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17)

// Put the same "ll" at offset. This should not allocate more memory in the column.
column.putArray(idx, offset, 2)
column.putArrayOffsetAndSize(idx, offset, 2)
reference += "ll"
idx += 1
assert(column.arrayData().elementsAppended == 17)
Expand All @@ -644,7 +644,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
assert(column.arrayData().elementsAppended == 17 + (s + s).length)

reference.zipWithIndex.foreach { v =>
assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
val offsetAndLength = column.getLong(v._2)
assert(v._1.length == offsetAndLength.toInt, "MemoryMode=" + memMode)
assert(v._1 == column.getUTF8String(v._2).toString,
"MemoryMode" + memMode)
}
Expand All @@ -659,18 +660,18 @@ class ColumnarBatchSuite extends SparkFunSuite {
val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)

// Fill the underlying data with all the arrays back to back.
val data = column.arrayData();
val data = column.arrayData()
var i = 0
while (i < 6) {
data.putInt(i, i)
i += 1
}

// Populate it with arrays [0], [1, 2], [], [3, 4, 5]
column.putArray(0, 0, 1)
column.putArray(1, 1, 2)
column.putArray(2, 2, 0)
column.putArray(3, 3, 3)
column.putArrayOffsetAndSize(0, 0, 1)
column.putArrayOffsetAndSize(1, 1, 2)
column.putArrayOffsetAndSize(2, 3, 0)
column.putArrayOffsetAndSize(3, 3, 3)

val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
Expand Down Expand Up @@ -703,7 +704,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
data.reserve(array.length)
assert(data.capacity == array.length * 2)
data.putInts(0, array.length, array, 0)
column.putArray(0, 0, array.length)
column.putArrayOffsetAndSize(0, 0, array.length)
assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
=== array)
}}
Expand Down