Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public class PagePartitioner
private final int nullChannel; // when >= 0, send the position to every partition if this channel is null

private boolean hasAnyRowBeenReplicated;
// Output size in bytes that was already reported to the operator stats when this partitioner was
// released to the pool; it must be subtracted from subsequent stats reporting to avoid double counting
private long outputSizeReportedBeforeRelease;

public PagePartitioner(
PartitionFunction partitionFunction,
Expand Down Expand Up @@ -135,7 +138,6 @@ public void partitionPage(Page page, OperatorContext operatorContext)
}

int outputPositionCount = replicatesAnyRow && !hasAnyRowBeenReplicated ? page.getPositionCount() + positionsAppenders.length - 1 : page.getPositionCount();
long positionsAppendersSizeBefore = getPositionsAppendersSizeInBytes();
if (page.getPositionCount() < partitionFunction.partitionCount() * COLUMNAR_STRATEGY_COEFFICIENT) {
// Partition will have on average less than COLUMNAR_STRATEGY_COEFFICIENT rows.
// Doing it column-wise would degrade performance, so we fall back to row-wise approach.
Expand All @@ -146,11 +148,68 @@ public void partitionPage(Page page, OperatorContext operatorContext)
else {
partitionPageByColumn(page);
}
long positionsAppendersSizeAfter = getPositionsAppendersSizeInBytes();
flushPositionsAppenders(false);
long outputSizeInBytes = flushPositionsAppenders(false);
updateMemoryUsage();
operatorContext.recordOutput(outputSizeInBytes, outputPositionCount);
}

/**
 * Offsets a freshly flushed output size against bytes that were already reported eagerly
 * during a previous release, so the same bytes are never counted twice in operator stats.
 *
 * @param flushedOutputSize size in bytes of pages flushed in the current operation
 * @return the portion of {@code flushedOutputSize} not yet reported to the operator stats
 */
private long adjustFlushedOutputSizeWithEagerlyReportedBytes(long flushedOutputSize)
{
    if (outputSizeReportedBeforeRelease > 0) {
        // Consume as much of the eagerly-reported balance as this flush covers
        long alreadyReported = min(outputSizeReportedBeforeRelease, flushedOutputSize);
        outputSizeReportedBeforeRelease -= alreadyReported;
        return flushedOutputSize - alreadyReported;
    }
    return flushedOutputSize;
}

operatorContext.recordOutput(positionsAppendersSizeAfter - positionsAppendersSizeBefore, outputPositionCount);
/**
 * Determines how many buffered bytes should be eagerly reported as output on release,
 * given what was already reported on an earlier release. When the new buffered amount is
 * at least as large as the outstanding eagerly-reported balance, nothing was flushed in
 * between, so only the growth (delta) is reported and the balance is advanced by it.
 *
 * @param bufferedBytesOnRelease bytes still buffered in the partitioner at release time
 * @return the number of bytes to eagerly record as output now
 */
private long adjustEagerlyReportedBytesWithBufferedBytesOnRelease(long bufferedBytesOnRelease)
{
    if (outputSizeReportedBeforeRelease > 0 && bufferedBytesOnRelease >= outputSizeReportedBeforeRelease) {
        long newlyBufferedDelta = bufferedBytesOnRelease - outputSizeReportedBeforeRelease;
        outputSizeReportedBeforeRelease += newlyBufferedDelta;
        return newlyBufferedDelta;
    }
    return bufferedBytesOnRelease;
}

/**
 * Prepares this {@link PagePartitioner} for release to the pool by checking for dictionary mode appenders and either flattening
 * them into direct appenders or forcing their current pages to flush to preserve a valuable dictionary encoded representation. This
 * is done before release because we know that after reuse, the appenders will not observe any more inputs using the same dictionary.
 * <p>
 * When a {@link PagePartitioner} is released back to the {@link PagePartitionerPool} we don't know if it will ever be reused. If it is not
 * reused, then we have no {@link OperatorContext} we can use to report the output size of the final flushed page, so instead we report the
 * buffered bytes still in the partitioner after {@link PagePartitioner#prepareForRelease(OperatorContext)} as output bytes eagerly and record
 * that amount in {@link #outputSizeReportedBeforeRelease}. If the {@link PagePartitioner} is reused after having reported buffered bytes eagerly,
 * we then have to subtract that same amount from the subsequent output bytes to avoid double counting them.
 *
 * @param operatorContext context used to record the eagerly-reported output bytes (with zero positions)
 */
