diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java
index a87c6eb60c6a..086c9f4bafa1 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderFactory.java
@@ -16,6 +16,7 @@
 import io.trino.spi.block.Int128ArrayBlock;
 import io.trino.spi.block.Int96ArrayBlock;
 import io.trino.spi.type.FixedWidthType;
+import io.trino.spi.type.RowType;
 import io.trino.spi.type.Type;
 import io.trino.spi.type.VariableWidthType;
 import io.trino.type.BlockTypeOperators;
@@ -66,6 +67,9 @@ private PositionsAppender createPrimitiveAppender(Type type, int expectedPositio
         else if (type instanceof VariableWidthType) {
             return new SlicePositionsAppender(expectedPositions, maxPageSizeInBytes);
         }
+        else if (type instanceof RowType) {
+            return RowPositionsAppender.createRowAppender(this, (RowType) type, expectedPositions, maxPageSizeInBytes);
+        }
 
         return new TypedPositionsAppender(type, expectedPositions);
     }
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java
new file mode 100644
index 000000000000..1250e385a0d5
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.operator.output;
+
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RowBlock;
+import io.trino.spi.block.RunLengthEncodedBlock;
+import io.trino.spi.type.RowType;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static io.airlift.slice.SizeOf.sizeOf;
+import static io.trino.operator.output.PositionsAppenderUtil.calculateBlockResetSize;
+import static io.trino.operator.output.PositionsAppenderUtil.calculateNewArraySize;
+import static io.trino.spi.block.RowBlock.fromFieldBlocks;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link PositionsAppender} for {@link RowType} values. Row-level nulls are kept in
+ * {@code rowIsNull}; field values of non-null rows go to one nested appender per field.
+ */
+public class RowPositionsAppender
+        implements PositionsAppender
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(RowPositionsAppender.class).instanceSize();
+    private final PositionsAppender[] fieldAppenders;
+    private int initialEntryCount;
+    private boolean initialized;
+
+    private int positionCount;
+    private boolean hasNullRow;
+    private boolean hasNonNullRow;
+    private boolean[] rowIsNull = new boolean[0];
+    private long retainedSizeInBytes;
+    private long sizeInBytes;
+
+    /**
+     * Creates a row appender with one nested appender per field of {@code type},
+     * each obtained from {@code positionsAppenderFactory}.
+     */
+    public static RowPositionsAppender createRowAppender(
+            PositionsAppenderFactory positionsAppenderFactory,
+            RowType type,
+            int expectedPositions,
+            long maxPageSizeInBytes)
+    {
+        PositionsAppender[] fields = new PositionsAppender[type.getFields().size()];
+        for (int i = 0; i < fields.length; i++) {
+            fields[i] = positionsAppenderFactory.create(type.getFields().get(i).getType(), expectedPositions, maxPageSizeInBytes);
+        }
+        return new RowPositionsAppender(fields, expectedPositions);
+    }
+
+    private RowPositionsAppender(PositionsAppender[] fieldAppenders, int expectedPositions)
+    {
+        this.fieldAppenders = requireNonNull(fieldAppenders, "fieldAppenders is null");
+        this.initialEntryCount = expectedPositions;
+        updateRetainedSize();
+    }
+
+    /**
+     * Appends the rows of {@code block} found at {@code positions}. Only non-null rows
+     * are forwarded to the field appenders, addressed by their field block offsets.
+     */
+    @Override
+    public void append(IntArrayList positions, Block block)
+    {
+        if (positions.isEmpty()) {
+            return;
+        }
+        ensureCapacity(positions.size());
+        RowBlock sourceRowBlock = (RowBlock) block;
+        IntArrayList nonNullPositions;
+        if (sourceRowBlock.mayHaveNull()) {
+            nonNullPositions = processNullablePositions(positions, sourceRowBlock);
+            hasNullRow |= nonNullPositions.size() < positions.size();
+            hasNonNullRow |= nonNullPositions.size() > 0;
+        }
+        else {
+            // the source Block does not have nulls
+            nonNullPositions = processNonNullablePositions(positions, sourceRowBlock);
+            hasNonNullRow = true;
+        }
+
+        List<Block> fieldBlocks = sourceRowBlock.getChildren();
+        for (int i = 0; i < fieldAppenders.length; i++) {
+            fieldAppenders[i].append(nonNullPositions, fieldBlocks.get(i));
+        }
+
+        positionCount += positions.size();
+        updateSize();
+    }
+
+    /**
+     * Appends the single row held by {@code rleBlock} once per RLE position. An empty
+     * run is ignored so that it cannot flip {@code hasNullRow}/{@code hasNonNullRow}.
+     */
+    @Override
+    public void appendRle(RunLengthEncodedBlock rleBlock)
+    {
+        int rlePositionCount = rleBlock.getPositionCount();
+        if (rlePositionCount == 0) {
+            return;
+        }
+        ensureCapacity(rlePositionCount);
+        RowBlock sourceRowBlock = (RowBlock) rleBlock.getValue();
+        if (sourceRowBlock.isNull(0)) {
+            // append rlePositionCount nulls
+            Arrays.fill(rowIsNull, positionCount, positionCount + rlePositionCount, true);
+            hasNullRow = true;
+        }
+        else {
+            // append not null row value
+            List<Block> fieldBlocks = sourceRowBlock.getChildren();
+            int fieldPosition = sourceRowBlock.getFieldBlockOffset(0);
+            for (int i = 0; i < fieldAppenders.length; i++) {
+                fieldAppenders[i].appendRle(new RunLengthEncodedBlock(fieldBlocks.get(i).getSingleValueBlock(fieldPosition), rlePositionCount));
+            }
+            hasNonNullRow = true;
+        }
+        positionCount += rlePositionCount;
+        updateSize();
+    }
+
+    /**
+     * Builds the appended rows into a block and resets this appender. When no non-null
+     * row was appended, the result is an RLE block of a single null row.
+     */
+    @Override
+    public Block build()
+    {
+        Block[] fieldBlocks = new Block[fieldAppenders.length];
+        for (int i = 0; i < fieldAppenders.length; i++) {
+            fieldBlocks[i] = fieldAppenders[i].build();
+        }
+        Block result;
+        if (hasNonNullRow) {
+            result = fromFieldBlocks(positionCount, hasNullRow ? Optional.of(rowIsNull) : Optional.empty(), fieldBlocks);
+        }
+        else {
+            Block nullRowBlock = fromFieldBlocks(1, Optional.of(new boolean[] {true}), fieldBlocks);
+            result = new RunLengthEncodedBlock(nullRowBlock, positionCount);
+        }
+
+        reset();
+        return result;
+    }
+
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        long size = retainedSizeInBytes;
+        for (PositionsAppender field : fieldAppenders) {
+            size += field.getRetainedSizeInBytes();
+        }
+        return size;
+    }
+
+    @Override
+    public long getSizeInBytes()
+    {
+        return sizeInBytes;
+    }
+
+    private void reset()
+    {
+        initialEntryCount = calculateBlockResetSize(positionCount);
+        initialized = false;
+        rowIsNull = new boolean[0];
+        positionCount = 0;
+        sizeInBytes = 0;
+        hasNonNullRow = false;
+        hasNullRow = false;
+        updateRetainedSize();
+    }
+
+    // records the null flag of every position and returns field block offsets of the non-null rows
+    private IntArrayList processNullablePositions(IntArrayList positions, RowBlock sourceRowBlock)
+    {
+        int[] nonNullPositions = new int[positions.size()];
+        int nonNullPositionsCount = 0;
+
+        for (int i = 0; i < positions.size(); i++) {
+            int position = positions.getInt(i);
+            boolean positionIsNull = sourceRowBlock.isNull(position);
+            // always write the offset; a null row does not advance the count, so its slot is overwritten or left unused
+            nonNullPositions[nonNullPositionsCount] = sourceRowBlock.getFieldBlockOffset(position);
+            nonNullPositionsCount += positionIsNull ? 0 : 1;
+            rowIsNull[positionCount + i] = positionIsNull;
+        }
+
+        return IntArrayList.wrap(nonNullPositions, nonNullPositionsCount);
+    }
+
+    // translates row positions into field block offsets; no null handling needed
+    private IntArrayList processNonNullablePositions(IntArrayList positions, RowBlock sourceRowBlock)
+    {
+        int[] nonNullPositions = new int[positions.size()];
+        for (int i = 0; i < positions.size(); i++) {
+            nonNullPositions[i] = sourceRowBlock.getFieldBlockOffset(positions.getInt(i));
+        }
+        return IntArrayList.wrap(nonNullPositions);
+    }
+
+    // grows rowIsNull so that additionalCapacity more positions fit
+    private void ensureCapacity(int additionalCapacity)
+    {
+        if (rowIsNull.length <= positionCount + additionalCapacity) {
+            int newSize;
+            if (initialized) {
+                newSize = calculateNewArraySize(rowIsNull.length);
+            }
+            else {
+                newSize = initialEntryCount;
+                initialized = true;
+            }
+
+            int newCapacity = Math.max(newSize, positionCount + additionalCapacity);
+            rowIsNull = Arrays.copyOf(rowIsNull, newCapacity);
+            updateRetainedSize();
+        }
+    }
+
+    private void updateSize()
+    {
+        long size = (Integer.BYTES + Byte.BYTES) * (long) positionCount;
+        for (PositionsAppender field : fieldAppenders) {
+            size += field.getSizeInBytes();
+        }
+        sizeInBytes = size;
+    }
+
+    private void updateRetainedSize()
+    {
+        retainedSizeInBytes = INSTANCE_SIZE + sizeOf(rowIsNull);
+    }
+}
diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java
index e79520d3d193..7e0f6bba624e 100644
--- a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java
+++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java
@@ -20,10 +20,12 @@
 import io.trino.spi.block.BlockBuilderStatus;
 import io.trino.spi.block.DictionaryBlock;
 import io.trino.spi.block.PageBuilderStatus;
