Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.spi.block.Int128ArrayBlock;
import io.trino.spi.block.Int96ArrayBlock;
import io.trino.spi.type.FixedWidthType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VariableWidthType;
import io.trino.type.BlockTypeOperators;
Expand Down Expand Up @@ -66,6 +67,9 @@ private PositionsAppender createPrimitiveAppender(Type type, int expectedPositio
else if (type instanceof VariableWidthType) {
return new SlicePositionsAppender(expectedPositions, maxPageSizeInBytes);
}
else if (type instanceof RowType) {
return RowPositionsAppender.createRowAppender(this, (RowType) type, expectedPositions, maxPageSizeInBytes);
}

return new TypedPositionsAppender(type, expectedPositions);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.output;

import io.trino.spi.block.Block;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.RowType;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.openjdk.jol.info.ClassLayout;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import static io.airlift.slice.SizeOf.sizeOf;
import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
import static io.trino.spi.block.RowBlock.fromFieldBlocks;
import static java.util.Objects.requireNonNull;

public class RowPositionsAppender
implements PositionsAppender
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(RowPositionsAppender.class).instanceSize();
private final PositionsAppender[] fieldAppenders;
private int initialEntryCount;
private boolean initialized;

private int positionCount;
private boolean hasNullRow;
private boolean hasNonNullRow;
private boolean[] rowIsNull = new boolean[0];
private long retainedSizeInBytes;
private long sizeInBytes;

public static RowPositionsAppender createRowAppender(
PositionsAppenderFactory positionsAppenderFactory,
RowType type,
int expectedPositions,
long maxPageSizeInBytes)
{
PositionsAppender[] fields = new PositionsAppender[type.getFields().size()];
for (int i = 0; i < fields.length; i++) {
fields[i] = positionsAppenderFactory.create(type.getFields().get(i).getType(), expectedPositions, maxPageSizeInBytes);
}
return new RowPositionsAppender(fields, expectedPositions);
}

private RowPositionsAppender(PositionsAppender[] fieldAppenders, int expectedPositions)
{
this.fieldAppenders = requireNonNull(fieldAppenders, "fields is null");
this.initialEntryCount = expectedPositions;
updateRetainedSize();
}

@Override
public void append(IntArrayList positions, Block block)
{
if (positions.isEmpty()) {
return;
}
ensureCapacity(positions.size());
RowBlock sourceRowBlock = (RowBlock) block;
IntArrayList nonNullPositions;
if (sourceRowBlock.mayHaveNull()) {
nonNullPositions = processNullablePositions(positions, sourceRowBlock);
hasNullRow |= nonNullPositions.size() < positions.size();
hasNonNullRow |= nonNullPositions.size() > 0;
}
else {
// the source Block does not have nulls
nonNullPositions = processNonNullablePositions(positions, sourceRowBlock);
hasNonNullRow = true;
}

List<Block> fieldBlocks = sourceRowBlock.getChildren();
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].append(nonNullPositions, fieldBlocks.get(i));
}

positionCount += positions.size();
updateSize();
}

@Override
public void appendRle(RunLengthEncodedBlock rleBlock)
{
int rlePositionCount = rleBlock.getPositionCount();
ensureCapacity(rlePositionCount);
RowBlock sourceRowBlock = (RowBlock) rleBlock.getValue();
if (sourceRowBlock.isNull(0)) {
// append rlePositionCount nulls
Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
hasNullRow = true;
}
else {
// append not null row value
List<Block> fieldBlocks = sourceRowBlock.getChildren();
int fieldPosition = sourceRowBlock.getFieldBlockOffset(0);
for (int i = 0; i < fieldAppenders.length; i++) {
fieldAppenders[i].appendRle(new RunLengthEncodedBlock(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount));
}
hasNonNullRow = true;
}
positionCount += rlePositionCount;
updateSize();
}

