Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class MemoryManagerConfig
private DataSize softMaxQueryTotalMemory;
private String lowMemoryKillerPolicy = LowMemoryKillerPolicy.NONE;
private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES);
private boolean tableFinishOperatorMemoryTrackingEnabled;

public String getLowMemoryKillerPolicy()
{
Expand Down Expand Up @@ -131,6 +132,18 @@ public MemoryManagerConfig setSoftMaxQueryTotalMemory(DataSize softMaxQueryTotal
return this;
}

public boolean isTableFinishOperatorMemoryTrackingEnabled()
{
return tableFinishOperatorMemoryTrackingEnabled;
}

@Config("table-finish-operator-memory-tracking-enabled")
public MemoryManagerConfig setTableFinishOperatorMemoryTrackingEnabled(boolean tableFinishOperatorMemoryTrackingEnabled)
{
this.tableFinishOperatorMemoryTrackingEnabled = tableFinishOperatorMemoryTrackingEnabled;
return this;
}

public static class LowMemoryKillerPolicy
{
public static final String NONE = "none";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static class TableFinishOperatorFactory
private final StatisticAggregationsDescriptor<Integer> descriptor;
private final Session session;
private final JsonCodec<TableCommitContext> tableCommitContextCodec;
private final boolean memoryTrackingEnabled;

private boolean closed;

Expand All @@ -90,7 +91,8 @@ public TableFinishOperatorFactory(
OperatorFactory statisticsAggregationOperatorFactory,
StatisticAggregationsDescriptor<Integer> descriptor,
Session session,
JsonCodec<TableCommitContext> tableCommitContextCodec)
JsonCodec<TableCommitContext> tableCommitContextCodec,
boolean memoryTrackingEnabled)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand All @@ -100,6 +102,7 @@ public TableFinishOperatorFactory(
this.descriptor = requireNonNull(descriptor, "descriptor is null");
this.session = requireNonNull(session, "session is null");
this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
this.memoryTrackingEnabled = memoryTrackingEnabled;
}

@Override
Expand All @@ -109,7 +112,15 @@ public Operator createOperator(DriverContext driverContext)
OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName());
Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext);
boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session);
return new TableFinishOperator(context, tableFinisher, pageSinkCommitter, statisticsAggregationOperator, descriptor, statisticsCpuTimerEnabled, tableCommitContextCodec);
return new TableFinishOperator(
context,
tableFinisher,
pageSinkCommitter,
statisticsAggregationOperator,
descriptor,
statisticsCpuTimerEnabled,
memoryTrackingEnabled,
tableCommitContextCodec);
}

@Override
Expand All @@ -121,7 +132,16 @@ public void noMoreOperators()
@Override
public OperatorFactory duplicate()
{
return new TableFinishOperatorFactory(operatorId, planNodeId, tableFinisher, pageSinkCommitter, statisticsAggregationOperatorFactory, descriptor, session, tableCommitContextCodec);
return new TableFinishOperatorFactory(
operatorId,
planNodeId,
tableFinisher,
pageSinkCommitter,
statisticsAggregationOperatorFactory,
descriptor,
session,
tableCommitContextCodec,
memoryTrackingEnabled);
}
}

Expand All @@ -141,6 +161,7 @@ private enum State

private final OperationTiming statisticsTiming = new OperationTiming();
private final boolean statisticsCpuTimerEnabled;
private final boolean memoryTrackingEnabled;

