Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,6 @@ public int getArrayOffset(int rowId) {
return accessor.getArrayOffset(rowId);
}

// This implementation does not support loading bytes into a ColumnarArray:
// every call throws UnsupportedOperationException.
@Override
public void loadBytes(ColumnarArray array) {
throw new UnsupportedOperationException();
}

//
// APIs dealing with Decimals
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,11 +180,6 @@ public final ColumnarArray getArray(int rowId) {
return resultArray;
}

/**
* Loads the data into array.byteArray.
*
* @param array the array whose byteArray field is to be populated by the implementation
*/
public abstract void loadBytes(ColumnarArray array);

/**
* Returns the value for rowId.
*/
Expand All @@ -198,7 +193,8 @@ public MapData getMap(int ordinal) {
public abstract Decimal getDecimal(int rowId, int precision, int scale);

/**
* Returns the UTF8String for rowId.
* Returns the UTF8String for rowId. Note that the returned UTF8String may point to the data of
* this column vector, please copy it if you want to keep it after this column vector is freed.
*/
public abstract UTF8String getUTF8String(int rowId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,6 @@ public final class ColumnarArray extends ArrayData {
public int length;
public int offset;

// Populate if binary data is required for the Array. This is stored here as an optimization
// for string data.
public byte[] byteArray;
public int byteArrayOffset;

// Reused staging buffer, used for loading from offheap.
protected byte[] tmpByteArray = new byte[1];

// Stores the given column vector in this array's data field.
ColumnarArray(ColumnVector data) {
this.data = data;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;

/**
* Column data backed using offheap memory.
Expand Down Expand Up @@ -75,16 +76,14 @@ public OffHeapColumnVector(int capacity, DataType type) {
reset();
}

/**
* Returns the off heap pointer for the values buffer.
*
* @return the current value of the data field
*/
@VisibleForTesting
public long valuesNativeAddress() {
return data;
}

/**
* Returns the current value of the nulls field. By analogy with valuesNativeAddress this is
* presumably the off heap pointer for the nulls buffer (TODO confirm).
*/
@VisibleForTesting
public long nullsNativeAddress() {
return nulls;
}

@Override
public void close() {
super.close();
Expand Down Expand Up @@ -207,6 +206,11 @@ public byte[] getBytes(int rowId, int count) {
return array;
}

/**
* Returns a UTF8String over count bytes starting at the off heap address data + rowId.
* NOTE(review): fromAddress presumably references the memory rather than copying it, so the
* result is likely only valid while this vector's buffer is alive; confirm against UTF8String.
*
* @param rowId offset, in bytes, from the start of the values buffer
* @param count number of bytes the returned string spans
*/
@Override
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
return UTF8String.fromAddress(null, data + rowId, count);
}

//
// APIs dealing with shorts
//
Expand Down Expand Up @@ -524,15 +528,6 @@ public int putByteArray(int rowId, byte[] value, int offset, int length) {
return result;
}

/**
* Copies array.length bytes starting at the off heap address data + array.offset into the
* array's staging buffer, then exposes that buffer through array.byteArray.
*
* @param array the array to populate; its tmpByteArray, byteArray and byteArrayOffset fields
* are written by this call
*/
@Override
public void loadBytes(ColumnarArray array) {
// Replace the staging buffer only when it is too small to hold array.length bytes.
if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
Platform.copyMemory(
null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
// The copy targets the beginning of the staging buffer, hence the offset of 0.
array.byteArray = array.tmpByteArray;
array.byteArrayOffset = 0;
}

// Split out the slow path.
@Override
protected void reserveInternal(int newCapacity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.spark.sql.types.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.types.UTF8String;

/**
* A column backed by an in memory JVM array. This stores the NULLs as a byte per value
Expand Down Expand Up @@ -203,6 +204,11 @@ public byte[] getBytes(int rowId, int count) {
return array;
}

/**
* Returns a UTF8String built from count bytes of byteData starting at index rowId.
*
* @param rowId index of the first byte in byteData
* @param count number of bytes the returned string spans
*/
@Override
protected UTF8String getBytesAsUTF8String(int rowId, int count) {
return UTF8String.fromBytes(byteData, rowId, count);
}

//
// APIs dealing with Shorts
//
Expand Down Expand Up @@ -484,12 +490,6 @@ public void putArray(int rowId, int offset, int length) {
arrayLengths[rowId] = length;
}

/**
* Points array.byteArray at this vector's byteData and sets array.byteArrayOffset to
* array.offset. Only the reference is assigned here; no bytes are copied.
*
* @param array the array to populate; its byteArray and byteArrayOffset fields are written
*/
@Override
public void loadBytes(ColumnarArray array) {
array.byteArray = byteData;
array.byteArrayOffset = array.offset;
}

//
// APIs dealing with Byte Arrays
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,6 @@ public final int putByteArray(int rowId, byte[] value) {
return putByteArray(rowId, value, 0, value.length);
}

/**
* Returns the array for rowId after asking its backing column to load its bytes.
*
* @param rowId the row to read
* @return the ColumnarArray obtained from getArray(rowId), with loadBytes applied to it
*/
private ColumnarArray getByteArray(int rowId) {
ColumnarArray array = getArray(rowId);
// Delegate to the array's own backing column, which knows how to load the bytes.
array.data.loadBytes(array);
return array;
}

/**
* Returns the decimal for rowId.
*/
@Override
public Decimal getDecimal(int rowId, int precision, int scale) {
if (precision <= Decimal.MAX_INT_DIGITS()) {
Expand All @@ -318,30 +306,27 @@ public void putDecimal(int rowId, Decimal value, int precision) {
}
}

/**
* Returns the UTF8String for rowId.
*/
@Override
public UTF8String getUTF8String(int rowId) {
Copy link
Copy Markdown
Member

@viirya viirya Nov 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a comment noting that getUTF8String reuses the data in the column vector? That seems different from the other getXXX APIs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

if (dictionary == null) {
ColumnarArray a = getByteArray(rowId);
return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId));
} else {
byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
return UTF8String.fromBytes(bytes);
}
}

/**
* Returns the byte array for rowId.
* Gets the values of bytes from [rowId, rowId + count), as a UTF8String.
* This method is similar to {@link ColumnVector#getBytes(int, int)}, but can save data copy as
* UTF8String is used as a pointer.
*/
protected abstract UTF8String getBytesAsUTF8String(int rowId, int count);

@Override
public byte[] getBinary(int rowId) {
if (dictionary == null) {
ColumnarArray array = getByteArray(rowId);
byte[] bytes = new byte[array.length];
System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
return bytes;
return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId));
} else {
return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
}
Expand Down