Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ private void initializeAggregationBuilderIfNeeded()
maxPartialMemory,
joinCompiler,
true,
useSystemMemory ? ReserveType.SYSTEM : ReserveType.USER);
useSystemMemory);
}
else {
verify(!useSystemMemory, "using system memory in spillable aggregations is not supported");
Expand Down Expand Up @@ -667,11 +667,4 @@ private static long calculateDefaultOutputHash(List<Type> groupByChannels, int g
}
return result;
}

public enum ReserveType
{
USER,
SYSTEM,
REVOCABLE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.GroupByHash;
import com.facebook.presto.operator.HashAggregationOperator.ReserveType;
import com.facebook.presto.operator.HashCollisionsCounter;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.TransformWork;
Expand All @@ -45,7 +44,6 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;

import static com.facebook.presto.SystemSessionProperties.isDictionaryAggregationEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand All @@ -63,8 +61,7 @@ public class InMemoryHashAggregationBuilder
private final OptionalLong maxPartialMemory;
private final LocalMemoryContext systemMemoryContext;
private final LocalMemoryContext localUserMemoryContext;
private final ReserveType reserveType;
private final Consumer<Long> memoryConsumer;
private final boolean useSystemMemory;

private boolean full;

Expand All @@ -79,7 +76,7 @@ public InMemoryHashAggregationBuilder(
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
ReserveType reserveType)
boolean useSystemMemory)
{
this(accumulatorFactories,
step,
Expand All @@ -92,36 +89,7 @@ public InMemoryHashAggregationBuilder(
Optional.empty(),
joinCompiler,
yieldForMemoryReservation,
reserveType,
Optional.empty());
}

public InMemoryHashAggregationBuilder(
List<AccumulatorFactory> accumulatorFactories,
Step step,
int expectedGroups,
List<Type> groupByTypes,
List<Integer> groupByChannels,
Optional<Integer> hashChannel,
OperatorContext operatorContext,
Optional<DataSize> maxPartialMemory,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
Optional<Consumer<Long>> memoryConsumer)
{
this(accumulatorFactories,
step,
expectedGroups,
groupByTypes,
groupByChannels,
hashChannel,
operatorContext,
maxPartialMemory,
Optional.empty(),
joinCompiler,
yieldForMemoryReservation,
ReserveType.REVOCABLE,
memoryConsumer);
useSystemMemory);
}

