Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,12 @@ public Block copyPositions(int[] positions, int offset, int length)
return block.copyPositions(positions, offset, length);
}

@Override
public Block copyWithAppendedNull()
{
throw new UnsupportedOperationException("GroupByIdBlock does not support newBlockWithAppendedNull()");
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,42 @@
*/
package io.trino.operator.unnest;

import com.google.common.collect.Iterables;
import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarArray;
import io.trino.spi.block.ColumnarRow;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.unnest.UnnestOperator.ensureCapacity;
import static io.trino.spi.block.ColumnarArray.toColumnarArray;
import static io.trino.spi.block.ColumnarRow.toColumnarRow;
import static java.util.Objects.requireNonNull;

/**
* Unnester for a nested column with array type, only when array elements are of {@code RowType} type.
* It maintains {@link ColumnarArray} and {@link ColumnarRow} objects to get underlying elements. The two
* different columnar structures are required because there are two layers of translation involved. One
* from {@code ArrayBlock} to {@code RowBlock}, and then from {@code RowBlock} to individual element blocks.
* <p>
* All protected methods implemented here assume that they are invoked when {@code columnarArray} and
* {@code columnarRow} are non-null.
*/
class ArrayOfRowsUnnester
extends Unnester
public class ArrayOfRowsUnnester
implements Unnester
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ArrayOfRowsUnnester.class).instanceSize();

private final int fieldCount;
private final UnnestBlockBuilder[] blockBuilders;

private int[] arrayLengths = new int[0];
private ColumnarArray columnarArray;
private ColumnarRow columnarRow;
private final int fieldCount;

// Keeping track of null row element count is required. This count needs to be deducted
// when translating row block indexes to element block indexes.
private int nullRowsEncountered;
public ArrayOfRowsUnnester(int fieldCount)
{
blockBuilders = createUnnestBlockBuilders(fieldCount);
this.fieldCount = fieldCount;
}

public ArrayOfRowsUnnester(RowType elementType)
private static UnnestBlockBuilder[] createUnnestBlockBuilders(int fieldCount)
{
super(Iterables.toArray(requireNonNull(elementType, "elementType is null").getTypeParameters(), Type.class));
this.fieldCount = elementType.getTypeParameters().size();
this.nullRowsEncountered = 0;
UnnestBlockBuilder[] builders = new UnnestBlockBuilder[fieldCount];
for (int i = 0; i < fieldCount; i++) {
builders[i] = new UnnestBlockBuilder();
}
return builders;
}

@Override
Expand All @@ -59,68 +58,72 @@ public int getChannelCount()
}