private final JsonCodec<TableCommitContext> tableCommitContextCodec;
private final LifespanAndStageStateTracker lifespanAndStageStateTracker;
Expand All @@ -155,13 +176,15 @@ public TableFinishOperator(
Operator statisticsAggregationOperator,
StatisticAggregationsDescriptor<Integer> descriptor,
boolean statisticsCpuTimerEnabled,
boolean memoryTrackingEnabled,
JsonCodec<TableCommitContext> tableCommitContextCodec)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.tableFinisher = requireNonNull(tableFinisher, "tableCommitter is null");
this.statisticsAggregationOperator = requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null");
this.descriptor = requireNonNull(descriptor, "descriptor is null");
this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled;
this.memoryTrackingEnabled = memoryTrackingEnabled;
this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null");
this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, operatorRetainedMemoryBytes);
this.systemMemoryContext = operatorContext.localSystemMemoryContext();
Expand Down Expand Up @@ -225,7 +248,9 @@ public void addInput(Page page)
statisticsAggregationOperator.addInput(statisticsPage);
timer.end(statisticsTiming);
});
systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get());
if (memoryTrackingEnabled) {
systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.facebook.presto.expressions.DynamicFilters.DynamicFilterPlaceholder;
import com.facebook.presto.expressions.LogicalRowExpressions;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.metadata.AnalyzeTableHandle;
import com.facebook.presto.metadata.ConnectorMetadataUpdaterManager;
import com.facebook.presto.metadata.FunctionAndTypeManager;
Expand Down Expand Up @@ -357,6 +358,7 @@ public class LocalExecutionPlanner
private final LogicalRowExpressions logicalRowExpressions;
private final FragmentResultCacheManager fragmentResultCacheManager;
private final ObjectMapper objectMapper;
private final boolean tableFinishOperatorMemoryTrackingEnabled;

private static final TypeSignature SPHERICAL_GEOGRAPHY_TYPE_SIGNATURE = parseTypeSignature("SphericalGeography");

Expand All @@ -375,6 +377,7 @@ public LocalExecutionPlanner(
JoinFilterFunctionCompiler joinFilterFunctionCompiler,
IndexJoinLookupStats indexJoinLookupStats,
TaskManagerConfig taskManagerConfig,
MemoryManagerConfig memoryManagerConfig,
SpillerFactory spillerFactory,
SingleStreamSpillerFactory singleStreamSpillerFactory,
PartitioningSpillerFactory partitioningSpillerFactory,
Expand Down Expand Up @@ -419,6 +422,7 @@ public LocalExecutionPlanner(
metadata.getFunctionAndTypeManager());
this.fragmentResultCacheManager = requireNonNull(fragmentResultCacheManager, "fragmentResultCacheManager is null");
this.objectMapper = requireNonNull(objectMapper, "objectMapper is null");
this.tableFinishOperatorMemoryTrackingEnabled = requireNonNull(memoryManagerConfig, "memoryManagerConfig is null").isTableFinishOperatorMemoryTrackingEnabled();
}

public LocalExecutionPlan plan(
Expand Down Expand Up @@ -2688,7 +2692,8 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl
statisticsAggregation,
descriptor,
session,
tableCommitContextCodec);
tableCommitContextCodec,
tableFinishOperatorMemoryTrackingEnabled);
Map<VariableReferenceExpression, Integer> layout = ImmutableMap.of(node.getOutputVariables().get(0), 0);

return new PhysicalOperation(operatorFactory, layout, context, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,7 @@ private List<Driver> createDrivers(Session session, Plan plan, OutputFactory out
joinFilterFunctionCompiler,
new IndexJoinLookupStats(),
new TaskManagerConfig().setTaskConcurrency(4),
new MemoryManagerConfig(),
spillerFactory,
singleStreamSpillerFactory,
partitioningSpillerFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.execution.scheduler.TableWriteInfo;
import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats;
import com.facebook.presto.index.IndexManager;
import com.facebook.presto.memory.MemoryManagerConfig;
import com.facebook.presto.metadata.ConnectorMetadataUpdaterManager;
import com.facebook.presto.metadata.InMemoryNodeManager;
import com.facebook.presto.metadata.MetadataManager;
Expand Down Expand Up @@ -161,6 +162,7 @@ public static LocalExecutionPlanner createTestingPlanner()
new JoinFilterFunctionCompiler(metadata),
new IndexJoinLookupStats(),
new TaskManagerConfig(),
new MemoryManagerConfig(),
new GenericSpillerFactory((types, spillContext, memoryContext) -> {
throw new UnsupportedOperationException();
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void testDefaults()
.setMaxQueryMemory(new DataSize(20, GIGABYTE))
.setSoftMaxQueryMemory(new DataSize(20, GIGABYTE))
.setMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
.setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE)));
.setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE))
.setTableFinishOperatorMemoryTrackingEnabled(false));
}

@Test
Expand All @@ -53,6 +54,7 @@ public void testExplicitPropertyMappings()
.put("query.soft-max-memory", "1GB")
.put("query.max-total-memory", "3GB")
.put("query.soft-max-total-memory", "2GB")
.put("table-finish-operator-memory-tracking-enabled", "true")
.build();

MemoryManagerConfig expected = new MemoryManagerConfig()
Expand All @@ -61,7 +63,8 @@ public void testExplicitPropertyMappings()
.setMaxQueryMemory(new DataSize(2, GIGABYTE))
.setSoftMaxQueryMemory(new DataSize(1, GIGABYTE))
.setMaxQueryTotalMemory(new DataSize(3, GIGABYTE))
.setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE));
.setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE))
.setTableFinishOperatorMemoryTrackingEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public void testStatisticsAggregation()
true),
descriptor,
session,
TABLE_COMMIT_CONTEXT_CODEC);
TABLE_COMMIT_CONTEXT_CODEC,
false);
DriverContext driverContext = createTaskContext(scheduledExecutor, scheduledExecutor, session)
.addPipelineContext(0, true, true, false)
.addDriverContext();
Expand Down Expand Up @@ -201,7 +202,8 @@ public void testTableWriteCommit()
true),
descriptor,
session,
TABLE_COMMIT_CONTEXT_CODEC);
TABLE_COMMIT_CONTEXT_CODEC,
false);
DriverContext driverContext = createTaskContext(scheduledExecutor, scheduledExecutor, session)
.addPipelineContext(0, true, true, false)
.addDriverContext();
Expand Down