diff --git a/presto-main/src/main/java/com/facebook/presto/memory/MemoryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/memory/MemoryManagerConfig.java index 880fdf6910d87..68a14787a8601 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/MemoryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/MemoryManagerConfig.java @@ -39,6 +39,7 @@ public class MemoryManagerConfig private DataSize softMaxQueryTotalMemory; private String lowMemoryKillerPolicy = LowMemoryKillerPolicy.NONE; private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES); + private boolean tableFinishOperatorMemoryTrackingEnabled; public String getLowMemoryKillerPolicy() { @@ -131,6 +132,18 @@ public MemoryManagerConfig setSoftMaxQueryTotalMemory(DataSize softMaxQueryTotal return this; } + public boolean isTableFinishOperatorMemoryTrackingEnabled() + { + return tableFinishOperatorMemoryTrackingEnabled; + } + + @Config("table-finish-operator-memory-tracking-enabled") + public MemoryManagerConfig setTableFinishOperatorMemoryTrackingEnabled(boolean tableFinishOperatorMemoryTrackingEnabled) + { + this.tableFinishOperatorMemoryTrackingEnabled = tableFinishOperatorMemoryTrackingEnabled; + return this; + } + public static class LowMemoryKillerPolicy { public static final String NONE = "none"; diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java index b327121372bae..9e97ccc5ba429 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableFinishOperator.java @@ -79,6 +79,7 @@ public static class TableFinishOperatorFactory private final StatisticAggregationsDescriptor descriptor; private final Session session; private final JsonCodec tableCommitContextCodec; + private final boolean memoryTrackingEnabled; private boolean closed; @@ -90,7 +91,8 @@ public TableFinishOperatorFactory( OperatorFactory statisticsAggregationOperatorFactory, StatisticAggregationsDescriptor descriptor, Session session, - JsonCodec tableCommitContextCodec) + JsonCodec tableCommitContextCodec, + boolean memoryTrackingEnabled) { this.operatorId = operatorId; this.planNodeId = requireNonNull(planNodeId, "planNodeId is null"); @@ -100,6 +102,7 @@ public TableFinishOperatorFactory( this.descriptor = requireNonNull(descriptor, "descriptor is null"); this.session = requireNonNull(session, "session is null"); this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null"); + this.memoryTrackingEnabled = memoryTrackingEnabled; } @Override @@ -109,7 +112,15 @@ public Operator createOperator(DriverContext driverContext) OperatorContext context = driverContext.addOperatorContext(operatorId, planNodeId, TableFinishOperator.class.getSimpleName()); Operator statisticsAggregationOperator = statisticsAggregationOperatorFactory.createOperator(driverContext); boolean statisticsCpuTimerEnabled = !(statisticsAggregationOperator instanceof DevNullOperator) && isStatisticsCpuTimerEnabled(session); - return new TableFinishOperator(context, tableFinisher, pageSinkCommitter, statisticsAggregationOperator, descriptor, statisticsCpuTimerEnabled, tableCommitContextCodec); + return new TableFinishOperator( + context, + tableFinisher, + pageSinkCommitter, + statisticsAggregationOperator, + descriptor, + statisticsCpuTimerEnabled, + memoryTrackingEnabled, + tableCommitContextCodec); } @Override @@ -121,7 +132,16 @@ public void noMoreOperators() @Override public OperatorFactory duplicate() { - return new TableFinishOperatorFactory(operatorId, planNodeId, tableFinisher, pageSinkCommitter, statisticsAggregationOperatorFactory, descriptor, session, tableCommitContextCodec); + return new TableFinishOperatorFactory( + operatorId, + planNodeId, + tableFinisher, + pageSinkCommitter, + statisticsAggregationOperatorFactory, + descriptor, + session, + tableCommitContextCodec, + memoryTrackingEnabled); } } @@ -141,6 +161,7 @@ private enum State private final OperationTiming statisticsTiming = new OperationTiming(); private final boolean statisticsCpuTimerEnabled; + private final boolean memoryTrackingEnabled; private final JsonCodec tableCommitContextCodec; private final LifespanAndStageStateTracker lifespanAndStageStateTracker; @@ -155,6 +176,7 @@ public TableFinishOperator( Operator statisticsAggregationOperator, StatisticAggregationsDescriptor descriptor, boolean statisticsCpuTimerEnabled, + boolean memoryTrackingEnabled, JsonCodec tableCommitContextCodec) { this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); @@ -162,6 +184,7 @@ public TableFinishOperator( this.statisticsAggregationOperator = requireNonNull(statisticsAggregationOperator, "statisticsAggregationOperator is null"); this.descriptor = requireNonNull(descriptor, "descriptor is null"); this.statisticsCpuTimerEnabled = statisticsCpuTimerEnabled; + this.memoryTrackingEnabled = memoryTrackingEnabled; this.tableCommitContextCodec = requireNonNull(tableCommitContextCodec, "tableCommitContextCodec is null"); this.lifespanAndStageStateTracker = new LifespanAndStageStateTracker(pageSinkCommitter, operatorRetainedMemoryBytes); this.systemMemoryContext = operatorContext.localSystemMemoryContext(); @@ -225,7 +248,9 @@ public void addInput(Page page) statisticsAggregationOperator.addInput(statisticsPage); timer.end(statisticsTiming); }); - systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get()); + if (memoryTrackingEnabled) { + systemMemoryContext.setBytes(operatorRetainedMemoryBytes.get()); + } } @Override diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 8ec7177d76a33..1e1fdb6f99666 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -44,6 +44,7 @@ import com.facebook.presto.expressions.DynamicFilters.DynamicFilterPlaceholder; import com.facebook.presto.expressions.LogicalRowExpressions; import com.facebook.presto.index.IndexManager; +import com.facebook.presto.memory.MemoryManagerConfig; import com.facebook.presto.metadata.AnalyzeTableHandle; import com.facebook.presto.metadata.ConnectorMetadataUpdaterManager; import com.facebook.presto.metadata.FunctionAndTypeManager; @@ -357,6 +358,7 @@ public class LocalExecutionPlanner private final LogicalRowExpressions logicalRowExpressions; private final FragmentResultCacheManager fragmentResultCacheManager; private final ObjectMapper objectMapper; + private final boolean tableFinishOperatorMemoryTrackingEnabled; private static final TypeSignature SPHERICAL_GEOGRAPHY_TYPE_SIGNATURE = parseTypeSignature("SphericalGeography"); @@ -375,6 +377,7 @@ public LocalExecutionPlanner( JoinFilterFunctionCompiler joinFilterFunctionCompiler, IndexJoinLookupStats indexJoinLookupStats, TaskManagerConfig taskManagerConfig, + MemoryManagerConfig memoryManagerConfig, SpillerFactory spillerFactory, SingleStreamSpillerFactory singleStreamSpillerFactory, PartitioningSpillerFactory partitioningSpillerFactory, @@ -419,6 +422,7 @@ public LocalExecutionPlanner( metadata.getFunctionAndTypeManager()); this.fragmentResultCacheManager = requireNonNull(fragmentResultCacheManager, "fragmentResultCacheManager is null"); this.objectMapper = requireNonNull(objectMapper, "objectMapper is null"); + this.tableFinishOperatorMemoryTrackingEnabled = requireNonNull(memoryManagerConfig, "memoryManagerConfig is null").isTableFinishOperatorMemoryTrackingEnabled(); } public LocalExecutionPlan plan( @@ -2688,7 +2692,8 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl statisticsAggregation, descriptor, session, - tableCommitContextCodec); + tableCommitContextCodec, + tableFinishOperatorMemoryTrackingEnabled); Map layout = ImmutableMap.of(node.getOutputVariables().get(0), 0); return new PhysicalOperation(operatorFactory, layout, context, source); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java index 35ad456bd45c4..e5afa717606e4 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/LocalQueryRunner.java @@ -794,6 +794,7 @@ private List createDrivers(Session session, Plan plan, OutputFactory out joinFilterFunctionCompiler, new IndexJoinLookupStats(), new TaskManagerConfig().setTaskConcurrency(4), + new MemoryManagerConfig(), spillerFactory, singleStreamSpillerFactory, partitioningSpillerFactory, diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java index 9473bbaf735ef..cfac77f02096d 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TaskTestUtils.java @@ -27,6 +27,7 @@ import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats; import com.facebook.presto.index.IndexManager; +import com.facebook.presto.memory.MemoryManagerConfig; import com.facebook.presto.metadata.ConnectorMetadataUpdaterManager; import com.facebook.presto.metadata.InMemoryNodeManager; import com.facebook.presto.metadata.MetadataManager; @@ -161,6 +162,7 @@ public static LocalExecutionPlanner createTestingPlanner() new JoinFilterFunctionCompiler(metadata), new IndexJoinLookupStats(), new TaskManagerConfig(), + new MemoryManagerConfig(), new GenericSpillerFactory((types, spillContext, memoryContext) -> { throw new UnsupportedOperationException(); }), diff --git a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryManagerConfig.java index 8e1a349344c22..43313580b64b5 100644 --- a/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/memory/TestMemoryManagerConfig.java @@ -40,7 +40,8 @@ public void testDefaults() .setMaxQueryMemory(new DataSize(20, GIGABYTE)) .setSoftMaxQueryMemory(new DataSize(20, GIGABYTE)) .setMaxQueryTotalMemory(new DataSize(40, GIGABYTE)) - .setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE))); + .setSoftMaxQueryTotalMemory(new DataSize(40, GIGABYTE)) + .setTableFinishOperatorMemoryTrackingEnabled(false)); } @Test @@ -53,6 +54,7 @@ public void testExplicitPropertyMappings() .put("query.soft-max-memory", "1GB") .put("query.max-total-memory", "3GB") .put("query.soft-max-total-memory", "2GB") + .put("table-finish-operator-memory-tracking-enabled", "true") .build(); MemoryManagerConfig expected = new MemoryManagerConfig() @@ -61,7 +63,8 @@ public void testExplicitPropertyMappings() .setMaxQueryMemory(new DataSize(2, GIGABYTE)) .setSoftMaxQueryMemory(new DataSize(1, GIGABYTE)) .setMaxQueryTotalMemory(new DataSize(3, GIGABYTE)) - .setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE)); + .setSoftMaxQueryTotalMemory(new DataSize(2, GIGABYTE)) + .setTableFinishOperatorMemoryTrackingEnabled(true); assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java index 0bf9ba592c1c3..26e9efcc4fbfe 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestTableFinishOperator.java @@ -121,7 +121,8 @@ public void testStatisticsAggregation() true), descriptor, session, - TABLE_COMMIT_CONTEXT_CODEC); + TABLE_COMMIT_CONTEXT_CODEC, + false); DriverContext driverContext = createTaskContext(scheduledExecutor, scheduledExecutor, session) .addPipelineContext(0, true, true, false) .addDriverContext(); @@ -201,7 +202,8 @@ public void testTableWriteCommit() true), descriptor, session, - TABLE_COMMIT_CONTEXT_CODEC); + TABLE_COMMIT_CONTEXT_CODEC, + false); DriverContext driverContext = createTaskContext(scheduledExecutor, scheduledExecutor, session) .addPipelineContext(0, true, true, false) .addDriverContext();