+import io.trino.spi.block.RowBlock;
 import io.trino.spi.block.RunLengthEncodedBlock;
 import io.trino.spi.type.ArrayType;
 import io.trino.spi.type.Decimals;
 import io.trino.spi.type.LongTimestamp;
+import io.trino.spi.type.RowType;
 import io.trino.spi.type.Type;
 import io.trino.type.BlockTypeOperators;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
@@ -31,6 +33,7 @@
 import org.testng.annotations.Test;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.stream.IntStream;
 
 import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
@@ -57,6 +60,7 @@
 import static io.trino.spi.type.DecimalType.createDecimalType;
 import static io.trino.spi.type.DoubleType.DOUBLE;
 import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RowType.anonymousRow;
 import static io.trino.spi.type.SmallintType.SMALLINT;
 import static io.trino.spi.type.TimestampType.createTimestampType;
 import static io.trino.spi.type.TinyintType.TINYINT;
@@ -91,7 +95,10 @@ public void testMixedBlockTypes(Type type)
                 input(rleBlock(dictionaryBlock(rleBlock(type, 4), 1), 3), 1), // rle -> dict -> rle
                 input(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 2), // dict -> dict
                 input(dictionaryBlock(dictionaryBlock(dictionaryBlock(type, 5, 4, 0.5F), 3), 3), 2), // dict -> dict -> dict
