Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
Optional.empty(),
Optional.empty(),
10_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
JOIN_COMPILER);

return ImmutableList.of(tableScanOperator, tpchQuery1Operator, aggregationOperator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected List<? extends OperatorFactory> createOperatorFactories()
Optional.empty(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
JOIN_COMPILER);
return ImmutableList.of(tableScanOperator, aggregationOperator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static class HashAggregationOperatorFactory
private final Optional<Integer> groupIdChannel;

private final int expectedGroups;
private final DataSize maxPartialMemory;
private final Optional<DataSize> maxPartialMemory;
private final boolean spillEnabled;
private final DataSize memoryLimitForMerge;
private final DataSize memoryLimitForMergeWithMemory;
Expand All @@ -84,7 +84,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler)
{
this(operatorId,
Expand Down Expand Up @@ -120,7 +120,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize unspillMemoryLimit,
SpillerFactory spillerFactory,
Expand Down Expand Up @@ -158,7 +158,7 @@ public HashAggregationOperatorFactory(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize memoryLimitForMerge,
DataSize memoryLimitForMergeWithMemory,
Expand Down Expand Up @@ -250,7 +250,7 @@ public OperatorFactory duplicate()
private final Optional<Integer> hashChannel;
private final Optional<Integer> groupIdChannel;
private final int expectedGroups;
private final DataSize maxPartialMemory;
private final Optional<DataSize> maxPartialMemory;
private final boolean spillEnabled;
private final DataSize memoryLimitForMerge;
private final DataSize memoryLimitForMergeWithMemory;
Expand Down Expand Up @@ -280,7 +280,7 @@ public HashAggregationOperator(
Optional<Integer> hashChannel,
Optional<Integer> groupIdChannel,
int expectedGroups,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
boolean spillEnabled,
DataSize memoryLimitForMerge,
DataSize memoryLimitForMergeWithMemory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import static com.facebook.presto.SystemSessionProperties.isDictionaryAggregationEnabled;
import static com.facebook.presto.operator.GroupByHash.createGroupByHash;
Expand All @@ -59,7 +60,7 @@ public class InMemoryHashAggregationBuilder
private final List<Aggregator> aggregators;
private final OperatorContext operatorContext;
private final boolean partial;
private final long maxPartialMemory;
private final OptionalLong maxPartialMemory;
private final LocalMemoryContext systemMemoryContext;
private final LocalMemoryContext localUserMemoryContext;

Expand All @@ -73,7 +74,7 @@ public InMemoryHashAggregationBuilder(
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation)
{
Expand All @@ -98,7 +99,7 @@ public InMemoryHashAggregationBuilder(
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
DataSize maxPartialMemory,
Optional<DataSize> maxPartialMemory,
Optional<Integer> overwriteIntermediateChannelOffset,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation)
Expand Down Expand Up @@ -126,7 +127,7 @@ public InMemoryHashAggregationBuilder(
updateMemory);
this.operatorContext = operatorContext;
this.partial = step.isOutputPartial();
this.maxPartialMemory = maxPartialMemory.toBytes();
this.maxPartialMemory = maxPartialMemory.map(dataSize -> OptionalLong.of(dataSize.toBytes())).orElseGet(OptionalLong::empty);
this.systemMemoryContext = operatorContext.newLocalSystemMemoryContext(InMemoryHashAggregationBuilder.class.getSimpleName());
this.localUserMemoryContext = operatorContext.localUserMemoryContext();

Expand Down Expand Up @@ -326,9 +327,10 @@ public List<Type> buildTypes()
private boolean updateMemoryWithYieldInfo()
{
long memorySize = getSizeInMemory();
if (partial) {
// if partial limit is not set, memory is considered as user memory
if (partial && maxPartialMemory.isPresent()) {
systemMemoryContext.setBytes(memorySize);
full = (memorySize > maxPartialMemory);
full = (memorySize > maxPartialMemory.getAsLong());
return true;
}
// Operator/driver will be blocked on memory after we call setBytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void rebuildHashAggregationBuilder()
groupByPartialChannels,
hashChannel,
operatorContext,
DataSize.succinctBytes(0),
Optional.of(DataSize.succinctBytes(0)),
Optional.of(overwriteIntermediateChannelOffset),
joinCompiler,
false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private void rebuildHashAggregationBuilder()
groupByChannels,
hashChannel,
operatorContext,
DataSize.succinctBytes(0),
Optional.of(DataSize.succinctBytes(0)),
joinCompiler,
false);
emptyHashAggregationBuilderSize = hashAggregationBuilder.getSizeInMemory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2173,7 +2173,14 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl
new DataSize(0, BYTE),
context,
2,
outputMapping);
outputMapping,
200,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arhimondr Assuming that this number ties to max number of output partitions, let's make the connection explicit. E.g. add a comment or, better yet, use the same configuration setting to compute this.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it is worth configuration. It is not a hard limit, but the initial size for the grouping hashmap. It will grow larger if needed.

// This aggregation must behave as INTERMEDIATE.
// Using INTERMEDIATE aggregation directly
// is not possible, as it doesn't accept raw input data.
// Disabling partial pre-aggregation memory limit effectively
// turns PARTIAL aggregation into INTERMEDIATE.
Optional.empty());
}).orElse(new DevNullOperatorFactory(context.getNextOperatorId(), node.getId()));

List<Integer> inputChannels = node.getColumns().stream()
Expand Down Expand Up @@ -2227,7 +2234,10 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
new DataSize(0, BYTE),
context,
0,
outputMapping);
outputMapping,
200,
// final aggregation ignores partial pre-aggregation memory limit
Optional.empty());
}).orElse(new DevNullOperatorFactory(context.getNextOperatorId(), node.getId()));