public void prepareForRelease(OperatorContext operatorContext)
{
    long bufferedSizeInBytes = 0;
    long outputSizeInBytes = 0;
    for (int partition = 0; partition < positionsAppenders.length; partition++) {
        PositionsAppenderPageBuilder positionsAppender = positionsAppenders[partition];
        Optional<Page> flushedPage = positionsAppender.flushOrFlattenBeforeRelease();
        if (flushedPage.isPresent()) {
            // A worthwhile dictionary encoding was preserved by flushing: count it as real flushed output
            Page page = flushedPage.get();
            outputSizeInBytes += page.getSizeInBytes();
            enqueuePage(page, partition);
        }
        else {
            // Dictionaries have now been flattened, so the new reported size is trustworthy to report
            // eagerly
            bufferedSizeInBytes += positionsAppender.getSizeInBytes();
        }
    }
    updateMemoryUsage();
    // Adjust flushed and buffered values against the previously eagerly reported sizes.
    // NOTE: order matters — both helpers read and mutate outputSizeReportedBeforeRelease,
    // flushed bytes must be settled against the outstanding balance before buffered bytes are
    outputSizeInBytes = adjustFlushedOutputSizeWithEagerlyReportedBytes(outputSizeInBytes);
    bufferedSizeInBytes = adjustEagerlyReportedBytesWithBufferedBytesOnRelease(bufferedSizeInBytes);
    operatorContext.recordOutput(outputSizeInBytes + bufferedSizeInBytes, 0 /* no new positions */);
}

public void partitionPageByRow(Page page)
Expand Down Expand Up @@ -210,15 +269,6 @@ public void partitionPageByColumn(Page page)
}
}

/**
 * Returns the total retained size in bytes buffered across all partition appenders.
 */
private long getPositionsAppendersSizeInBytes()
{
    long totalSizeInBytes = 0;
    for (PositionsAppenderPageBuilder appender : positionsAppenders) {
        totalSizeInBytes += appender.getSizeInBytes();
    }
    return totalSizeInBytes;
}