-                input(dictionaryBlock(rleBlock(type, 4), 3), 0, 2)); // dict -> rle
+                input(dictionaryBlock(rleBlock(type, 4), 3), 0, 2), // dict -> rle
+                input(notNullBlock(type, 4).getRegion(2, 2), 0, 1), // not null block with offset
+                input(partiallyNullBlock(type, 4).getRegion(2, 2), 0, 1), // nullable block with offset
+                input(rleBlock(notNullBlock(type, 4).getRegion(2, 1), 3), 1)); // rle block with offset
 
         testAppend(type, input);
     }
@@ -101,6 +108,7 @@ public void testNullRle(Type type)
     {
         testNullRle(type, nullBlock(type, 2));
         testNullRle(type, nullRleBlock(type, 2));
+        testNullRle(type, createRandomBlockForType(type, 4, 0.5f));
     }
 
     @Test(dataProvider = "types")
@@ -113,7 +121,7 @@ public void testRleSwitchToFlat(Type type)
 
         List dictionaryInputs = ImmutableList.of(
                 input(rleBlock(type, 3), 0, 1),
-                input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1));
+                input(dictionaryBlock(type, 2, 4, 0), 0, 1));
         testAppend(type, dictionaryInputs);
     }
 
@@ -126,7 +134,7 @@ public void testFlatAppendRle(Type type)
         testAppend(type, inputs);
 
         List dictionaryInputs = ImmutableList.of(
-                input(dictionaryBlock(type, 2, 4, 0.5F), 0, 1),
+                input(dictionaryBlock(type, 2, 4, 0), 0, 1),
                 input(rleBlock(type, 3), 0, 1));
         testAppend(type, dictionaryInputs);
     }
