diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java
index 4a58724a729a..86fd777a9897 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PagePartitioner.java
@@ -69,6 +69,9 @@ public class PagePartitioner
private final int nullChannel; // when >= 0, send the position to every partition if this channel is null
private boolean hasAnyRowBeenReplicated;
+ // outputSizeInBytes that has already been reported to the operator stats during release and should be subtracted
+ // from future stats reporting to avoid double counting
+ private long outputSizeReportedBeforeRelease;
public PagePartitioner(
PartitionFunction partitionFunction,
@@ -135,7 +138,6 @@ public void partitionPage(Page page, OperatorContext operatorContext)
}
int outputPositionCount = replicatesAnyRow && !hasAnyRowBeenReplicated ? page.getPositionCount() + positionsAppenders.length - 1 : page.getPositionCount();
- long positionsAppendersSizeBefore = getPositionsAppendersSizeInBytes();
if (page.getPositionCount() < partitionFunction.partitionCount() * COLUMNAR_STRATEGY_COEFFICIENT) {
// Partition will have on average less than COLUMNAR_STRATEGY_COEFFICIENT rows.
// Doing it column-wise would degrade performance, so we fall back to row-wise approach.
@@ -146,11 +148,68 @@ public void partitionPage(Page page, OperatorContext operatorContext)
else {
partitionPageByColumn(page);
}
- long positionsAppendersSizeAfter = getPositionsAppendersSizeInBytes();
- flushPositionsAppenders(false);
+ long outputSizeInBytes = flushPositionsAppenders(false);
updateMemoryUsage();
+ operatorContext.recordOutput(outputSizeInBytes, outputPositionCount);
+ }
+
+ private long adjustFlushedOutputSizeWithEagerlyReportedBytes(long flushedOutputSize)
+ {
+ // Reduce the flushed output size by the previously eagerly reported amount to avoid double counting
+ if (outputSizeReportedBeforeRelease > 0) {
+ long adjustmentAmount = min(flushedOutputSize, outputSizeReportedBeforeRelease);
+ outputSizeReportedBeforeRelease -= adjustmentAmount;
+ flushedOutputSize -= adjustmentAmount;
+ }
+ return flushedOutputSize;
+ }
- operatorContext.recordOutput(positionsAppendersSizeAfter - positionsAppendersSizeBefore, outputPositionCount);
+ private long adjustEagerlyReportedBytesWithBufferedBytesOnRelease(long bufferedBytesOnRelease)
+ {
+ // adjust the amount to eagerly report as output by the amount already eagerly reported if the new value
+ // is larger, since this indicates that no data was flushed and only the delta between the two values should
+ // be reported eagerly
+ if (outputSizeReportedBeforeRelease > 0 && bufferedBytesOnRelease >= outputSizeReportedBeforeRelease) {
+ bufferedBytesOnRelease -= outputSizeReportedBeforeRelease;
+ outputSizeReportedBeforeRelease += bufferedBytesOnRelease;
+ }
+ return bufferedBytesOnRelease;
+ }
+
+ /**
+ * Prepares this {@link PagePartitioner} for release to the pool by checking for dictionary mode appenders and either flattening
+ * them into direct appenders or forcing their current pages to flush to preserve a valuable dictionary encoded representation. This
+ * is done before release because we know that after reuse, the appenders will not observe any more inputs using the same dictionary.
+ *
+ * When a {@link PagePartitioner} is released back to the {@link PagePartitionerPool} we don't know if it will ever be reused. If it is not
+ * reused, then we have no {@link OperatorContext} we can use to report the output size of the final flushed page, so instead we report the
+ * buffered bytes still in the partitioner after {@link PagePartitioner#prepareForRelease(OperatorContext)} as output bytes eagerly and record
+ * that amount in {@link #outputSizeReportedBeforeRelease}. If the {@link PagePartitioner} is reused after having reported buffered bytes eagerly,
+ * we then have to subtract that same amount from the subsequent output bytes to avoid double counting them.
+ */
+ public void prepareForRelease(OperatorContext operatorContext)
+ {
+ long bufferedSizeInBytes = 0;
+ long outputSizeInBytes = 0;
+ for (int partition = 0; partition < positionsAppenders.length; partition++) {
+ PositionsAppenderPageBuilder positionsAppender = positionsAppenders[partition];
+ Optional flushedPage = positionsAppender.flushOrFlattenBeforeRelease();
+ if (flushedPage.isPresent()) {
+ Page page = flushedPage.get();
+ outputSizeInBytes += page.getSizeInBytes();
+ enqueuePage(page, partition);
+ }
+ else {
+ // Dictionaries have now been flattened, so the new reported size is trustworthy to report
+ // eagerly
+ bufferedSizeInBytes += positionsAppender.getSizeInBytes();
+ }
+ }
+ updateMemoryUsage();
+ // Adjust flushed and buffered values against the previously eagerly reported sizes
+ outputSizeInBytes = adjustFlushedOutputSizeWithEagerlyReportedBytes(outputSizeInBytes);
+ bufferedSizeInBytes = adjustEagerlyReportedBytesWithBufferedBytesOnRelease(bufferedSizeInBytes);
+ operatorContext.recordOutput(outputSizeInBytes + bufferedSizeInBytes, 0 /* no new positions */);
}
public void partitionPageByRow(Page page)
@@ -210,15 +269,6 @@ public void partitionPageByColumn(Page page)
}
}
- private long getPositionsAppendersSizeInBytes()
- {
- long sizeInBytes = 0;
- for (PositionsAppenderPageBuilder pageBuilder : positionsAppenders) {
- sizeInBytes += pageBuilder.getSizeInBytes();
- }
- return sizeInBytes;
- }
-
private IntArrayList[] partitionPositions(Page page)
{
verify(page.getPositionCount() > 0, "position count is 0");
@@ -424,6 +474,7 @@ public void close()
{
try {
flushPositionsAppenders(true);
+ outputSizeReportedBeforeRelease = 0;
}
finally {
// clear buffers before memory release
@@ -432,16 +483,19 @@ public void close()
}
}
- private void flushPositionsAppenders(boolean force)
+ private long flushPositionsAppenders(boolean force)
{
+ long outputSizeInBytes = 0;
// add all full pages to output buffer
for (int partition = 0; partition < positionsAppenders.length; partition++) {
PositionsAppenderPageBuilder partitionPageBuilder = positionsAppenders[partition];
if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) {
Page pagePartition = partitionPageBuilder.build();
+ outputSizeInBytes += pagePartition.getSizeInBytes();
enqueuePage(pagePartition, partition);
}
}
+ return adjustFlushedOutputSizeWithEagerlyReportedBytes(outputSizeInBytes);
}
private void enqueuePage(Page pagePartition, int partition)
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java
index 0bc28fee8330..fd683e126352 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PartitionedOutputOperator.java
@@ -284,6 +284,7 @@ public OperatorContext getOperatorContext()
public void finish()
{
if (!finished) {
+ pagePartitioner.prepareForRelease(operatorContext);
pagePartitionerPool.release(pagePartitioner);
finished = true;
}
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
index 4ba6fd3361df..91948beec761 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/PositionsAppenderPageBuilder.java
@@ -21,6 +21,7 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;
import java.util.List;
+import java.util.Optional;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -144,6 +145,32 @@ public boolean isEmpty()
return declaredPositions == 0;
}
+ public Optional flushOrFlattenBeforeRelease()
+ {
+ if (declaredPositions == 0) {
+ return Optional.empty();
+ }
+
+ for (UnnestingPositionsAppender positionsAppender : channelAppenders) {
+ if (positionsAppender.shouldForceFlushBeforeRelease()) {
+ // dictionary encoding will be preserved, so force the current page to be flushed
+ return Optional.of(build());
+ }
+ }
+
+ // transition from dictionary to direct mode if necessary, since we won't be able to reuse the
+ // same dictionary from the new operator
+ for (UnnestingPositionsAppender positionsAppender : channelAppenders) {
+ positionsAppender.flattenPendingDictionary();
+ }
+
+ // flush the current page if forced or if the builder is now full as a result of transitioning dictionaries to direct mode
+ if (isFull()) {
+ return Optional.of(build());
+ }
+ return Optional.empty();
+ }
+
public Page build()
{
Block[] blocks = new Block[channelAppenders.length];
diff --git a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
index 23d2c1147861..258aeb54bd5e 100644
--- a/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
+++ b/core/trino-main/src/main/java/io/trino/operator/output/UnnestingPositionsAppender.java
@@ -20,6 +20,7 @@
import io.trino.type.BlockTypeOperators.BlockPositionIsDistinctFrom;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntArrays;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import jakarta.annotation.Nullable;
import java.util.Optional;
@@ -52,6 +53,7 @@ private enum State
private State state = State.UNINITIALIZED;
+ @Nullable
private ValueBlock dictionary;
private DictionaryIdsBuilder dictionaryIdsBuilder;
@@ -219,6 +221,28 @@ void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
accumulator.accumulate(sizeInBytes, directSizeInBytes);
}
    /**
     * Converts a pending dictionary-mode appender to direct (flat) mode via {@code transitionToDirect()}.
     * Intended to be called before this appender's owner is released for reuse, since — per the caller's
     * usage — the same dictionary will not be observed again after reuse, making the buffered
     * dictionary ids useless for future appends.
     */
    public void flattenPendingDictionary()
    {
        // Only applies while buffering ids against a non-null dictionary; other states have no
        // pending dictionary to flatten, so this is a no-op for them
        if (state == State.DICTIONARY && dictionary != null) {
            transitionToDirect();
        }
    }
