Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void allocateBuffers() {
sizeBuffer = allocateBuffers(sizeAllocationSizeInBytes);
}

private ArrowBuf allocateBuffers(final long size) {
protected ArrowBuf allocateBuffers(final long size) {
final int curSize = (int) size;
ArrowBuf buffer = allocator.buffer(curSize);
buffer.readerIndex(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueIterableVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ZeroVector;
import org.apache.arrow.vector.compare.VectorVisitor;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewReader;
import org.apache.arrow.vector.complex.impl.UnionLargeListViewWriter;
Expand Down Expand Up @@ -361,20 +362,17 @@ public TransferPair getTransferPair(Field field, BufferAllocator allocator) {

@Override
public TransferPair getTransferPair(String ref, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support getTransferPair(String, BufferAllocator, CallBack) yet");
return new TransferImpl(ref, allocator, callBack);
}

@Override
public TransferPair getTransferPair(Field field, BufferAllocator allocator, CallBack callBack) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support getTransferPair(Field, BufferAllocator, CallBack) yet");
return new TransferImpl(field, allocator, callBack);
}

@Override
public TransferPair makeTransferPair(ValueVector target) {
throw new UnsupportedOperationException(
"LargeListViewVector does not support makeTransferPair(ValueVector) yet");
return new TransferImpl((LargeListViewVector) target);
}

@Override
Expand Down Expand Up @@ -452,6 +450,159 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
return visitor.visit(this, value);
}

private class TransferImpl implements TransferPair {

LargeListViewVector to;
TransferPair dataTransferPair;

public TransferImpl(String name, BufferAllocator allocator, CallBack callBack) {
this(new LargeListViewVector(name, allocator, field.getFieldType(), callBack));
}

public TransferImpl(Field field, BufferAllocator allocator, CallBack callBack) {
this(new LargeListViewVector(field, allocator, callBack));
}

public TransferImpl(LargeListViewVector to) {
this.to = to;
to.addOrGetVector(vector.getField().getFieldType());
if (to.getDataVector() instanceof ZeroVector) {
to.addOrGetVector(vector.getField().getFieldType());
}
dataTransferPair = getDataVector().makeTransferPair(to.getDataVector());
}

@Override
public void transfer() {
to.clear();
dataTransferPair.transfer();
to.validityBuffer = transferBuffer(validityBuffer, to.allocator);
to.offsetBuffer = transferBuffer(offsetBuffer, to.allocator);
to.sizeBuffer = transferBuffer(sizeBuffer, to.allocator);
if (valueCount > 0) {
to.setValueCount(valueCount);
}
clear();
}

@Override
public void splitAndTransfer(int startIndex, int length) {
Preconditions.checkArgument(
startIndex >= 0 && length >= 0 && startIndex + length <= valueCount,
"Invalid parameters startIndex: %s, length: %s for valueCount: %s",
startIndex,
length,
valueCount);
to.clear();
if (length > 0) {
// we have to scan by index since there are out-of-order offsets
to.offsetBuffer = to.allocateBuffers((long) length * OFFSET_WIDTH);
to.sizeBuffer = to.allocateBuffers((long) length * SIZE_WIDTH);

/* splitAndTransfer the size buffer */
int maxOffsetAndSizeSum = Integer.MIN_VALUE;
int minOffsetValue = Integer.MAX_VALUE;
for (int i = 0; i < length; i++) {
final int offsetValue = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH);
final int sizeValue = sizeBuffer.getInt((long) (startIndex + i) * SIZE_WIDTH);
to.sizeBuffer.setInt((long) i * SIZE_WIDTH, sizeValue);
maxOffsetAndSizeSum = Math.max(maxOffsetAndSizeSum, offsetValue + sizeValue);
minOffsetValue = Math.min(minOffsetValue, offsetValue);
}

/* splitAndTransfer the offset buffer */
for (int i = 0; i < length; i++) {
final int offsetValue = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH);
final int relativeOffset = offsetValue - minOffsetValue;
to.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeOffset);
}

/* splitAndTransfer the validity buffer */
splitAndTransferValidityBuffer(startIndex, length, to);

/* splitAndTransfer the data buffer */
final int childSliceLength = maxOffsetAndSizeSum - minOffsetValue;
dataTransferPair.splitAndTransfer(minOffsetValue, childSliceLength);
to.setValueCount(length);
}
}

/*
* transfer the validity.
*/
private void splitAndTransferValidityBuffer(
int startIndex, int length, LargeListViewVector target) {
int firstByteSource = BitVectorHelper.byteIndex(startIndex);
int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1);
int byteSizeTarget = getValidityBufferSizeFromCount(length);
int offset = startIndex % 8;

if (length > 0) {
if (offset == 0) {
// slice
if (target.validityBuffer != null) {
target.validityBuffer.getReferenceManager().release();
}
target.validityBuffer = validityBuffer.slice(firstByteSource, byteSizeTarget);
target.validityBuffer.getReferenceManager().retain(1);
} else {
/* Copy data
* When the first bit starts from the middle of a byte (offset != 0),
* copy data from src BitVector.
* Each byte in the target is composed by a part in i-th byte,
* another part in (i+1)-th byte.
*/
target.allocateValidityBuffer(byteSizeTarget);

for (int i = 0; i < byteSizeTarget - 1; i++) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(validityBuffer, firstByteSource + i, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + i + 1, offset);

target.validityBuffer.setByte(i, (b1 + b2));
}

/* Copying the last piece is done in the following manner:
* if the source vector has 1 or more bytes remaining, we copy
* the last piece as a byte formed by shifting data
* from the current byte and the next byte.
*
* if the source vector has no more bytes remaining
* (we are at the last byte), we copy the last piece as a byte
* by shifting data from the current byte.
*/
if ((firstByteSource + byteSizeTarget - 1) < lastByteSource) {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
byte b2 =
BitVectorHelper.getBitsFromNextByte(
validityBuffer, firstByteSource + byteSizeTarget, offset);

target.validityBuffer.setByte(byteSizeTarget - 1, b1 + b2);
} else {
byte b1 =
BitVectorHelper.getBitsFromCurrentByte(
validityBuffer, firstByteSource + byteSizeTarget - 1, offset);
target.validityBuffer.setByte(byteSizeTarget - 1, b1);
}
}
}
}

@Override
public ValueVector getTo() {
return to;
}

@Override
public void copyValueSafe(int from, int to) {
this.to.copyFrom(from, to, LargeListViewVector.this);
}
}

@Override
protected FieldReader getReaderImpl() {
throw new UnsupportedOperationException(
Expand Down
Loading