@@ -233,6 +241,24 @@ public void testSliceRle()
         }
     }
 
+    @Test
+    public void testRowWithNestedFields()
+    {
+        RowType type = anonymousRow(BIGINT, BIGINT, VARCHAR);
+        Block rowBlock = RowBlock.fromFieldBlocks(2, Optional.empty(), new Block[] {
+                notNullBlock(BIGINT, 2),
+                dictionaryBlock(BIGINT, 2, 2, 0.5F),
+                rleBlock(VARCHAR, 2)
+        });
+
+        PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
+
+        positionsAppender.append(allPositions(2), rowBlock);
+        Block actual = positionsAppender.build();
+
+        assertBlockEquals(type, actual, rowBlock);
+    }
+
     @DataProvider(name = "nullRleTypes")
     public static Object[][] nullRleTypes()
     {
@@ -248,7 +274,8 @@ public static Object[][] nullRleTypes()
                 {TINYINT},
                 {VARBINARY},
                 {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)},
-                {createTimestampType(9)}
+                {createTimestampType(9)},
+                {anonymousRow(BIGINT, VARCHAR)}
         };
     }
 
@@ -268,7 +295,8 @@ public static Object[][] types()
                 {VARBINARY},
                 {createDecimalType(Decimals.MAX_SHORT_PRECISION + 1)},
                 {new ArrayType(BIGINT)},
-                {createTimestampType(9)}
+                {createTimestampType(9)},
+                {anonymousRow(BIGINT, VARCHAR)}
         };
     }
 
@@ -354,13 +382,19 @@ private Block emptyBlock(Type type)
     private void testNullRle(Type type, Block source)
     {
         PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type, 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES);
-
+        // extract null positions
+        IntArrayList positions = new IntArrayList(source.getPositionCount());
+        for (int i = 0; i < source.getPositionCount(); i++) {
+            if (source.isNull(i)) {
+                positions.add(i);
+            }
+        }
         // append twice to trigger RleAwarePositionsAppender.equalOperator call
-        positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source);
-        positionsAppender.append(new IntArrayList(IntStream.range(0, source.getPositionCount()).toArray()), source);
+        positionsAppender.append(positions, source);
+        positionsAppender.append(positions, source);
         Block actual = positionsAppender.build();
         assertTrue(actual.isNull(0));
-        assertEquals(actual.getPositionCount(), source.getPositionCount() * 2);
+        assertEquals(actual.getPositionCount(), positions.size() * 2);
         assertInstanceOf(actual, RunLengthEncodedBlock.class);
     }
 
diff --git a/core/trino-spi/src/main/java/io/trino/spi/block/AbstractRowBlock.java b/core/trino-spi/src/main/java/io/trino/spi/block/AbstractRowBlock.java
index c0cd6211b291..fef6da38249b 100644
--- a/core/trino-spi/src/main/java/io/trino/spi/block/AbstractRowBlock.java
+++ b/core/trino-spi/src/main/java/io/trino/spi/block/AbstractRowBlock.java
@@ -51,7 +51,7 @@ public final List getChildren()
     protected abstract boolean[] getRowIsNull();
 
     // the offset in each field block, it can also be viewed as the "entry-based" offset in the RowBlock
-    protected final int getFieldBlockOffset(int position)
+    public final int getFieldBlockOffset(int position)
     {
         int[] offsets = getFieldBlockOffsets();
         return offsets != null ? offsets[position + getOffsetBase()] : position + getOffsetBase();