+
+ public boolean shouldForceFlushBeforeRelease()
+ {
+ if (state == State.DICTIONARY && dictionary != null) {
+ IntOpenHashSet uniqueIdsSet = new IntOpenHashSet();
+ int[] dictionaryIds = dictionaryIdsBuilder.getDictionaryIds();
+ for (int i = 0; i < dictionaryIdsBuilder.size(); i++) {
+ // At least one position is referenced multiple times, preserve the dictionary encoding and force the current page to flush
+ if (!uniqueIdsSet.add(dictionaryIds[i])) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
private static class DictionaryIdsBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java
index a8adb0ddbab2..9dd54666e942 100644
--- a/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java
+++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPartitionedOutputOperator.java
@@ -68,7 +68,10 @@ public void testOperatorContextStats()
partitionedOutputOperator.addInput(page);
OperatorContext operatorContext = partitionedOutputOperator.getOperatorContext();
- assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());
+ assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(0);
assertThat(operatorContext.getOutputPositions().getTotalCount()).isEqualTo(page.getPositionCount());
+
+ partitionedOutputOperator.finish();
+ assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());
}
}
diff --git a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
index ca4cf5f5125f..542cea8944be 100644
--- a/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
+++ b/core/trino-main/src/test/java/io/trino/operator/output/TestPositionsAppenderPageBuilder.java
@@ -16,14 +16,20 @@
import io.airlift.slice.Slices;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
+import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
+import io.trino.spi.block.ValueBlock;
+import io.trino.spi.predicate.Utils;
import io.trino.type.BlockTypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Optional;
+import static io.trino.block.BlockAssertions.createRandomBlockForType;
import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.lang.Math.toIntExact;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -102,4 +108,59 @@ public void testFullOnDirectSizeInBytes()
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum");
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
}
+
+ @Test
+ public void testFlushUsefulDictionariesOnRelease()
+ {
+ int maxPageBytes = 100;
+ int maxDirectSize = 1000;
+ PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
+ maxPageBytes,
+ maxDirectSize,
+ List.of(VARCHAR),
+ new PositionsAppenderFactory(new BlockTypeOperators()));
+
+ Block valueBlock = Utils.nativeValueToBlock(VARCHAR, Slices.utf8Slice("test"));
+ Block dictionaryBlock = DictionaryBlock.create(10, valueBlock, new int[10]);
+ Page inputPage = new Page(dictionaryBlock);
+
+ pageBuilder.appendToOutputPartition(inputPage, IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
+ // Dictionary mode appender should report the size of the ID's, but doesn't currently track
+ // the per-position size at all because it would be inefficient
+ assertEquals(Integer.BYTES * 10, pageBuilder.getSizeInBytes());
+ assertFalse(pageBuilder.isFull());
+
+ Optional flushedPage = pageBuilder.flushOrFlattenBeforeRelease();
+ assertTrue(flushedPage.isPresent(), "pageBuilder should force flush the dictionary");
+ assertTrue(flushedPage.get().getBlock(0) instanceof DictionaryBlock, "result should be dictionary encoded");
+ }
+
    @Test
    public void testFlattenUnhelpfulDictionariesOnRelease()
    {
        // Create unhelpful dictionary wrapping: every id is used exactly once, so the dictionary
        // encoding saves no space over the flat representation
        Block valueBlock = createRandomBlockForType(VARCHAR, 10, 0.25f);
        Block dictionaryBlock = DictionaryBlock.create(10, valueBlock, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
        Page inputPage = new Page(dictionaryBlock);

        // Ensure the builder allows the entire value block to be inserted without being full
        int maxPageBytes = toIntExact(valueBlock.getSizeInBytes() * 10);
        int maxDirectSize = maxPageBytes * 10;
        PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
                maxPageBytes,
                maxDirectSize,
                List.of(VARCHAR),
                new PositionsAppenderFactory(new BlockTypeOperators()));

        pageBuilder.appendToOutputPartition(inputPage, IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
        // While in dictionary mode, only the size of the buffered ids is reported
        assertEquals(Integer.BYTES * 10, pageBuilder.getSizeInBytes());
        assertFalse(pageBuilder.isFull());

        // No flush is forced; instead the dictionary should be flattened to direct mode
        assertEquals(Optional.empty(), pageBuilder.flushOrFlattenBeforeRelease(), "pageBuilder should not force a flush");
        assertFalse(pageBuilder.isFull());
        assertEquals(valueBlock.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder should have transitioned to direct mode");

        Page result = pageBuilder.build();
        assertTrue(result.getBlock(0) instanceof ValueBlock, "result should not be a dictionary block");
    }
}