public InMemoryHashAggregationBuilder(
Expand All @@ -136,31 +104,16 @@ public InMemoryHashAggregationBuilder(
Optional<Integer> overwriteIntermediateChannelOffset,
JoinCompiler joinCompiler,
boolean yieldForMemoryReservation,
ReserveType reserveType,
Optional<Consumer<Long>> memoryConsumer)
boolean useSystemMemory)
{
// reserveType is REVOCABLE implies current InMemoryHashAggregationBuilder is built from SpillableHashAggregationBuilder
// and it will accept a customized memoryConsumer for memory update
if (reserveType == ReserveType.REVOCABLE) {
checkArgument(memoryConsumer.isPresent(),
"memoryConsumer must be present when reserve type is REVOCABLE");
}

this.reserveType = reserveType;
if (memoryConsumer.isPresent()) {
this.memoryConsumer = memoryConsumer.get();
}
else {
this.memoryConsumer = this::updateMemory;
}

UpdateMemory updateMemory;
if (yieldForMemoryReservation) {
updateMemory = this::updateMemoryWithYieldInfo;
}
else {
// Report memory usage but do not yield for memory.
// This is specially used for spillable hash aggregation operator.
// TODO: revisit this when spillable hash aggregation operator is turned on
updateMemory = () -> {
updateMemoryWithYieldInfo();
return true;
Expand All @@ -179,6 +132,7 @@ public InMemoryHashAggregationBuilder(
this.maxPartialMemory = maxPartialMemory.map(dataSize -> OptionalLong.of(dataSize.toBytes())).orElseGet(OptionalLong::empty);
this.systemMemoryContext = operatorContext.newLocalSystemMemoryContext(InMemoryHashAggregationBuilder.class.getSimpleName());
this.localUserMemoryContext = operatorContext.localUserMemoryContext();
this.useSystemMemory = useSystemMemory;

// wrapper each function with an aggregator
ImmutableList.Builder<Aggregator> builder = ImmutableList.builder();
Expand All @@ -197,7 +151,7 @@ public InMemoryHashAggregationBuilder(
@Override
public void close()
{
memoryConsumer.accept(0L);
updateMemory(0);
}

@Override
Expand Down Expand Up @@ -372,28 +326,24 @@ private boolean updateMemoryWithYieldInfo()
{
long memorySize = getSizeInMemory();
if (partial && maxPartialMemory.isPresent()) {
memoryConsumer.accept(memorySize);
updateMemory(memorySize);
full = (memorySize > maxPartialMemory.getAsLong());
return true;
}
// Operator/driver will be blocked on memory after we call setBytes.
// If memory is not available, once we return, this operator will be blocked until memory is available.
memoryConsumer.accept(memorySize);
updateMemory(memorySize);
// If memory is not available, inform the caller that we cannot proceed for allocation.
return operatorContext.isWaitingForMemory().isDone();
}

private void updateMemory(long memorySize)
{
switch (reserveType) {
case USER:
localUserMemoryContext.setBytes(memorySize);
break;
case SYSTEM:
systemMemoryContext.setBytes(memorySize);
break;
default:
throw new AssertionError("InMemoryHashAggregationBuilder do not support reserve type: " + reserveType);
if (useSystemMemory) {
systemMemoryContext.setBytes(memorySize);
}
else {
localUserMemoryContext.setBytes(memorySize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.facebook.presto.common.Page;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.HashAggregationOperator.ReserveType;
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.WorkProcessor;
import com.facebook.presto.operator.WorkProcessor.Transformation;
Expand Down Expand Up @@ -151,7 +150,6 @@ private void rebuildHashAggregationBuilder()
Optional.of(overwriteIntermediateChannelOffset),
joinCompiler,
false,
ReserveType.USER,
Optional.empty());
false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void rebuildHashAggregationBuilder()
Optional.of(DataSize.succinctBytes(0)),
joinCompiler,
false,
Optional.of((memorySize) -> localRevocableMemoryContext.setBytes(memorySize)));
false);
emptyHashAggregationBuilderSize = hashAggregationBuilder.getSizeInMemory();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,6 @@ public static TaskContext createTaskContext(Executor notificationExecutor, Sched
.build();
}

public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session,
DataSize maxMemory, DataSize maxTotalMemory)
{
return builder(notificationExecutor, yieldExecutor, session)
.setQueryMaxMemory(maxMemory)
.setQueryMaxTotalMemory(maxTotalMemory)
.build();
}

public static TaskContext createTaskContext(Executor notificationExecutor, ScheduledExecutorService yieldExecutor, Session session, TaskStateMachine taskStateMachine)
{
return builder(notificationExecutor, yieldExecutor, session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,54 +620,6 @@ public void testMergeWithMemorySpill()
assertOperatorEqualsIgnoreOrder(operatorFactory, driverContext, input, resultBuilder.build());
}

@Test
public void testMemoryLimitInSpillWhenTriggerRehash()
{
RowPagesBuilder rowPagesBuilder = rowPagesBuilder(BIGINT);

int smallPagesSpillThresholdSize = 100000;

List<Page> input = rowPagesBuilder
.addSequencePage(smallPagesSpillThresholdSize, 0)
.addSequencePage(smallPagesSpillThresholdSize, smallPagesSpillThresholdSize)
.addSequencePage(smallPagesSpillThresholdSize, 2 * smallPagesSpillThresholdSize)
.addSequencePage(smallPagesSpillThresholdSize, 3 * smallPagesSpillThresholdSize)
.build();

HashAggregationOperatorFactory operatorFactory = new HashAggregationOperatorFactory(
0,
new PlanNodeId("test"),
ImmutableList.of(BIGINT),
ImmutableList.of(0),
ImmutableList.of(),
ImmutableList.of(),
Step.SINGLE,
false,
ImmutableList.of(generateAccumulatorFactory(LONG_SUM, ImmutableList.of(0), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Optional.empty(),
1,
Optional.of(new DataSize(16, MEGABYTE)),
true,
new DataSize(smallPagesSpillThresholdSize, Unit.BYTE),
succinctBytes(Integer.MAX_VALUE),
spillerFactory,
joinCompiler,
false);

TaskContext taskContext = createTaskContext(executor, scheduledExecutor, TEST_SESSION,
new DataSize(10, MEGABYTE), new DataSize(20, MEGABYTE));
DriverContext driverContext = taskContext
.addPipelineContext(0, true, true, false)
.addDriverContext();

MaterializedResult.Builder resultBuilder = resultBuilder(driverContext.getSession(), BIGINT, BIGINT);
for (int i = 0; i < 4 * smallPagesSpillThresholdSize; ++i) {
resultBuilder.row((long) i, (long) i);
}
assertOperatorEqualsIgnoreOrder(operatorFactory, driverContext, input, resultBuilder.build());
}

@Test
public void testSpillerFailure()
{
Expand Down