diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index fa4d2f1fc80d8..8ffd8f0391108 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -123,6 +123,7 @@ public final class SystemSessionProperties public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled"; public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled"; public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit"; + public static final String QUERY_MAX_REVOCABLE_MEMORY_PER_NODE = "query_max_revocable_memory_per_node"; public static final String TEMP_STORAGE_SPILLER_BUFFER_SIZE = "temp_storage_spiller_buffer_size"; public static final String OPTIMIZE_DISTINCT_AGGREGATIONS = "optimize_mixed_distinct_aggregations"; public static final String LEGACY_ROW_FIELD_ORDINAL_ACCESS = "legacy_row_field_ordinal_access"; @@ -620,6 +621,15 @@ public SystemSessionProperties( false, value -> DataSize.valueOf((String) value), DataSize::toString), + new PropertyMetadata<>( + QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, + "Maximum amount of revocable memory a query can use", + VARCHAR, + DataSize.class, + nodeSpillConfig.getMaxRevocableMemoryPerNode(), + true, + value -> DataSize.valueOf((String) value), + DataSize::toString), new PropertyMetadata<>( TEMP_STORAGE_SPILLER_BUFFER_SIZE, "Experimental: Buffer size used by TempStorageSingleStreamSpiller", @@ -1401,6 +1411,11 @@ public static DataSize getAggregationOperatorUnspillMemoryLimit(Session session) return memoryLimitForMerge; } + public static DataSize getQueryMaxRevocableMemoryPerNode(Session session) + { + return session.getSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, DataSize.class); + } + public static DataSize getTempStorageSpillerBufferSize(Session session) { DataSize tempStorageSpillerBufferSize = session.getSystemProperty(TEMP_STORAGE_SPILLER_BUFFER_SIZE, DataSize.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java index 31160162e12b9..100e30364c47a 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/SqlTaskManager.java @@ -78,6 +78,7 @@ import static com.facebook.airlift.concurrent.Threads.threadsNamed; import static com.facebook.presto.SystemSessionProperties.getQueryMaxBroadcastMemory; import static com.facebook.presto.SystemSessionProperties.getQueryMaxMemoryPerNode; +import static com.facebook.presto.SystemSessionProperties.getQueryMaxRevocableMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.getQueryMaxTotalMemoryPerNode; import static com.facebook.presto.SystemSessionProperties.isVerboseExceededMemoryLimitErrorsEnabled; import static com.facebook.presto.SystemSessionProperties.resourceOvercommit; @@ -412,7 +413,8 @@ public TaskInfo updateTask( queryContext.setMemoryLimits( getQueryMaxMemoryPerNode(session), getQueryMaxTotalMemoryPerNode(session), - getQueryMaxBroadcastMemory(session)); + getQueryMaxBroadcastMemory(session), + getQueryMaxRevocableMemoryPerNode(session)); } } diff --git a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java index 74043c9fb2575..d858ead7edb4b 100644 --- a/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java +++ b/presto-main/src/main/java/com/facebook/presto/memory/QueryContext.java @@ -92,10 +92,8 @@ public class QueryContext private long maxTotalMemory; @GuardedBy("this") private long peakNodeTotalMemory; - - // TODO: Make max revocable memory be configurable by session property. @GuardedBy("this") - private final long maxRevocableMemory; + private long maxRevocableMemory; @GuardedBy("this") private long broadcastUsed; @@ -392,12 +390,17 @@ public QueryId getQueryId() return queryId; } - public synchronized void setMemoryLimits(DataSize queryMaxTaskMemory, DataSize queryMaxTotalTaskMemory, DataSize queryMaxBroadcastMemory) + public synchronized void setMemoryLimits( + DataSize queryMaxTaskMemory, + DataSize queryMaxTotalTaskMemory, + DataSize queryMaxBroadcastMemory, + DataSize queryMaxRevocableMemory) { // Don't allow session properties to increase memory beyond configured limits maxUserMemory = Math.min(maxUserMemory, queryMaxTaskMemory.toBytes()); maxTotalMemory = Math.min(maxTotalMemory, queryMaxTotalTaskMemory.toBytes()); maxBroadcastUsedMemory = Math.min(maxBroadcastUsedMemory, queryMaxBroadcastMemory.toBytes()); + maxRevocableMemory = Math.min(maxRevocableMemory, queryMaxRevocableMemory.toBytes()); // Mark future memory limit updates as unnecessary memoryLimitsInitialized = true; } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/HashAggregationOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/HashAggregationOperator.java index 1f5492fb228cc..58e3b0ac1c1d1 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/HashAggregationOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/HashAggregationOperator.java @@ -66,8 +66,6 @@ public static class HashAggregationOperatorFactory private final int expectedGroups; private final Optional maxPartialMemory; private final boolean spillEnabled; - private final boolean distinctAggregationSpillEnabled; - private final boolean orderByAggregationSpillEnabled; private final DataSize memoryLimitForMerge; private final DataSize memoryLimitForMergeWithMemory; private final SpillerFactory spillerFactory; @@ -105,8 +103,6 @@ public HashAggregationOperatorFactory( expectedGroups, maxPartialMemory, false, - false, - false, new DataSize(0, MEGABYTE), new DataSize(0, MEGABYTE), (types, spillContext, memoryContext) -> { @@ -130,8 +126,6 @@ public HashAggregationOperatorFactory( int expectedGroups, Optional maxPartialMemory, boolean spillEnabled, - boolean distinctAggregationSpillEnabled, - boolean orderByAggregationSpillEnabled, DataSize unspillMemoryLimit, SpillerFactory spillerFactory, JoinCompiler joinCompiler, @@ -150,8 +144,6 @@ public HashAggregationOperatorFactory( expectedGroups, maxPartialMemory, spillEnabled, - distinctAggregationSpillEnabled, - orderByAggregationSpillEnabled, unspillMemoryLimit, DataSize.succinctBytes((long) (unspillMemoryLimit.toBytes() * MERGE_WITH_MEMORY_RATIO)), spillerFactory, @@ -174,8 +166,6 @@ public HashAggregationOperatorFactory( int expectedGroups, Optional maxPartialMemory, boolean spillEnabled, - boolean distinctAggregationSpillEnabled, - boolean orderByAggregationSpillEnabled, DataSize memoryLimitForMerge, DataSize memoryLimitForMergeWithMemory, SpillerFactory spillerFactory, @@ -195,8 +185,6 @@ public HashAggregationOperatorFactory( this.expectedGroups = expectedGroups; this.maxPartialMemory = requireNonNull(maxPartialMemory, "maxPartialMemory is null"); this.spillEnabled = spillEnabled; - this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled; - this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled; this.memoryLimitForMerge = requireNonNull(memoryLimitForMerge, "memoryLimitForMerge is null"); this.memoryLimitForMergeWithMemory = requireNonNull(memoryLimitForMergeWithMemory, "memoryLimitForMergeWithMemory is null"); this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null"); @@ -223,8 +211,6 @@ public Operator createOperator(DriverContext driverContext) expectedGroups, maxPartialMemory, spillEnabled, - distinctAggregationSpillEnabled, - orderByAggregationSpillEnabled, memoryLimitForMerge, memoryLimitForMergeWithMemory, spillerFactory, @@ -256,8 +242,6 @@ public OperatorFactory duplicate() expectedGroups, maxPartialMemory, spillEnabled, - distinctAggregationSpillEnabled, - orderByAggregationSpillEnabled, memoryLimitForMerge, memoryLimitForMergeWithMemory, spillerFactory, @@ -278,8 +262,6 @@ public OperatorFactory duplicate() private final int expectedGroups; private final Optional maxPartialMemory; private final boolean spillEnabled; - private final boolean distinctAggregationSpillEnabled; - private final boolean orderByAggregationSpillEnabled; private final DataSize memoryLimitForMerge; private final DataSize memoryLimitForMergeWithMemory; private final SpillerFactory spillerFactory; @@ -311,8 +293,6 @@ public HashAggregationOperator( int expectedGroups, Optional maxPartialMemory, boolean spillEnabled, - boolean distinctAggregationSpillEnabled, - boolean orderByAggregationSpillEnabled, DataSize memoryLimitForMerge, DataSize memoryLimitForMergeWithMemory, SpillerFactory spillerFactory, @@ -336,8 +316,6 @@ public HashAggregationOperator( this.maxPartialMemory = requireNonNull(maxPartialMemory, "maxPartialMemory is null"); this.types = toTypes(groupByTypes, step, accumulatorFactories, hashChannel); this.spillEnabled = spillEnabled; - this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled; - this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled; this.memoryLimitForMerge = requireNonNull(memoryLimitForMerge, "memoryLimitForMerge is null"); this.memoryLimitForMergeWithMemory = requireNonNull(memoryLimitForMergeWithMemory, "memoryLimitForMergeWithMemory is null"); this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null"); @@ -388,10 +366,7 @@ public void addInput(Page page) inputProcessed = true; if (aggregationBuilder == null) { - if (step.isOutputPartial() || - !spillEnabled || - (!distinctAggregationSpillEnabled && hasDistinct()) || - (!orderByAggregationSpillEnabled && hasOrderBy())) { + if (step.isOutputPartial() || !spillEnabled) { aggregationBuilder = new InMemoryHashAggregationBuilder( accumulatorFactories, step, @@ -435,16 +410,6 @@ public void addInput(Page page) aggregationBuilder.updateMemory(); } - private boolean hasOrderBy() - { - return accumulatorFactories.stream().anyMatch(AccumulatorFactory::hasOrderBy); - } - - private boolean hasDistinct() - { - return accumulatorFactories.stream().anyMatch(AccumulatorFactory::hasDistinct); - } - @Override public ListenableFuture startMemoryRevoke() { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 0e53d13f57e55..760454bdefdac 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -3098,11 +3098,12 @@ private OperatorFactory createHashAggregationOperatorFactory( { List aggregationOutputVariables = new ArrayList<>(); List accumulatorFactories = new ArrayList<>(); + boolean useSpill = spillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled); for (Map.Entry entry : aggregations.entrySet()) { VariableReferenceExpression variable = entry.getKey(); Aggregation aggregation = entry.getValue(); - accumulatorFactories.add(buildAccumulatorFactory(source, aggregation, !isStreamable && spillEnabled)); + accumulatorFactories.add(buildAccumulatorFactory(source, aggregation, useSpill)); aggregationOutputVariables.add(variable); } @@ -3159,15 +3160,23 @@ private OperatorFactory createHashAggregationOperatorFactory( groupIdChannel, expectedGroups, maxPartialAggregationMemorySize, - spillEnabled, - distinctSpillEnabled, - orderBySpillEnabled, + useSpill, unspillMemoryLimit, spillerFactory, joinCompiler, useSystemMemory); } } + + private boolean hasDistinct(Map aggregations) + { + return aggregations.values().stream().anyMatch(aggregation -> aggregation.isDistinct()); + } + + private boolean hasOrderBy(Map aggregations) + { + return aggregations.values().stream().anyMatch(aggregation -> aggregation.getOrderBy().isPresent()); + } } private static TableFinisher createTableFinisher(Session session, Metadata metadata, ExecutionWriterTarget target) diff --git a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkHashAndStreamingAggregationOperators.java b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkHashAndStreamingAggregationOperators.java index 8a7996532d80d..12a4d575b8f8f 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkHashAndStreamingAggregationOperators.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/BenchmarkHashAndStreamingAggregationOperators.java @@ -168,8 +168,6 @@ private OperatorFactory createHashAggregationOperatorFactory(Optional h 100_000, Optional.of(new DataSize(16, MEGABYTE)), false, - false, - false, succinctBytes(8), succinctBytes(Integer.MAX_VALUE), spillerFactory, diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashAggregationOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashAggregationOperator.java index 84ef5c1419950..44f9b939902b4 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashAggregationOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestHashAggregationOperator.java @@ -189,8 +189,6 @@ public void testHashAggregation(boolean hashEnabled, boolean spillEnabled, boole 100_000, Optional.of(new DataSize(16, MEGABYTE)), spillEnabled, - true, - true, succinctBytes(memoryLimitForMerge), succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, @@ -244,8 +242,6 @@ public void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEna 100_000, Optional.of(new DataSize(16, MEGABYTE)), spillEnabled, - true, - true, succinctBytes(memoryLimitForMerge), succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, @@ -292,8 +288,6 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp 100_000, Optional.of(new DataSize(16, MEGABYTE)), spillEnabled, - true, - true, succinctBytes(memoryLimitForMerge), succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, @@ -374,8 +368,6 @@ public void testHashBuilderResize(boolean hashEnabled, boolean spillEnabled, boo 100_000, Optional.of(new DataSize(16, MEGABYTE)), spillEnabled, - true, - true, succinctBytes(memoryLimitForMerge), succinctBytes(memoryLimitForMergeWithMemory), spillerFactory, @@ -601,8 +593,6 @@ public void testMergeWithMemorySpill() 1, Optional.of(new DataSize(16, MEGABYTE)), true, - true, - true, new DataSize(smallPagesSpillThresholdSize, Unit.BYTE), succinctBytes(Integer.MAX_VALUE), spillerFactory, @@ -657,8 +647,6 @@ public void testSpillerFailure() 100_000, Optional.of(new DataSize(16, MEGABYTE)), true, - true, - true, succinctBytes(8), succinctBytes(Integer.MAX_VALUE), new FailingSpillerFactory(), @@ -699,8 +687,6 @@ public void testMask() 1, Optional.of(new DataSize(16, MEGABYTE)), false, - false, - false, new DataSize(16, MEGABYTE), new DataSize(16, MEGABYTE), new FailingSpillerFactory(), diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java index 84690bc89b2ee..6d9e44cdbad94 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestHashJoinOperator.java @@ -1197,7 +1197,7 @@ public void testMemoryLimit(boolean parallelBuild, boolean buildHashEnabled) public void testBroadcastMemoryLimit(boolean parallelBuild, boolean buildHashEnabled) { TaskContext taskContext = TestingTaskContext.createTaskContext(executor, scheduledExecutor, TEST_SESSION, new DataSize(100, MEGABYTE)); - taskContext.getQueryContext().setMemoryLimits(new DataSize(512, MEGABYTE), new DataSize(512, MEGABYTE), new DataSize(100, BYTE)); + taskContext.getQueryContext().setMemoryLimits(new DataSize(512, MEGABYTE), new DataSize(512, MEGABYTE), new DataSize(100, BYTE), new DataSize(512, MEGABYTE)); RowPagesBuilder buildPages = rowPagesBuilder(buildHashEnabled, Ints.asList(0), ImmutableList.of(VARCHAR, BIGINT, BIGINT)) .addSequencePage(10, 20, 30, 40); BuildSideSetup buildSideSetup = setupBuildSide(parallelBuild, taskContext, Ints.asList(0), diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java index cee5ff9372cbf..458396f62bb01 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestDistributedSpilledQueries.java @@ -42,7 +42,6 @@ public static QueryRunner localCreateQueryRunner() .setCatalog("tpch") .setSchema(TINY_SCHEMA_NAME) .setSystemProperty(SystemSessionProperties.TASK_CONCURRENCY, "2") - .setSystemProperty(SystemSessionProperties.SPILL_ENABLED, "true") .setSystemProperty(SystemSessionProperties.AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, "128kB") .setSystemProperty(SystemSessionProperties.USE_MARK_DISTINCT, "false") .build(); diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java index a7e804a28e34e..44b4713496e91 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java @@ -17,9 +17,9 @@ import com.facebook.presto.testing.QueryRunner; import org.testng.annotations.Test; -import static com.facebook.presto.SystemSessionProperties.AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT; import static com.facebook.presto.SystemSessionProperties.DISTINCT_AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.ORDER_BY_AGGREGATION_SPILL_ENABLED; +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE; public class TestSpilledAggregations extends AbstractTestAggregations @@ -43,10 +43,10 @@ public void testDoesNotSpillOrderByWhenDisabled() Session session = Session.builder(getSession()) .setSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, "false") // set this low so that if we ran with spill the query would fail - .setSystemProperty(AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, "1B") + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") .build(); assertQuery(session, - "SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey ORDER BY 1, 2"); + "SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey"); } @Test @@ -73,11 +73,11 @@ public void testDoesNotSpillDistinctWhenDisabled() Session session = Session.builder(getSession()) .setSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, "false") // set this low so that if we ran with spill the query would fail - .setSystemProperty(AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, "1B") + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") .build(); // the sum() is necessary so that the aggregation isn't optimized into multiple aggregation nodes assertQuery(session, - "SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FILTER(WHERE orderkey > 5) FROM orders GROUP BY custkey ORDER BY 1"); + "SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FROM orders GROUP BY custkey"); } @Test