private IntArrayList[] partitionPositions(Page page)
{
verify(page.getPositionCount() > 0, "position count is 0");
Expand Down Expand Up @@ -424,6 +474,7 @@ public void close()
{
try {
flushPositionsAppenders(true);
outputSizeReportedBeforeRelease = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen if this is not zeroed?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably nothing, this is just precautionary to meet expected semantics for what close() should do.

}
finally {
// clear buffers before memory release
Expand All @@ -432,16 +483,19 @@ public void close()
}
}

private void flushPositionsAppenders(boolean force)
private long flushPositionsAppenders(boolean force)
{
long outputSizeInBytes = 0;
// add all full pages to output buffer
for (int partition = 0; partition < positionsAppenders.length; partition++) {
PositionsAppenderPageBuilder partitionPageBuilder = positionsAppenders[partition];
if (!partitionPageBuilder.isEmpty() && (force || partitionPageBuilder.isFull())) {
Page pagePartition = partitionPageBuilder.build();
outputSizeInBytes += pagePartition.getSizeInBytes();
enqueuePage(pagePartition, partition);
}
}
return adjustFlushedOutputSizeWithEagerlyReportedBytes(outputSizeInBytes);
}

private void enqueuePage(Page pagePartition, int partition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public OperatorContext getOperatorContext()
public void finish()
{
if (!finished) {
pagePartitioner.prepareForRelease(operatorContext);
pagePartitionerPool.release(pagePartitioner);
finished = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import it.unimi.dsi.fastutil.ints.IntArrayList;

import java.util.List;
import java.util.Optional;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -144,6 +145,32 @@ public boolean isEmpty()
return declaredPositions == 0;
}

/**
 * Called before releasing this builder to the pool. If any channel's dictionary encoding is
 * still worth keeping, the current page is flushed so that encoding is preserved; otherwise
 * pending dictionaries are flattened to direct mode (a reused builder won't see the same
 * dictionary again), flushing only if flattening made the builder full.
 *
 * @return the flushed page, or empty when nothing needed to be flushed
 */
public Optional<Page> flushOrFlattenBeforeRelease()
{
    // Nothing buffered, nothing to flush or flatten
    if (declaredPositions == 0) {
        return Optional.empty();
    }

    // Any channel that still benefits from dictionary encoding forces an immediate flush so
    // the dictionary-encoded representation is preserved
    for (UnnestingPositionsAppender appender : channelAppenders) {
        if (appender.shouldForceFlushBeforeRelease()) {
            return Optional.of(build());
        }
    }

    // Otherwise abandon pending dictionaries, since the next operator to reuse this builder
    // cannot share them
    for (UnnestingPositionsAppender appender : channelAppenders) {
        appender.flattenPendingDictionary();
    }

    // Flattening to direct mode may have pushed the builder over its size limit
    return isFull() ? Optional.of(build()) : Optional.empty();
}

public Page build()
{
Block[] blocks = new Block[channelAppenders.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.type.BlockTypeOperators.BlockPositionIsDistinctFrom;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import jakarta.annotation.Nullable;

import java.util.Optional;
Expand Down Expand Up @@ -52,6 +53,7 @@ private enum State

private State state = State.UNINITIALIZED;

@Nullable
private ValueBlock dictionary;
private DictionaryIdsBuilder dictionaryIdsBuilder;

Expand Down Expand Up @@ -219,6 +221,28 @@ void addSizesToAccumulator(PositionsAppenderSizeAccumulator accumulator)
accumulator.accumulate(sizeInBytes, directSizeInBytes);
}

/**
 * Converts a pending dictionary-mode buffer into direct mode. No-op unless the appender is
 * currently buffering dictionary ids against a live dictionary.
 */
public void flattenPendingDictionary()
{
    if (state != State.DICTIONARY || dictionary == null) {
        return;
    }
    transitionToDirect();
}

/**
 * Reports whether the buffered dictionary encoding is worth preserving by flushing before
 * release. The encoding is considered valuable as soon as any dictionary position is
 * referenced by more than one buffered id (i.e. the ids are not all unique).
 *
 * @return true when the current page should be force-flushed to keep its dictionary encoding
 */
public boolean shouldForceFlushBeforeRelease()
{
    if (state == State.DICTIONARY && dictionary != null) {
        int idCount = dictionaryIdsBuilder.size();
        int[] dictionaryIds = dictionaryIdsBuilder.getDictionaryIds();
        // Pre-size to the id count to avoid repeated rehashing while scanning large pages
        IntOpenHashSet uniqueIdsSet = new IntOpenHashSet(idCount);
        for (int i = 0; i < idCount; i++) {
            if (!uniqueIdsSet.add(dictionaryIds[i])) {
                // At least one position is referenced multiple times, preserve the dictionary
                // encoding and force the current page to flush
                return true;
            }
        }
    }
    return false;
}

private static class DictionaryIdsBuilder
{
private static final int INSTANCE_SIZE = instanceSize(DictionaryIdsBuilder.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ public void testOperatorContextStats()
partitionedOutputOperator.addInput(page);

OperatorContext operatorContext = partitionedOutputOperator.getOperatorContext();
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(0);
assertThat(operatorContext.getOutputPositions().getTotalCount()).isEqualTo(page.getPositionCount());

partitionedOutputOperator.finish();
assertThat(operatorContext.getOutputDataSize().getTotalCount()).isEqualTo(page.getSizeInBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@
import io.airlift.slice.Slices;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.block.ValueBlock;
import io.trino.spi.predicate.Utils;
import io.trino.type.BlockTypeOperators;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Optional;

import static io.trino.block.BlockAssertions.createRandomBlockForType;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.Math.toIntExact;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -102,4 +108,59 @@ public void testFullOnDirectSizeInBytes()
assertEquals(120, result.getPositionCount(), "result positions should be below the 8192 maximum");
assertTrue(result.getBlock(0) instanceof RunLengthEncodedBlock, "result block is RLE encoded");
}

@Test
public void testFlushUsefulDictionariesOnRelease()
{
    int maxPageBytes = 100;
    int maxDirectSize = 1000;
    PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
            maxPageBytes,
            maxDirectSize,
            List.of(VARCHAR),
            new PositionsAppenderFactory(new BlockTypeOperators()));

    // Every id references the same dictionary position, so the encoding is highly compressive
    Block singleValue = Utils.nativeValueToBlock(VARCHAR, Slices.utf8Slice("test"));
    Page inputPage = new Page(DictionaryBlock.create(10, singleValue, new int[10]));

    pageBuilder.appendToOutputPartition(inputPage, IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
    // Dictionary-mode appenders account only for the ids, not per-position value sizes,
    // since tracking those would be inefficient
    assertEquals(Integer.BYTES * 10, pageBuilder.getSizeInBytes());
    assertFalse(pageBuilder.isFull());

    // Release should flush rather than flatten, preserving the dictionary encoding
    Optional<Page> flushed = pageBuilder.flushOrFlattenBeforeRelease();
    assertTrue(flushed.isPresent(), "pageBuilder should force flush the dictionary");
    assertTrue(flushed.get().getBlock(0) instanceof DictionaryBlock, "result should be dictionary encoded");
}

@Test
public void testFlattenUnhelpfulDictionariesOnRelease()
{
    // Every dictionary position is referenced exactly once, so the encoding saves nothing
    Block valueBlock = createRandomBlockForType(VARCHAR, 10, 0.25f);
    Page inputPage = new Page(DictionaryBlock.create(10, valueBlock, new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));

    // Size limits large enough that the whole value block fits without the builder being full
    int maxPageBytes = toIntExact(valueBlock.getSizeInBytes() * 10);
    PositionsAppenderPageBuilder pageBuilder = PositionsAppenderPageBuilder.withMaxPageSize(
            maxPageBytes,
            maxPageBytes * 10,
            List.of(VARCHAR),
            new PositionsAppenderFactory(new BlockTypeOperators()));

    pageBuilder.appendToOutputPartition(inputPage, IntArrayList.wrap(new int[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
    assertEquals(Integer.BYTES * 10, pageBuilder.getSizeInBytes());
    assertFalse(pageBuilder.isFull());

    // Release should not flush; the appender converts to direct mode instead
    assertEquals(Optional.empty(), pageBuilder.flushOrFlattenBeforeRelease(), "pageBuilder should not force a flush");
    assertFalse(pageBuilder.isFull());
    assertEquals(valueBlock.getSizeInBytes(), pageBuilder.getSizeInBytes(), "pageBuilder should have transitioned to direct mode");

    Page result = pageBuilder.build();
    assertTrue(result.getBlock(0) instanceof ValueBlock, "result should not be a dictionary block");
}
}