Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
import org.apache.spark.sql.catalyst.util.ArrayData;
import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.sql.internal.SQLConf;
Expand Down Expand Up @@ -82,6 +83,15 @@ public static final class Array extends ArrayData {
public int length;
public int offset;

// reused buffer to return a primitive array
protected boolean[] reuseBooleanArray;
protected byte[] reuseByteArray;
protected short[] reuseShortArray;
protected int[] reuseIntArray;
protected long[] reuseLongArray;
protected float[] reuseFloatArray;
protected double[] reuseDoubleArray;

// Populate if binary data is required for the Array. This is stored here as an optimization
// for string data.
public byte[] byteArray;
Expand All @@ -102,6 +112,69 @@ public ArrayData copy() {
throw new UnsupportedOperationException();
}

@Override
public boolean[] toBooleanArray() {
if (reuseBooleanArray == null || reuseBooleanArray.length != length) {
reuseBooleanArray = new boolean[length];
}
data.getBooleanArray(offset, length, reuseBooleanArray);
return reuseBooleanArray;
}

@Override
public byte[] toByteArray() {
if (reuseByteArray == null || reuseByteArray.length != length) {
reuseByteArray = new byte[length];
}
data.getByteArray(offset, length, reuseByteArray);
return reuseByteArray;
}

@Override
public short[] toShortArray() {
if (reuseShortArray == null || reuseShortArray.length != length) {
reuseShortArray = new short[length];
}
data.getShortArray(offset, length, reuseShortArray);
return reuseShortArray;
}

@Override
public int[] toIntArray() {
if (reuseIntArray == null || reuseIntArray.length != length) {
reuseIntArray = new int[length];
}
data.getIntArray(offset, length, reuseIntArray);
return reuseIntArray;
}

@Override
public long[] toLongArray() {
if (reuseLongArray == null || reuseLongArray.length != length) {
reuseLongArray = new long[length];
}
data.getLongArray(offset, length, reuseLongArray);
return reuseLongArray;
}

@Override
public float[] toFloatArray() {
if (reuseFloatArray == null || reuseFloatArray.length != length) {
reuseFloatArray = new float[length];
}
data.getFloatArray(offset, length, reuseFloatArray);
return reuseFloatArray;
}

@Override
public double[] toDoubleArray() {
if (reuseDoubleArray == null || reuseDoubleArray.length != length) {
reuseDoubleArray = new double[length];
}
data.getDoubleArray(offset, length, reuseDoubleArray);
return reuseDoubleArray;
}

// TODO: this is extremely expensive.
@Override
public Object[] array() {
Expand Down Expand Up @@ -368,6 +441,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract boolean getBoolean(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getBooleanArray(int rowId, int count, boolean[] array);

/**
* Sets the value at rowId to `value`.
*/
Expand All @@ -388,6 +466,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract byte getByte(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getByteArray(int rowId, int count, byte[] array);

/**
* Sets the value at rowId to `value`.
*/
Expand All @@ -403,6 +486,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getShortArray(int rowId, int count, short[] array);

/**
* Returns the value for rowId.
*/
Expand Down Expand Up @@ -434,6 +522,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract int getInt(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getIntArray(int rowId, int count, int[] array);

/**
* Returns the dictionary Id for rowId.
* This should only be called when the ColumnVector is dictionaryIds.
Expand Down Expand Up @@ -467,6 +560,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract long getLong(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getLongArray(int rowId, int count, long[] array);

/**
* Sets the value at rowId to `value`.
*/
Expand Down Expand Up @@ -494,6 +592,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract float getFloat(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getFloatArray(int rowId, int count, float[] array);

/**
* Sets the value at rowId to `value`.
*/
Expand Down Expand Up @@ -521,6 +624,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
*/
public abstract double getDouble(int rowId);

/**
* Sets a primitive array for (offset, length) to array.
*/
public abstract void getDoubleArray(int rowId, int count, double[] array);

/**
* Puts a byte array that already exists in this column.
*/
Expand Down Expand Up @@ -562,6 +670,26 @@ public final Array getArray(int rowId) {
return resultArray;
}

public final int putArray(int rowId, ArrayData array) {
UnsafeArrayData unsafeArray = (UnsafeArrayData)array;
Object baseObjects = unsafeArray.getBaseObject();
int length = unsafeArray.getSizeInBytes();
int numElements = unsafeArray.numElements();
long elementOffset = unsafeArray.getBaseOffset() + UnsafeArrayData.calculateHeaderPortionInBytes(numElements);
childColumns[0].putArray(rowId, baseObjects, (int) elementOffset, elementsAppended, numElements);
putArray(rowId, elementsAppended, numElements);
elementsAppended += numElements;

if (((ArrayType)type).containsNull()) {
for (int i = 0; i < numElements; i++) {
if (unsafeArray.isNullAt(i)) {
childColumns[0].putNotNull(i);
}
}
}
return length;
}

/**
* Loads the data into array.byteArray.
*/
Expand All @@ -570,6 +698,7 @@ public final Array getArray(int rowId) {
/**
* Sets the value at rowId to `value`.
*/
public abstract void putArray(int rowId, Object value, int srcOffset, int dstOffset, int numElements);
public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
public final int putByteArray(int rowId, byte[] value) {
return putByteArray(rowId, value, 0, value.length);
Expand Down Expand Up @@ -930,7 +1059,7 @@ public final int appendStruct(boolean isNull) {
protected static final int DEFAULT_ARRAY_LENGTH = 4;

/**
* Current write cursor (row index) when appending data.
* Current write cursor (row index) when appending or putting data.
*/
protected int elementsAppended;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ public void putBooleans(int rowId, int count, boolean value) {
@Override
public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }

@Override
public void getBooleanArray(int offset, int length, boolean[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with Bytes
//
Expand Down Expand Up @@ -165,6 +168,9 @@ public byte getByte(int rowId) {
}
}

@Override
public void getByteArray(int offset, int length, byte[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with shorts
//
Expand Down Expand Up @@ -197,6 +203,9 @@ public short getShort(int rowId) {
}
}

@Override
public void getShortArray(int offset, int length, short[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with ints
//
Expand Down Expand Up @@ -255,6 +264,9 @@ public int getDictId(int rowId) {
return Platform.getInt(null, data + 4 * rowId);
}

@Override
public void getIntArray(int offset, int length, int[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with Longs
//
Expand Down Expand Up @@ -302,6 +314,9 @@ public long getLong(int rowId) {
}
}

@Override
public void getLongArray(int offset, int length, long[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with floats
//
Expand Down Expand Up @@ -348,6 +363,8 @@ public float getFloat(int rowId) {
}
}

@Override
public void getFloatArray(int offset, int length, float[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with doubles
Expand Down Expand Up @@ -395,6 +412,9 @@ public double getDouble(int rowId) {
}
}

@Override
public void getDoubleArray(int offset, int length, double[] array) { throw new UnsupportedOperationException(); }

//
// APIs dealing with Arrays.
//
Expand All @@ -405,6 +425,11 @@ public void putArray(int rowId, int offset, int length) {
Platform.putInt(null, offsetData + 4 * rowId, offset);
}

@Override
public void putArray(int rowId, Object src, int srcOffset, int dstOffset, int length) {
throw new UnsupportedOperationException();
}

@Override
public int getArrayLength(int rowId) {
return Platform.getInt(null, lengthData + 4 * rowId);
Expand Down
Loading