Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.operator.output;

import com.google.common.annotations.VisibleForTesting;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.block.Block;
Expand Down Expand Up @@ -80,38 +81,43 @@ public void append(IntArrayList positions, Block block)
return;
}
ensurePositionCapacity(positionCount + positions.size());
int[] positionArray = positions.elements();
int newByteCount = 0;
int[] lengths = new int[positions.size()];

if (block.mayHaveNull()) {
for (int i = 0; i < positions.size(); i++) {
int position = positionArray[i];
if (block.isNull(position)) {
offsets[positionCount + i + 1] = offsets[positionCount + i];
valueIsNull[positionCount + i] = true;
hasNullValue = true;
if (block instanceof VariableWidthBlock) {
VariableWidthBlock variableWidthBlock = (VariableWidthBlock) block;
int newByteCount = 0;
int[] lengths = new int[positions.size()];
int[] sourceOffsets = new int[positions.size()];
int[] positionArray = positions.elements();

if (block.mayHaveNull()) {
for (int i = 0; i < positions.size(); i++) {
int position = positionArray[i];
int length = variableWidthBlock.getSliceLength(position);
lengths[i] = length;
sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position);
newByteCount += length;
boolean isNull = block.isNull(position);
valueIsNull[positionCount + i] = isNull;
offsets[positionCount + i + 1] = offsets[positionCount + i] + length;
hasNullValue |= isNull;
hasNonNullValue |= !isNull;
}
else {
int length = block.getSliceLength(position);
}
else {
for (int i = 0; i < positions.size(); i++) {
int position = positionArray[i];
int length = variableWidthBlock.getSliceLength(position);
lengths[i] = length;
sourceOffsets[i] = variableWidthBlock.getRawSliceOffset(position);
newByteCount += length;
offsets[positionCount + i + 1] = offsets[positionCount + i] + length;
hasNonNullValue = true;
}
hasNonNullValue = true;
}
copyBytes(variableWidthBlock.getRawSlice(), lengths, sourceOffsets, positions.size(), newByteCount);
}
else {
for (int i = 0; i < positions.size(); i++) {
int position = positionArray[i];
int length = block.getSliceLength(position);
lengths[i] = length;
newByteCount += length;
offsets[positionCount + i + 1] = offsets[positionCount + i] + length;
}
hasNonNullValue = true;
appendGenericBlock(positions, block);
}
copyBytes(block, lengths, positionArray, positions.size(), offsets, positionCount, newByteCount);
}

@Override
Expand All @@ -132,7 +138,7 @@ public void appendRle(RunLengthEncodedBlock block)
}
else {
hasNonNullValue = true;
duplicateBytes(block.getValue(), 0, rlePositionCount);
duplicateBytes(block.getSlice(0, 0, block.getSliceLength(0)), rlePositionCount);
}
}

Expand Down Expand Up @@ -166,16 +172,20 @@ public long getSizeInBytes()
return sizeInBytes;
}