Map<Symbol, Integer> aggregationOutput = outputMapping.build();
Expand Down Expand Up @@ -2544,7 +2554,9 @@ private PhysicalOperation planGroupByAggregation(
unspillMemoryLimit,
context,
0,
mappings);
mappings,
10_000,
Optional.of(maxPartialAggregationMemorySize));
return new PhysicalOperation(operatorFactory, mappings.build(), context, source);
}

Expand All @@ -2563,7 +2575,9 @@ private OperatorFactory createHashAggregationOperatorFactory(
DataSize unspillMemoryLimit,
LocalExecutionPlanContext context,
int startOutputChannel,
ImmutableMap.Builder<Symbol, Integer> outputMappings)
ImmutableMap.Builder<Symbol, Integer> outputMappings,
int expectedGroups,
Optional<DataSize> maxPartialAggregationMemorySize)
{
List<Symbol> aggregationOutputSymbols = new ArrayList<>();
List<AccumulatorFactory> accumulatorFactories = new ArrayList<>();
Expand Down Expand Up @@ -2626,7 +2640,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
accumulatorFactories,
hashChannel,
groupIdChannel,
10_000,
expectedGroups,
maxPartialAggregationMemorySize,
spillEnabled,
unspillMemoryLimit,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private OperatorFactory createHashAggregationOperatorFactory(Optional<Integer> h
hashChannel,
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
false,
succinctBytes(8),
succinctBytes(Integer.MAX_VALUE),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testHashAggregation(boolean hashEnabled, boolean spillEnabled, long
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
Expand Down Expand Up @@ -243,7 +243,7 @@ public void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEna
rowPagesBuilder.getHashChannel(),
groupIdChannel,
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
Expand Down Expand Up @@ -290,7 +290,7 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testMemoryLimit(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

toPages(operatorFactory, driverContext, input);
Expand Down Expand Up @@ -370,7 +370,7 @@ public void testHashBuilderResize(boolean hashEnabled, boolean spillEnabled, lon
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
spillEnabled,
succinctBytes(memoryLimitForMerge),
succinctBytes(memoryLimitForMergeWithMemory),
Expand All @@ -395,7 +395,7 @@ public void testMemoryReservationYield(Type type)
Optional.of(1),
Optional.empty(),
1,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

// get result with yield; pick a relatively small buffer for aggregator's memory usage
Expand Down Expand Up @@ -446,7 +446,7 @@ public void testHashBuilderResizeLimit(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

toPages(operatorFactory, driverContext, input);
Expand Down Expand Up @@ -479,7 +479,7 @@ public void testMultiSliceAggregationOutput(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
joinCompiler);

assertEquals(toPages(operatorFactory, createDriverContext(), input).size(), 2);
Expand Down Expand Up @@ -509,7 +509,7 @@ public void testMultiplePartialFlushes(boolean hashEnabled)
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(1, KILOBYTE),
Optional.of(new DataSize(1, KILOBYTE)),
joinCompiler);

DriverContext driverContext = createDriverContext(1024);
Expand Down Expand Up @@ -584,7 +584,7 @@ public void testMergeWithMemorySpill()
rowPagesBuilder.getHashChannel(),
Optional.empty(),
1,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
true,
new DataSize(smallPagesSpillThresholdSize, Unit.BYTE),
succinctBytes(Integer.MAX_VALUE),
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testSpillerFailure()
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
new DataSize(16, MEGABYTE),
Optional.of(new DataSize(16, MEGABYTE)),
true,
succinctBytes(8),
succinctBytes(Integer.MAX_VALUE),
Expand Down