diff --git a/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java index b3eb82d8a23b..83fc9459e205 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/BytePositionsAppender.java @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + values[positionCount] = source.getByte(sourcePosition, 0); + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java index 4a7d875bd68d..a35441f45e25 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/Int128PositionsAppender.java @@ -129,6 +129,25 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + int positionIndex = positionCount * 2; + values[positionIndex] = source.getLong(sourcePosition, 0); + values[positionIndex + 1] = source.getLong(sourcePosition, SIZE_OF_LONG); + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java index aeb469b20e50..9681503a2285 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/Int96PositionsAppender.java @@ -124,6 +124,25 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + high[positionCount] = source.getLong(sourcePosition, 0); + low[positionCount] = source.getInt(sourcePosition, SIZE_OF_LONG); + + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java index 9bb986a0f3dc..4882545ff8a6 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/IntPositionsAppender.java @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + values[positionCount] = source.getInt(sourcePosition, 0); + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java index 0d2054682ce3..c38f09f92adf 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/LongPositionsAppender.java @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + values[positionCount] = source.getLong(sourcePosition, 0); + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java index 5cac0752a858..1e44d90c1081 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java @@ -25,7 +25,6 @@ import io.trino.operator.OperatorContext; import io.trino.operator.PartitionFunction; import io.trino.spi.Page; -import io.trino.spi.PageBuilder; import io.trino.spi.block.Block; import io.trino.spi.block.DictionaryBlock; import io.trino.spi.block.RunLengthEncodedBlock; @@ -62,14 +61,12 @@ public class PagePartitioner { private static final int COLUMNAR_STRATEGY_COEFFICIENT = 2; private final OutputBuffer outputBuffer; - private final Type[] sourceTypes; private final PartitionFunction partitionFunction; private final int[] partitionChannels; private final LocalMemoryContext memoryContext; @Nullable private final Block[] partitionConstantBlocks; // when null, no constants are present. Only non-null elements are constants private final PageSerializer serializer; - private final PageBuilder[] pageBuilders; private final PositionsAppenderPageBuilder[] positionsAppenders; private final boolean replicatesAnyRow; private final int nullChannel; // when >= 0, send the position to every partition if this channel is null @@ -107,7 +104,6 @@ public PagePartitioner( this.replicatesAnyRow = replicatesAnyRow; this.nullChannel = nullChannel.orElse(-1); this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); - this.sourceTypes = sourceTypes.toArray(new Type[0]); this.serializer = serdeFactory.createSerializer(exchangeEncryptionKey.map(Ciphers::deserializeAesEncryptionKey)); // Ensure partition channels align with constant arguments provided @@ -124,11 +120,7 @@ public PagePartitioner( this.positionsAppenders = new PositionsAppenderPageBuilder[partitionCount]; for (int i = 0; i < partitionCount; i++) { - positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, sourceTypes, positionsAppenderFactory); - } - this.pageBuilders = new PageBuilder[partitionCount]; - for (int i = 0; i < partitionCount; i++) { - pageBuilders[i] = PageBuilder.withMaxPageSize(pageSize, sourceTypes); + positionsAppenders[i] = PositionsAppenderPageBuilder.withMaxPageSize(pageSize, requireNonNull(sourceTypes, "sourceTypes is null"), positionsAppenderFactory); } this.memoryContext = aggregatedMemoryContext.newLocalMemoryContext(PagePartitioner.class.getSimpleName()); this.partitionsInitialRetainedSize = getRetainedSizeInBytes(); @@ -183,8 +175,8 @@ public void partitionPageByRow(Page page) int position; // Handle "any row" replication outside of the inner loop processing if (replicatesAnyRow && !hasAnyRowBeenReplicated) { - for (PageBuilder pageBuilder : pageBuilders) { - appendRow(pageBuilder, page, 0); + for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { + pageBuilder.appendToOutputPartition(page, 0); } hasAnyRowBeenReplicated = true; position = 1; @@ -199,34 +191,24 @@ public void partitionPageByRow(Page page) Block nullsBlock = page.getBlock(nullChannel); for (; position < page.getPositionCount(); position++) { if (nullsBlock.isNull(position)) { - for (PageBuilder pageBuilder : pageBuilders) { - appendRow(pageBuilder, page, position); + for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { + pageBuilder.appendToOutputPartition(page, position); } } else { int partition = partitionFunction.getPartition(partitionFunctionArgs, position); - appendRow(pageBuilders[partition], page, position); + positionsAppenders[partition].appendToOutputPartition(page, position); } } } else { for (; position < page.getPositionCount(); position++) { int partition = partitionFunction.getPartition(partitionFunctionArgs, position); - appendRow(pageBuilders[partition], page, position); + positionsAppenders[partition].appendToOutputPartition(page, position); } } - flushPageBuilders(false); - } - - private void appendRow(PageBuilder pageBuilder, Page page, int position) - { - pageBuilder.declarePosition(); - - for (int channel = 0; channel < sourceTypes.length; channel++) { - Type type = sourceTypes[channel]; - type.appendTo(page.getBlock(channel), position, pageBuilder.getBlockBuilder(channel)); - } + flushPositionsAppenders(false); } public void partitionPageByColumn(Page page) @@ -451,30 +433,14 @@ public void close() { try { flushPositionsAppenders(true); - flushPageBuilders(true); } finally { // clear buffers before memory release Arrays.fill(positionsAppenders, null); - Arrays.fill(pageBuilders, null); memoryContext.close(); } } - private void flushPageBuilders(boolean force) - { - // add all full pages to output buffer - for (int partition = 0; partition < pageBuilders.length; partition++) { - PageBuilder partitionPageBuilder = pageBuilders[partition]; - if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) { - Page pagePartition = partitionPageBuilder.build(); - partitionPageBuilder.reset(); - - enqueuePage(pagePartition, partition); - } - } - } - private void flushPositionsAppenders(boolean force) { // add all full pages to output buffer @@ -511,9 +477,6 @@ private long getSizeInBytes() for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { sizeInBytes += pageBuilder.getSizeInBytes(); } - for (PageBuilder pageBuilder : pageBuilders) { - sizeInBytes += pageBuilder.getSizeInBytes(); - } return sizeInBytes; } @@ -526,9 +489,6 @@ private long getRetainedSizeInBytes() for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) { sizeInBytes += pageBuilder.getRetainedSizeInBytes(); } - for (PageBuilder pageBuilder : pageBuilders) { - sizeInBytes += pageBuilder.getRetainedSizeInBytes(); - } sizeInBytes += serializer.getRetainedSizeInBytes(); return sizeInBytes; } diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java index 1fda303a9df8..4b0a38b61a53 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppender.java @@ -27,6 +27,15 @@ public interface PositionsAppender */ void appendRle(Block value, int rlePositionCount); + /** + * Appends single position. The implementation must be conceptually equal to + * {@code append(IntArrayList.wrap(new int[] {position}), source)} but may be optimized. + * Caller should avoid using this method if {@link #append(IntArrayList, Block)} can be used + * as appending positions one by one can be significantly slower and may not support features + * like pushing RLE through the appender. + */ + void append(int position, Block source); + /** * Creates the block from the appender data. * After this, appender is reset to the initial state, and it is ready to build a new block. diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java index f90db9682c26..e19aaeb97401 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java @@ -61,6 +61,16 @@ public void appendToOutputPartition(Page page, IntArrayList positions) } } + public void appendToOutputPartition(Page page, int position) + { + declarePositions(1); + + for (int channel = 0; channel < channelAppenders.length; channel++) { + Block block = page.getBlock(channel); + channelAppenders[channel].append(position, block); + } + } + public long getRetainedSizeInBytes() { // We use a foreach loop instead of streams diff --git a/core/trino-main/src/main/java/io/trino/operator/output/RleAwarePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/RleAwarePositionsAppender.java index e2d171649b00..6b7092c56e1d 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/RleAwarePositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/RleAwarePositionsAppender.java @@ -90,6 +90,13 @@ else if (rleValue != null) { } } + @Override + public void append(int position, Block value) + { + switchToFlat(); + delegate.append(position, value); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java index 26b524359651..84711ad278c3 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/RowPositionsAppender.java @@ -138,6 +138,36 @@ else if (value.isNull(0)) { updateSize(); } + @Override + public void append(int position, Block value) + { + ensureCapacity(1); + if (value instanceof AbstractRowBlock sourceRowBlock) { + if (sourceRowBlock.isNull(position)) { + rowIsNull[positionCount] = true; + hasNullRow = true; + } + else { + // append not null row value + List fieldBlocks = sourceRowBlock.getChildren(); + int fieldPosition = sourceRowBlock.getFieldBlockOffset(position); + for (int i = 0; i < fieldAppenders.length; i++) { + fieldAppenders[i].append(fieldPosition, fieldBlocks.get(i)); + } + hasNonNullRow = true; + } + } + else if (value.isNull(position)) { + rowIsNull[positionCount] = true; + hasNullRow = true; + } + else { + throw new IllegalArgumentException("unsupported block type: " + value); + } + positionCount++; + updateSize(); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java index 61be9de04162..85dcc84e85fe 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/ShortPositionsAppender.java @@ -116,6 +116,23 @@ public void appendRle(Block block, int rlePositionCount) updateSize(rlePositionCount); } + @Override + public void append(int sourcePosition, Block source) + { + ensureCapacity(positionCount + 1); + if (source.isNull(sourcePosition)) { + valueIsNull[positionCount] = true; + hasNullValue = true; + } + else { + values[positionCount] = source.getShort(sourcePosition, 0); + hasNonNullValue = true; + } + positionCount++; + + updateSize(1); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java index b551fc025b60..61ec474a2fa7 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/SlicePositionsAppender.java @@ -140,6 +140,35 @@ public void appendRle(Block block, int rlePositionCount) } } + @Override + public void append(int position, Block source) + { + ensurePositionCapacity(positionCount + 1); + if (source.isNull(position)) { + valueIsNull[positionCount] = true; + offsets[positionCount + 1] = getCurrentOffset(); + positionCount++; + + hasNullValue = true; + updateSize(1, 0); + } + else { + hasNonNullValue = true; + int currentOffset = getCurrentOffset(); + int sliceLength = source.getSliceLength(position); + Slice slice = source.getSlice(position, 0, sliceLength); + + ensureExtraBytesCapacity(sliceLength); + + slice.getBytes(0, bytes, currentOffset, sliceLength); + + offsets[positionCount + 1] = currentOffset + sliceLength; + + positionCount++; + updateSize(1, sliceLength); + } + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java index 4f82359169a8..bcf2dc540e8a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/TypedPositionsAppender.java @@ -60,6 +60,12 @@ public void appendRle(Block block, int rlePositionCount) } } + @Override + public void append(int position, Block source) + { + type.appendTo(source, position, blockBuilder); + } + @Override public Block build() { diff --git a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java index 5929114c2225..cedb39566fb1 100644 --- a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java +++ b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java @@ -63,6 +63,20 @@ public void appendRle(Block block, int rlePositionCount) delegate.appendRle(block, rlePositionCount); } + @Override + public void append(int position, Block source) + { + if (source instanceof RunLengthEncodedBlock runLengthEncodedBlock) { + delegate.append(0, runLengthEncodedBlock.getValue()); + } + else if (source instanceof DictionaryBlock dictionaryBlock) { + delegate.append(dictionaryBlock.getId(position), dictionaryBlock.getDictionary()); + } + else { + delegate.append(position, source); + } + } + @Override public Block build() { diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java index 79529892573f..51d2cf82b62d 100644 --- a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java +++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppender.java @@ -82,6 +82,7 @@ import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static java.util.Objects.requireNonNull; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; public class TestPositionsAppender @@ -199,6 +200,21 @@ public void testMultipleRleWithTheSameValueProduceRle(TestType type) assertInstanceOf(actual, RunLengthEncodedBlock.class); } + @Test(dataProvider = "types") + public void testRleAppendedWithSinglePositionDoesNotProduceRle(TestType type) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + + Block value = notNullBlock(type, 1); + positionsAppender.append(allPositions(3), rleBlock(value, 3)); + positionsAppender.append(allPositions(2), rleBlock(value, 2)); + positionsAppender.append(0, rleBlock(value, 2)); + + Block actual = positionsAppender.build(); + assertEquals(actual.getPositionCount(), 6); + assertFalse(actual instanceof RunLengthEncodedBlock, actual.getClass().getSimpleName()); + } + @Test(dataProvider = "types") public void testConsecutiveBuilds(TestType type) { @@ -385,6 +401,12 @@ private void testNullRle(Type type, Block source) } private void testAppend(TestType type, List inputs) + { + testAppendBatch(type, inputs); + testAppendSingle(type, inputs); + } + + private void testAppendBatch(TestType type, List inputs) { PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); @@ -402,6 +424,24 @@ private void testAppend(TestType type, List inputs) assertEquals(secondBlock.getPositionCount(), 0); } + private void testAppendSingle(TestType type, List inputs) + { + PositionsAppender positionsAppender = POSITIONS_APPENDER_FACTORY.create(type.getType(), 10, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + long initialRetainedSize = positionsAppender.getRetainedSizeInBytes(); + + inputs.forEach(input -> input.getPositions().forEach((int position) -> positionsAppender.append(position, input.getBlock()))); + long sizeInBytes = positionsAppender.getSizeInBytes(); + assertGreaterThanOrEqual(positionsAppender.getRetainedSizeInBytes(), sizeInBytes); + Block actual = positionsAppender.build(); + + assertBlockIsValid(actual, sizeInBytes, type.getType(), inputs); + // verify positionsAppender reset + assertEquals(positionsAppender.getSizeInBytes(), 0); + assertEquals(positionsAppender.getRetainedSizeInBytes(), initialRetainedSize); + Block secondBlock = positionsAppender.build(); + assertEquals(secondBlock.getPositionCount(), 0); + } + private void assertBlockIsValid(Block actual, long sizeInBytes, Type type, List inputs) { PageBuilderStatus pageBuilderStatus = new PageBuilderStatus();