private void copyBytes(Block block, int[] lengths, int[] positions, int count, int[] targetOffsets, int targetOffsetsIndex, int newByteCount)
private void copyBytes(Slice rawSlice, int[] lengths, int[] sourceOffsets, int count, int newByteCount)
{
ensureBytesCapacity(getCurrentOffset() + newByteCount);
ensureExtraBytesCapacity(newByteCount);

for (int i = 0; i < count; i++) {
int position = positions[i];
if (!block.isNull(position)) {
int length = lengths[i];
Slice slice = block.getSlice(position, 0, length);
slice.getBytes(0, bytes, targetOffsets[targetOffsetsIndex + i], length);
if (rawSlice.hasByteArray()) {
byte[] base = rawSlice.byteArray();
int byteArrayOffset = rawSlice.byteArrayOffset();
for (int i = 0; i < count; i++) {
System.arraycopy(base, byteArrayOffset + sourceOffsets[i], bytes, offsets[positionCount + i], lengths[i]);
}
}
else {
for (int i = 0; i < count; i++) {
rawSlice.getBytes(sourceOffsets[i], bytes, offsets[positionCount + i], lengths[i]);
}
}

Expand All @@ -184,25 +194,75 @@ private void copyBytes(Block block, int[] lengths, int[] positions, int count, i
}

/**
* Copy {@code length} bytes from {@code block}, at position {@code position} to {@code count} consecutive positions in the {@link #bytes} array.
* Copy all bytes from {@code slice} to {@code count} consecutive positions in the {@link #bytes} array.
*/
private void duplicateBytes(Block block, int position, int count)
private void duplicateBytes(Slice slice, int count)
{
int length = block.getSliceLength(position);
int length = slice.length();
int newByteCount = toIntExact((long) count * length);
int startOffset = getCurrentOffset();
ensureBytesCapacity(startOffset + newByteCount);
ensureExtraBytesCapacity(newByteCount);

duplicateBytes(slice, bytes, startOffset, count);

Slice slice = block.getSlice(position, 0, length);
int currentStartOffset = startOffset + length;
for (int i = 0; i < count; i++) {
slice.getBytes(0, bytes, startOffset + (i * length), length);
offsets[positionCount + i + 1] = startOffset + ((i + 1) * length);
offsets[positionCount + i + 1] = currentStartOffset;
currentStartOffset += length;
}

positionCount += count;
updateSize(count, newByteCount);
}

/**
* Copy {@code length} bytes from {@code slice}, starting at offset {@code sourceOffset} to {@code count} consecutive positions in the {@link #bytes} array.
*/
@VisibleForTesting
static void duplicateBytes(Slice slice, byte[] bytes, int startOffset, int count)
{
int length = slice.length();
if (length == 0) {
// nothing to copy
return;
}
// copy slice to the first position
slice.getBytes(0, bytes, startOffset, length);
int totalDuplicatedBytes = count * length;
int duplicatedBytes = length;
// copy every byte copied so far, doubling the number of bytes copied on evey iteration
while (duplicatedBytes * 2 <= totalDuplicatedBytes) {
System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, duplicatedBytes);
duplicatedBytes = duplicatedBytes * 2;
}
// copy the leftover
System.arraycopy(bytes, startOffset, bytes, startOffset + duplicatedBytes, totalDuplicatedBytes - duplicatedBytes);
}

private void appendGenericBlock(IntArrayList positions, Block block)
{
int newByteCount = 0;
for (int i = 0; i < positions.size(); i++) {
int position = positions.getInt(i);
if (block.isNull(position)) {
offsets[positionCount + 1] = offsets[positionCount];
valueIsNull[positionCount] = true;
hasNullValue = true;
}
else {
int length = block.getSliceLength(position);
ensureExtraBytesCapacity(length);
Slice slice = block.getSlice(position, 0, length);
slice.getBytes(0, bytes, offsets[positionCount], length);
offsets[positionCount + 1] = offsets[positionCount] + length;
hasNonNullValue = true;
newByteCount += length;
}
positionCount++;
}
updateSize(positions.size(), newByteCount);
}

private void reset()
{
initialEntryCount = calculateBlockResetSize(positionCount);
Expand All @@ -228,12 +288,13 @@ private void updateSize(long positionsSize, int bytesWritten)
sizeInBytes += (SIZE_OF_BYTE + SIZE_OF_INT) * positionsSize + bytesWritten;
}

private void ensureBytesCapacity(int bytesCapacity)
private void ensureExtraBytesCapacity(int extraBytesCapacity)
{
if (bytes.length < bytesCapacity) {
int totalBytesCapacity = getCurrentOffset() + extraBytesCapacity;
if (bytes.length < totalBytesCapacity) {
int newBytesLength = Math.max(bytes.length, initialBytesSize);
if (bytesCapacity > newBytesLength) {
newBytesLength = Math.max(bytesCapacity, calculateNewArraySize(newBytesLength));
if (totalBytesCapacity > newBytesLength) {
newBytesLength = Math.max(totalBytesCapacity, calculateNewArraySize(newBytesLength));
}
bytes = Arrays.copyOf(bytes, newBytesLength);
updateRetainedSize();
Expand Down
Loading