Reviewed revision of the PositionsAppenderPageBuilder page-size limiting patch.
Changes made during review: List<Type> generics restored in signatures, the default
direct-size limit is computed in long arithmetic and saturated instead of overflowing,
precondition messages say "must be positive" (the checks reject zero as well), and
inaccurate comments/messages in the new test were corrected.

diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
index 7b113d87d429..4ba6fd3361df 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
@@ -13,6 +13,8 @@
  */
 package io.trino.operator.output;
 
+import com.google.common.annotations.VisibleForTesting;
+import io.trino.operator.project.PageProcessor;
 import io.trino.spi.Page;
 import io.trino.spi.block.Block;
 import io.trino.spi.type.Type;
@@ -20,31 +22,50 @@
 
 import java.util.List;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static java.util.Objects.requireNonNull;
 
 public class PositionsAppenderPageBuilder
 {
     private static final int DEFAULT_INITIAL_EXPECTED_ENTRIES = 8;
+    @VisibleForTesting
+    static final int MAX_POSITION_COUNT = PageProcessor.MAX_BATCH_SIZE * 4;
+    // Multiplier applied to the maximum page size to derive the limit on the page's "direct" size, i.e. its size as if RLE channels were converted to direct
+    // (saturated at Integer.MAX_VALUE). Dictionary mode appenders still under-report because computing their direct-equivalent size is prohibitively expensive.
+    private static final int MAXIMUM_DIRECT_SIZE_MULTIPLIER = 8;
+
     private final UnnestingPositionsAppender[] channelAppenders;
     private final int maxPageSizeInBytes;
+    private final int maxDirectPageSizeInBytes;
     private int declaredPositions;
 
     public static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
     {
-        return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, sourceTypes, positionsAppenderFactory);
+        return withMaxPageSize(maxPageBytes, (int) Math.min((long) maxPageBytes * MAXIMUM_DIRECT_SIZE_MULTIPLIER, Integer.MAX_VALUE), sourceTypes, positionsAppenderFactory);
+    }
+
+    @VisibleForTesting
+    static PositionsAppenderPageBuilder withMaxPageSize(int maxPageBytes, int maxDirectSizeInBytes, List<Type> sourceTypes, PositionsAppenderFactory positionsAppenderFactory)
+    {
+        return new PositionsAppenderPageBuilder(DEFAULT_INITIAL_EXPECTED_ENTRIES, maxPageBytes, maxDirectSizeInBytes, sourceTypes, positionsAppenderFactory);
     }
 
     private PositionsAppenderPageBuilder(
             int initialExpectedEntries,
             int maxPageSizeInBytes,
+            int maxDirectPageSizeInBytes,
             List<Type> types,
             PositionsAppenderFactory positionsAppenderFactory)
     {
         requireNonNull(types, "types is null");
         requireNonNull(positionsAppenderFactory, "positionsAppenderFactory is null");
+        checkArgument(maxPageSizeInBytes > 0, "maxPageSizeInBytes must be positive: %s", maxPageSizeInBytes);
+        checkArgument(maxDirectPageSizeInBytes > 0, "maxDirectPageSizeInBytes must be positive: %s", maxDirectPageSizeInBytes);
+        checkArgument(maxDirectPageSizeInBytes >= maxPageSizeInBytes, "maxDirectPageSizeInBytes (%s) must be >= maxPageSizeInBytes (%s)", maxDirectPageSizeInBytes, maxPageSizeInBytes);
 
         this.maxPageSizeInBytes = maxPageSizeInBytes;
+        this.maxDirectPageSizeInBytes = maxDirectPageSizeInBytes;
         channelAppenders = new UnnestingPositionsAppender[types.size()];
         for (int i = 0; i < channelAppenders.length; i++) {
             channelAppenders[i] = positionsAppenderFactory.create(types.get(i), initialExpectedEntries, maxPageSizeInBytes);
@@ -98,7 +119,24 @@ private void declarePositions(int positions)
 
     public boolean isFull()
     {
-        return declaredPositions == Integer.MAX_VALUE || getSizeInBytes() >= maxPageSizeInBytes;
+        if (declaredPositions == 0) {
+            return false;
+        }
+        if (declaredPositions >= MAX_POSITION_COUNT) {
+            return true;
+        }
+        PositionsAppenderSizeAccumulator accumulator = computeAppenderSizes();
+        return accumulator.getSizeInBytes() >= maxPageSizeInBytes || accumulator.getDirectSizeInBytes() >= maxDirectPageSizeInBytes;
+    }
+
+    @VisibleForTesting
+    PositionsAppenderSizeAccumulator computeAppenderSizes()
+    {
+        PositionsAppenderSizeAccumulator accumulator = new PositionsAppenderSizeAccumulator();
+        for (UnnestingPositionsAppender positionsAppender : channelAppenders) {
+            positionsAppender.addSizesToAccumulator(accumulator);
+        }
+        return accumulator;
     }
 
     public boolean isEmpty()
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderSizeAccumulator.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderSizeAccumulator.java
new file mode 100644
index 000000000000..f0f43953e700
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderSizeAccumulator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.operator.output;
+
+final class PositionsAppenderSizeAccumulator
+{
+    private long sizeInBytes;
+    private long directSizeInBytes;
+
+    public long getSizeInBytes()
+    {
+        return sizeInBytes;
+    }
+
+    public long getDirectSizeInBytes()
+    {
+        return directSizeInBytes;
+    }
+
+    public void accumulate(long sizeInBytes, long directSizeInBytes)
+    {
+        this.sizeInBytes += sizeInBytes;
+        this.directSizeInBytes += directSizeInBytes;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "PositionsAppenderSizeAccumulator{sizeInBytes=" + sizeInBytes + " directSizeInBytes=" + directSizeInBytes + "}";
+    }
+}
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
index 63c1485fcc08..23d2c1147861 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
@@ -206,10 +206,19 @@ public long getRetainedSizeInBytes()
     public long getSizeInBytes()
     {
         return delegate.getSizeInBytes() +
-                // dictionary size is not included due to the expense of the calculation
+                // dictionary size is not included due to the expense of the calculation, but we can account for the ids size
+                (dictionaryIdsBuilder.size() * (long) Integer.BYTES) +
                 (rleValue != null ? rleValue.getSizeInBytes() : 0);
     }
 
+    void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
+    {
+        long sizeInBytes = getSizeInBytes();
+        // dictionary size is not included due to the expense of the calculation, so this will under-report for dictionaries
+        long directSizeInBytes = (rleValue == null) ? sizeInBytes : (rleValue.getSizeInBytes() * rlePositionCount);
+        accumulator.accumulate(sizeInBytes, directSizeInBytes);
+    }
+
     private static class DictionaryIdsBuilder
     {
         private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
new file mode 100644
index 000000000000..ca4cf5f5125f
--- /dev/null
+++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.operator.output;
+
+import io.airlift.slice.Slices;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.RunLengthEncodedBlock;
+import io.trino.type.BlockTypeOperators;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestPositionsAppenderPageBuilder
+{
+    @Test
+    public void testFullOnPositionCountLimit()
+    {
+        int maxPageBytes = 1024 * 1024;
+        int maxDirectSize = maxPageBytes * 10;
+        PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
+                maxPageBytes,
+                maxDirectSize,
+                List.of(VARCHAR),
+                new PositionsAppenderFactory(new BlockTypeOperators()));
+
+        Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10);
+        Page inputPage = new Page(rleBlock);
+
+        IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        // Append 32760 positions, just less than MAX_POSITION_COUNT
+        assertEquals(32768, PositionsAppenderPageBuilder.MAX_POSITION_COUNT, "expected MAX_POSITION_COUNT to be 32768");
+        for (int i = 0; i < 3276; i++) {
+            pageBuilder.appendToOutputPartition(inputPage, positions);
+        }
+        assertFalse(pageBuilder.isFull(), "pageBuilder should still not be full");
+        // Append 10 more positions, crossing the threshold on position count
+        pageBuilder.appendToOutputPartition(inputPage, positions);
+        assertTrue(pageBuilder.isFull(), "pageBuilder should be full");
+        PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
+        assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
+        assertTrue(sizeAccumulator.getDirectSizeInBytes() < maxDirectSize, "direct size should still be below threshold");
+        assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
+    }
+
+    @Test
+    public void testFullOnDirectSizeInBytes()
+    {
+        int maxPageBytes = 100;
+        int maxDirectSize = 1000;
+        PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
+                maxPageBytes,
+                maxDirectSize,
+                List.of(VARCHAR),
+                new PositionsAppenderFactory(new BlockTypeOperators()));
+
+        PositionsAppenderSizeAccumulator sizeAccumulator = pageBuilder.computeAppenderSizes();
+        assertEquals(0L, sizeAccumulator.getSizeInBytes());
+        assertEquals(0L, sizeAccumulator.getDirectSizeInBytes());
+        assertFalse(pageBuilder.isFull());
+
+        Block rleBlock = RunLengthEncodedBlock.create(VARCHAR, Slices.utf8Slice("test"), 10);
+        Page inputPage = new Page(rleBlock);
+
+        IntArrayList positions = IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
+        pageBuilder.appendToOutputPartition(inputPage, positions);
+        // 10 positions inserted, size in bytes is still the same since we're in RLE mode but direct size is 10x
+        sizeAccumulator = pageBuilder.computeAppenderSizes();
+        assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes());
+        assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
+        assertEquals(rleBlock.getSizeInBytes() * 10, sizeAccumulator.getDirectSizeInBytes());
+        assertFalse(pageBuilder.isFull());
+
+        // Keep inserting until the direct size limit is reached
+        while (pageBuilder.computeAppenderSizes().getDirectSizeInBytes() < maxDirectSize) {
+            pageBuilder.appendToOutputPartition(inputPage, positions);
+        }
+        // size in bytes is unchanged
+        sizeAccumulator = pageBuilder.computeAppenderSizes();
+        assertEquals(rleBlock.getSizeInBytes(), sizeAccumulator.getSizeInBytes(), "sizeInBytes must still report the RLE block size only");
+        assertEquals(sizeAccumulator.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder sizeInBytes must match sizeAccumulator value");
+        // builder reports full due to maximum direct size in bytes reached
+        assertTrue(pageBuilder.isFull());
+        Page result = pageBuilder.build();
+        assertEquals(120, result.getPositionCount(), "result positions should be below the 32768 maximum");
+        assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
+    }
+}