@Override
int getInputEntryCount()
public void resetInput(Block block)
{
if (columnarArray == null) {
return 0;
requireNonNull(block, "block is null");
columnarArray = toColumnarArray(block);
columnarRow = toColumnarRow(columnarArray.getElementsBlock());

for (int i = 0; i < fieldCount; i++) {
blockBuilders[i].resetInputBlock(columnarRow.getField(i), columnarRow.getNullCheckBlock());
}

int positionCount = block.getPositionCount();
arrayLengths = ensureCapacity(arrayLengths, positionCount, false);
for (int j = 0; j < positionCount; j++) {
arrayLengths[j] = columnarArray.getLength(j);
}
return columnarArray.getPositionCount();
}

@Override
protected void resetColumnarStructure(Block block)
public int[] getOutputEntriesPerPosition()
{
columnarArray = toColumnarArray(block);
columnarRow = toColumnarRow(columnarArray.getElementsBlock());
nullRowsEncountered = 0;
return arrayLengths;
}

@Override
public void processCurrentPosition(int requireCount)
public Block[] buildOutputBlocks(int[] outputEntriesPerPosition, int startPosition, int batchSize, int outputRowCount)
{
// Translate to row block index
int rowBlockIndex = columnarArray.getOffset(getCurrentPosition());

// Unnest current entry
for (int i = 0; i < getCurrentUnnestedLength(); i++) {
if (columnarRow.isNull(rowBlockIndex + i)) {
// Nulls have to be appended when Row element itself is null
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendNull();
}
nullRowsEncountered++;
boolean nullRequired = needToInsertNulls(startPosition, batchSize, outputRowCount);

Block[] outputBlocks = new Block[fieldCount];
for (int i = 0; i < fieldCount; i++) {
if (nullRequired) {
outputBlocks[i] = blockBuilders[i].buildWithNulls(outputEntriesPerPosition, startPosition, batchSize, outputRowCount, arrayLengths);
}
else {
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendElement(rowBlockIndex + i - nullRowsEncountered);
}
outputBlocks[i] = blockBuilders[i].buildWithoutNulls(outputRowCount);
}
}

// Append nulls if more output entries are needed
appendNulls(requireCount - getCurrentUnnestedLength());
return outputBlocks;
}

@Override
public void appendNulls(int count)
private boolean needToInsertNulls(int offset, int inputBatchSize, int outputRowCount)
{
for (int i = 0; i < count; i++) {
for (int field = 0; field < fieldCount; field++) {
getBlockBuilder(field).appendNull();
int start = columnarArray.getOffset(offset);
int end = columnarArray.getOffset(offset + inputBatchSize);
int totalLength = end - start;

if (totalLength < outputRowCount) {
return true;
}

if (columnarRow.mayHaveNull()) {
for (int i = start; i < end; i++) {
if (columnarRow.isNull(i)) {
return true;
}
}
}
}

@Override
protected Block getElementsBlock(int channel)
{
checkState(channel >= 0 && channel < fieldCount, "Invalid channel number");
return columnarRow.getField(channel);
return false;
}

@Override
protected int getElementsLength(int index)
public long getRetainedSizeInBytes()
{
return columnarArray.getLength(index);
// The lengths array in blockBuilders is the same object as in the unnester and doesn't need to be counted again.
return INSTANCE_SIZE + sizeOf(arrayLengths);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,21 @@

import io.trino.spi.block.Block;
import io.trino.spi.block.ColumnarArray;
import io.trino.spi.type.Type;
import org.openjdk.jol.info.ClassLayout;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.unnest.UnnestOperator.ensureCapacity;
import static io.trino.spi.block.ColumnarArray.toColumnarArray;
import static java.util.Objects.requireNonNull;

/**
* Unnester for a nested column with array type, only when array elements are NOT of type {@code RowType}.
* Maintains a {@link ColumnarArray} object to get underlying elements block from the array block.
* <p>
* All protected methods implemented here assume that they are being invoked when {@code columnarArray} is non-null.
*/
class ArrayUnnester
extends Unnester
public class ArrayUnnester
implements Unnester
{
private ColumnarArray columnarArray;
private static final int INSTANCE_SIZE = ClassLayout.parseClass(ArrayUnnester.class).instanceSize();

public ArrayUnnester(Type elementType)
{
super(elementType);
}
private final UnnestBlockBuilder blockBuilder = new UnnestBlockBuilder();
private int[] arrayLengths = new int[0];
private ColumnarArray columnarArray;

@Override
public int getChannelCount()
Expand All @@ -43,50 +38,47 @@ public int getChannelCount()
}

@Override
protected int getInputEntryCount()
public void resetInput(Block block)
{
if (columnarArray == null) {
return 0;
}
return columnarArray.getPositionCount();
}
requireNonNull(block, "block is null");

@Override
protected void resetColumnarStructure(Block block)
{
this.columnarArray = toColumnarArray(block);
}
columnarArray = toColumnarArray(block);
blockBuilder.resetInputBlock(columnarArray.getElementsBlock());

@Override
protected Block getElementsBlock(int channel)
{
checkState(channel == 0, "index is not 0");
return columnarArray.getElementsBlock();
int positionCount = block.getPositionCount();
arrayLengths = ensureCapacity(arrayLengths, positionCount, false);
for (int i = 0; i < positionCount; i++) {
arrayLengths[i] = columnarArray.getLength(i);
}
}

@Override
protected void processCurrentPosition(int requiredOutputCount)
public int[] getOutputEntriesPerPosition()
{
// Translate indices
int startElementIndex = columnarArray.getOffset(getCurrentPosition());
int length = columnarArray.getLength(getCurrentPosition());

// Append elements and nulls
getBlockBuilder(0).appendRange(startElementIndex, length);
appendNulls(requiredOutputCount - length);
return arrayLengths;
}

@Override
protected void appendNulls(int count)
public Block[] buildOutputBlocks(int[] outputEntriesPerPosition, int startPosition, int inputBatchSize, int outputRowCount)
{
for (int i = 0; i < count; i++) {
getBlockBuilder(0).appendNull();
int unnestedLength = columnarArray.getOffset(startPosition + inputBatchSize) - columnarArray.getOffset(startPosition);
boolean nullRequired = unnestedLength < outputRowCount;

Block[] outputBlocks = new Block[1];
if (nullRequired) {
outputBlocks[0] = blockBuilder.buildWithNulls(outputEntriesPerPosition, startPosition, inputBatchSize, outputRowCount, arrayLengths);
}
else {
outputBlocks[0] = blockBuilder.buildWithoutNulls(outputRowCount);
}

return outputBlocks;
}

@Override
protected int getElementsLength(int index)
public long getRetainedSizeInBytes()
{
return columnarArray.getLength(index);
// The lengths array in blockBuilders is the same object as in the unnester and does not need to be counted again.
return INSTANCE_SIZE + sizeOf(arrayLengths);
}
}
Loading