Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.IntArrayBlock;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -101,7 +102,8 @@ private static class DistinctAccumulator
implements Accumulator
{
private final Accumulator accumulator;
private final MarkDistinctHash hash;
@Nullable
private MarkDistinctHash hash; // null after evaluateFinal() is called

private DistinctAccumulator(
Accumulator accumulator,
Expand All @@ -120,7 +122,11 @@ private DistinctAccumulator(
@Override
public long getEstimatedSize()
{
return hash.getEstimatedSize() + accumulator.getEstimatedSize();
long estimatedSize = accumulator.getEstimatedSize();
if (hash != null) {
estimatedSize += hash.getEstimatedSize();
}
return estimatedSize;
}

@Override
Expand All @@ -132,6 +138,7 @@ public Accumulator copy()
@Override
public void addInput(Page arguments, AggregationMask mask)
{
checkState(hash != null, "evaluateFinal() has already been called");
// 1. filter out positions based on mask, if present
Page filtered = mask.filterPage(arguments);

Expand Down Expand Up @@ -166,6 +173,8 @@ public void evaluateIntermediate(BlockBuilder blockBuilder)
@Override
public void evaluateFinal(BlockBuilder blockBuilder)
{
// release hash memory since it's no longer needed
hash = null;
accumulator.evaluateFinal(blockBuilder);
}
}
Expand All @@ -174,7 +183,8 @@ private static class DistinctGroupedAccumulator
implements GroupedAccumulator
{
private final GroupedAccumulator accumulator;
private final MarkDistinctHash hash;
@Nullable
private MarkDistinctHash hash; // null after prepareFinal() is called

private DistinctGroupedAccumulator(
GroupedAccumulator accumulator,
Expand All @@ -196,7 +206,11 @@ private DistinctGroupedAccumulator(
@Override
public long getEstimatedSize()
{
return hash.getEstimatedSize() + accumulator.getEstimatedSize();
long estimatedSize = accumulator.getEstimatedSize();
if (hash != null) {
estimatedSize += hash.getEstimatedSize();
}
return estimatedSize;
}

@Override
Expand All @@ -208,6 +222,7 @@ public void setGroupCount(int groupCount)
@Override
public void addInput(int[] groupIds, Page page, AggregationMask mask)
{
checkState(hash != null, "prepareFinal() has already been called");
// 1. filter out positions based on mask
groupIds = maskGroupIds(groupIds, mask);
page = mask.filterPage(page);
Expand Down Expand Up @@ -261,6 +276,10 @@ public void evaluateFinal(int groupId, BlockBuilder output)
}

@Override
public void prepareFinal() {}
public void prepareFinal()
{
// release hash memory after all inputs have been added
hash = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.spi.block.IntArrayBlock;
import io.trino.spi.connector.SortOrder;
import io.trino.spi.type.Type;
import jakarta.annotation.Nullable;

import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -31,6 +32,7 @@
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static io.trino.spi.type.IntegerType.INTEGER;
import static java.lang.Long.max;
Expand Down Expand Up @@ -109,7 +111,8 @@ private static class OrderedAccumulator
private final int[] argumentChannels;
private final List<Integer> orderByChannels;
private final List<SortOrder> orderings;
private final PagesIndex pagesIndex;
@Nullable
private PagesIndex pagesIndex; // null after evaluateFinal() is called

private OrderedAccumulator(
Accumulator accumulator,
Expand All @@ -129,7 +132,11 @@ private OrderedAccumulator(
@Override
public long getEstimatedSize()
{
return pagesIndex.getEstimatedSize().toBytes() + accumulator.getEstimatedSize();
long estimatedSize = accumulator.getEstimatedSize();
if (pagesIndex != null) {
estimatedSize += pagesIndex.getEstimatedSize().toBytes();
}
return estimatedSize;
}

@Override
Expand All @@ -141,6 +148,7 @@ public Accumulator copy()
@Override
public void addInput(Page page, AggregationMask mask)
{
checkState(pagesIndex != null, "evaluateFinal() already called");
pagesIndex.addPage(mask.filterPage(page));
}

Expand All @@ -159,13 +167,16 @@ public void evaluateIntermediate(BlockBuilder blockBuilder)
@Override
public void evaluateFinal(BlockBuilder blockBuilder)
{
checkState(pagesIndex != null, "evaluateFinal() already called");
pagesIndex.sort(orderByChannels, orderings);
Iterator<Page> pagesIterator = pagesIndex.getSortedPages();
AggregationMask mask = AggregationMask.createSelectAll(0);
pagesIterator.forEachRemaining(arguments -> {
mask.reset(arguments.getPositionCount());
accumulator.addInput(arguments.getColumns(argumentChannels), mask);
});
// release pagesIndex memory after transferring its contents into the accumulator
pagesIndex = null;
accumulator.evaluateFinal(blockBuilder);
}
}
Expand All @@ -177,7 +188,8 @@ private static class OrderingGroupedAccumulator
private final int[] argumentChannels;
private final List<Integer> orderByChannels;
private final List<SortOrder> orderings;
private final PagesIndex pagesIndex;
@Nullable
private PagesIndex pagesIndex; // null after prepareFinal() is called
private long groupCount;

private OrderingGroupedAccumulator(
Expand All @@ -203,7 +215,11 @@ private OrderingGroupedAccumulator(
@Override
public long getEstimatedSize()
{
return pagesIndex.getEstimatedSize().toBytes() + accumulator.getEstimatedSize();
long estimatedSize = accumulator.getEstimatedSize();
if (pagesIndex != null) {
estimatedSize += pagesIndex.getEstimatedSize().toBytes();
}
return estimatedSize;
}

@Override
Expand All @@ -216,6 +232,7 @@ public void setGroupCount(int groupCount)
@Override
public void addInput(int[] groupIds, Page page, AggregationMask mask)
{
checkState(pagesIndex != null, "prepareFinal() already called");
if (mask.isSelectNone()) {
return;
}
Expand Down Expand Up @@ -248,6 +265,7 @@ public void evaluateFinal(int groupId, BlockBuilder output)
@Override
public void prepareFinal()
{
checkState(pagesIndex != null, "prepareFinal() already called");
pagesIndex.sort(orderByChannels, orderings);
Iterator<Page> pagesIterator = pagesIndex.getSortedPages();
AggregationMask mask = AggregationMask.createSelectAll(0);
Expand All @@ -258,6 +276,8 @@ public void prepareFinal()
page.getColumns(argumentChannels),
mask);
});
// release pagesIndex memory after transferring its contents into the accumulator
pagesIndex = null;
}

private static int[] extractGroupIds(Page page)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,17 @@ public long getGroupCount()
@Override
public WorkProcessor<Page> buildResult()
{
groupByHash.startReleasingOutput();
for (GroupedAggregator groupedAggregator : groupedAggregators) {
groupedAggregator.prepareFinal();
}
// Only incrementally release memory for final aggregations, since partial aggregations have a fixed
// memory limit and can be expected to fully flush and release their output quickly
// Always update the current memory usage after calling GroupedAggregator#prepareFinal(), since it can increase
// memory consumption significantly in some situations. This also captures any memory usage reduction the
// groupByHash may have initiated for releasing output
updateMemory();
// Only incrementally release memory while producing output for final aggregations, since partial aggregations
// have a fixed memory limit and can be expected to fully flush and release their output quickly
boolean releaseMemoryOnOutput = !partial;
if (releaseMemoryOnOutput) {
groupByHash.startReleasingOutput();
}
return buildResult(consecutiveGroupIds(), new PageBuilder(buildTypes()), false, releaseMemoryOnOutput);
}

Expand Down