@Override
public Block build()
{
Block[] fieldBlocks = new Block[fieldAppenders.length];
for (int i = 0; i < fieldAppenders.length; i++) {
fieldBlocks[i] = fieldAppenders[i].build();
}
Block result;
if (hasNonNullRow) {
result = fromFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
}
else {
Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
result = new RunLengthEncodedBlock(nullRowBlock, positionCount);
}

reset();
return result;
}

@Override
public long getRetainedSizeInBytes()
{
long size = retainedSizeInBytes;
for (PositionsAppender field : fieldAppenders) {
size += field.getRetainedSizeInBytes();
}
return size;
}

@Override
public long getSizeInBytes()
{
return sizeInBytes;
}

private void reset()
{
initialEntryCount = calculateBlockResetSize(positionCount);
initialized = false;
rowIsNull = new boolean[0];
positionCount = 0;
sizeInBytes = 0;
hasNonNullRow = false;
hasNullRow = false;
updateRetainedSize();
}

private IntArrayList processNullablePositions(IntArrayList positions, RowBlock sourceRowBlock)
{
int[] nonNullPositions = new int[positions.size()];
int nonNullPositionsCount = 0;

for (int i = 0; i < positions.size(); i++) {
int position = positions.getInt(i);
boolean positionIsNull = sourceRowBlock.isNull(position);
nonNullPositions[nonNullPositionsCount] = sourceRowBlock.getFieldBlockOffset(position);
nonNullPositionsCount += positionIsNull ? 0 : 1;
rowIsNull[positionCount + i] = positionIsNull;
}

return IntArrayList.wrap(nonNullPositions, nonNullPositionsCount);
}

private IntArrayList processNonNullablePositions(IntArrayList positions, RowBlock sourceRowBlock)
{
int[] nonNullPositions = new int[positions.size()];
for (int i = 0; i < positions.size(); i++) {
nonNullPositions[i] = sourceRowBlock.getFieldBlockOffset(positions.getInt(i));
}
return IntArrayList.wrap(nonNullPositions);
}

private void ensureCapacity(int additionalCapacity)
{
if (rowIsNull.length <= positionCount + additionalCapacity) {
int newSize;
if (initialized) {
newSize = calculateNewArraySize(rowIsNull.length);
}
else {
newSize = initialEntryCount;
initialized = true;
}

int newCapacity = Math.max(newSize, positionCount + additionalCapacity);
rowIsNull = Arrays.copyOf(rowIsNull, newCapacity);
updateRetainedSize();
}
}

private void updateSize()
{
long size = (Integer.BYTES + Byte.BYTES) * (long) positionCount;
for (PositionsAppender field : fieldAppenders) {
size += field.getSizeInBytes();
}
sizeInBytes = size;
}

private void updateRetainedSize()
{
retainedSizeInBytes = INSTANCE_SIZE + sizeOf(rowIsNull);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,20 @@
import io.trino.spi.block.BlockBuilderStatus;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.PageBuilderStatus;
import io.trino.spi.block.RowBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.Decimals;
import io.trino.spi.type.LongTimestamp;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;
import io.trino.type.BlockTypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
Expand All @@ -57,6 +60,7 @@
import static io.trino.spi.type.DecimalType.createDecimalType;
import static io.trino.spi.type.DoubleType.DOUBLE;
import static io.trino.spi.type.IntegerType.INTEGER;
import static io.trino.spi.type.RowType.anonymousRow;
import static io.trino.spi.type.SmallintType.SMALLINT;
import static io.trino.spi.type.TimestampType.createTimestampType;
import static io.trino.spi.type.TinyintType.TINYINT;
Expand Down Expand Up @@ -91,7 +95,10 @@ public void testMixedBlockTypes(Type type)
input(rleBlock(dictionaryBlock(rleBlock(type, 4), 1), 3), 1), // rle -> dict -> rle
input(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 2), // dict -> dict
input(dictionaryBlock(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 3), 2), // dict -> dict -> dict
input(dictionaryBlock(rleBlock(type, 4), 3), 0, 2)); // dict -> rle
input(dictionaryBlock(rleBlock(type, 4), 3), 0, 2), // dict -> rle
input(notNullBlock(type, 4).getRegion(2, 2), 0, 1), // not null block with offset
input(partiallyNullBlock(type, 4).getRegion(2, 2), 0, 1), // nullable block with offset
input(rleBlock(notNullBlock(type, 4).getRegion(2, 1), 3), 1)); // rle block with offset

testAppend(type, input);
}
Expand All @@ -101,6 +108,7 @@ public void testNullRle(Type type)
{
testNullRle(type, nullBlock(type, 2));
testNullRle(type, nullRleBlock(type, 2));
testNullRle(type, createRandomBlockForType(type, 4, 0.5f));
}

@Test(dataProvider = "types")
Expand All @@ -113,7 +121,7 @@ public void testRleSwitchToFlat(Type type)

List<BlockView> dictionaryInputs = ImmutableList.of(
input(rleBlock(type, 3), 0, 1),
input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1));
input(dictionaryBlock(type, 2, 4, 0), 0, 1));
testAppend(type, dictionaryInputs);
}

Expand All @@ -126,7 +134,7 @@ public void testFlatAppendRle(Type type)
testAppend(type, inputs);

List<BlockView> dictionaryInputs = ImmutableList.of(
input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1),
input(dictionaryBlock(type, 2, 4, 0), 0, 1),
input(rleBlock(type, 3), 0, 1));
testAppend(type, dictionaryInputs);
}
Expand Down Expand Up @@ -233,6 +241,24 @@ public void testSliceRle()
}
}

@Test
public void testRowWithNestedFields()
{
RowType type = anonymousRow(BIGINT, BIGINT, VARCHAR);
Block rowBLock = RowBlock.fromFieldBlocks(2, Optional.empty(), new Block[] {
notNullBlock(BIGINT, 2),
dictionaryBlock(BIGINT, 2, 2, 0.5F),
rleBlock(VARCHAR, 2)
});

PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);

positionsAppender.append(allPositions(2), rowBLock);
Block actual = positionsAppender.build();

assertBlockEquals(type, actual, rowBLock);
}

@DataProvider(name = "nullRleTypes")
public static Object[][] nullRleTypes()
{
Expand All @@ -248,7 +274,8 @@ public static Object[][] nullRleTypes()
{TINYINT},
{VARBINARY},
{createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)},
{createTimestampType(9)}
{createTimestampType(9)},
{anonymousRow(BIGINT, VARCHAR)}
};
}

Expand All @@ -268,7 +295,8 @@ public static Object[][] types()
{VARBINARY},
{createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)},
{new ArrayType(BIGINT)},
{createTimestampType(9)}
{createTimestampType(9)},
{anonymousRow(BIGINT, VARCHAR)}
};
}

Expand Down Expand Up @@ -354,13 +382,19 @@ private Block emptyBlock(Type type)
private void testNullRle(Type type, Block source)
{
PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);

// extract null positions
IntArrayList positions = new IntArrayList(source.getPositionCount());
for (int i = 0; i < source.getPositionCount(); i++) {
if (source.isNull(i)) {
positions.add(i);
}
}
// append twice to trigger RleAwarePositionsAppender.equalOperator call
positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source);
positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source);
positionsAppender.append(positions, source);
positionsAppender.append(positions, source);
Block actual = positionsAppender.build();
assertTrue(actual.isNull(0));
assertEquals(actual.getPositionCount(), source.getPositionCount() * 2);
assertEquals(actual.getPositionCount(), positions.size() * 2);
assertInstanceOf(actual, RunLengthEncodedBlock.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public final List<Block> getChildren()
protected abstract boolean[] getRowIsNull();

// the offset in each field block, it can also be viewed as the "entry-based" offset in the RowBlock
protected final int getFieldBlockOffset(int position)
public final int getFieldBlockOffset(int position)
{
int[] offsets = getFieldBlockOffsets();
return offsets != null ? offsets[position + getOffsetBase()] : position + getOffsetBase();
Expand Down