diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 8ffd8f0391108..c3de8a21c6fe3 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -120,6 +120,7 @@ public final class SystemSessionProperties public static final String QUERY_PRIORITY = "query_priority"; public static final String SPILL_ENABLED = "spill_enabled"; public static final String JOIN_SPILL_ENABLED = "join_spill_enabled"; + public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled"; public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled"; public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled"; public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit"; @@ -602,14 +603,19 @@ public SystemSessionProperties( "Enable join spilling", featuresConfig.isJoinSpillingEnabled(), false), + booleanProperty( + AGGREGATION_SPILL_ENABLED, + "Enable aggregate spilling if spill_enabled", + featuresConfig.isAggregationSpillEnabled(), + false), booleanProperty( DISTINCT_AGGREGATION_SPILL_ENABLED, - "Enable spill for distinct aggregations if spill_enabled", + "Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled", featuresConfig.isDistinctAggregationSpillEnabled(), false), booleanProperty( ORDER_BY_AGGREGATION_SPILL_ENABLED, - "Enable spill for order-by aggregations if spill_enabled", + "Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled", featuresConfig.isOrderByAggregationSpillEnabled(), false), new PropertyMetadata<>( @@ -1394,6 +1400,11 @@ public static boolean isJoinSpillingEnabled(Session session) return session.getSystemProperty(JOIN_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); } + public static boolean isAggregationSpillEnabled(Session session) + { + return session.getSystemProperty(AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); + } + public static boolean isDistinctAggregationSpillEnabled(Session session) { return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index ca04ca3801cbe..d0259d1327d81 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -124,6 +124,7 @@ public class FeaturesConfig private MultimapAggGroupImplementation multimapAggGroupImplementation = MultimapAggGroupImplementation.NEW; private boolean spillEnabled; private boolean joinSpillingEnabled = true; + private boolean aggregationSpillEnabled = true; private boolean distinctAggregationSpillEnabled = true; private boolean orderByAggregationSpillEnabled = true; private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE); @@ -903,8 +904,21 @@ public FeaturesConfig setJoinSpillingEnabled(boolean joinSpillingEnabled) return this; } + @Config("experimental.aggregation-spill-enabled") + @ConfigDescription("Spill aggregations if spill is enabled") + public FeaturesConfig setAggregationSpillEnabled(boolean aggregationSpillEnabled) + { + this.aggregationSpillEnabled = aggregationSpillEnabled; + return this; + } + + public boolean isAggregationSpillEnabled() + { + return aggregationSpillEnabled; + } + @Config("experimental.distinct-aggregation-spill-enabled") - @ConfigDescription("Spill distinct aggregations if spill is enabled") + @ConfigDescription("Spill distinct aggregations if aggregation spill is enabled") public FeaturesConfig setDistinctAggregationSpillEnabled(boolean distinctAggregationSpillEnabled) { this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled; @@ -917,7 +931,7 @@ public boolean isDistinctAggregationSpillEnabled() } @Config("experimental.order-by-aggregation-spill-enabled") - @ConfigDescription("Spill order-by aggregations if spill is enabled") + @ConfigDescription("Spill order-by aggregations if aggregation spill is enabled") public FeaturesConfig setOrderByAggregationSpillEnabled(boolean orderByAggregationSpillEnabled) { this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 760454bdefdac..8d1589b6c56af 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -253,6 +253,7 @@ import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency; import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount; +import static com.facebook.presto.SystemSessionProperties.isAggregationSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isDistinctAggregationSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isEnableDynamicFiltering; import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled; @@ -1248,6 +1249,7 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl node, source, spillEnabled, + isAggregationSpillEnabled(session), isDistinctAggregationSpillEnabled(session), isOrderByAggregationSpillEnabled(session), unspillMemoryLimit, @@ -2531,6 +2533,7 @@ public PhysicalOperation visitTableWriter(TableWriterNode node, LocalExecutionPl false, false, false, + false, new DataSize(0, BYTE), context, STATS_START_CHANNEL, @@ -2636,6 +2639,7 @@ public PhysicalOperation visitTableWriteMerge(TableWriterMergeNode node, LocalEx false, false, false, + false, new DataSize(0, BYTE), context, STATS_START_CHANNEL, @@ -2690,6 +2694,7 @@ public PhysicalOperation visitTableFinish(TableFinishNode node, LocalExecutionPl false, false, false, + false, new DataSize(0, BYTE), context, 0, @@ -3044,6 +3049,7 @@ private PhysicalOperation planGroupByAggregation( AggregationNode node, PhysicalOperation source, boolean spillEnabled, + boolean aggregationSpillEnabled, boolean distinctAggregationSpillEnabled, boolean orderByAggregationSpillEnabled, DataSize unspillMemoryLimit, @@ -3061,6 +3067,7 @@ private PhysicalOperation planGroupByAggregation( source, node.hasDefaultOutput(), spillEnabled, + aggregationSpillEnabled, distinctAggregationSpillEnabled, orderByAggregationSpillEnabled, node.isStreamable(), @@ -3085,6 +3092,7 @@ private OperatorFactory createHashAggregationOperatorFactory( PhysicalOperation source, boolean hasDefaultOutput, boolean spillEnabled, + boolean aggregationSpillEnabled, boolean distinctSpillEnabled, boolean orderBySpillEnabled, boolean isStreamable, @@ -3098,7 +3106,7 @@ private OperatorFactory createHashAggregationOperatorFactory( { List aggregationOutputVariables = new ArrayList<>(); List accumulatorFactories = new ArrayList<>(); - boolean useSpill = spillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled); + boolean useSpill = spillEnabled && aggregationSpillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled); for (Map.Entry entry : aggregations.entrySet()) { VariableReferenceExpression variable = entry.getKey(); Aggregation aggregation = entry.getValue(); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 5a9f350420564..fdb7a81e73d2f 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -90,6 +90,7 @@ public void testDefaults() .setRe2JDfaRetries(5) .setSpillEnabled(false) .setJoinSpillingEnabled(true) + .setAggregationSpillEnabled(true) .setDistinctAggregationSpillEnabled(true) .setOrderByAggregationSpillEnabled(true) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB")) @@ -236,6 +237,7 @@ public void testExplicitPropertyMappings() .put("re2j.dfa-retries", "42") .put("experimental.spill-enabled", "true") .put("experimental.join-spill-enabled", "false") + .put("experimental.aggregation-spill-enabled", "false") .put("experimental.distinct-aggregation-spill-enabled", "false") .put("experimental.order-by-aggregation-spill-enabled", "false") .put("experimental.aggregation-operator-unspill-memory-limit", "100MB") @@ -355,6 +357,7 @@ public void testExplicitPropertyMappings() .setRe2JDfaRetries(42) .setSpillEnabled(true) .setJoinSpillingEnabled(false) + .setAggregationSpillEnabled(false) .setDistinctAggregationSpillEnabled(false) .setOrderByAggregationSpillEnabled(false) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB")) diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java index 44b4713496e91..8e3277048e468 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java @@ -17,6 +17,7 @@ import com.facebook.presto.testing.QueryRunner; import org.testng.annotations.Test; +import static com.facebook.presto.SystemSessionProperties.AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.DISTINCT_AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.ORDER_BY_AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE; @@ -113,4 +114,24 @@ public void testMultipleDistinctAggregations() { assertQuery("SELECT custkey, count(DISTINCT orderpriority), count(DISTINCT orderstatus), count(DISTINCT totalprice), count(DISTINCT clerk) FROM orders GROUP BY custkey"); } + + @Test + public void testDoesNotSpillWhenAggregationSpillDisabled() + { + Session session = Session.builder(getSession()) + .setSystemProperty(AGGREGATION_SPILL_ENABLED, "false") + // This will not spill even when distinct/orderBy Spill is enabled since aggregationSpill is disabled above + .setSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, "true") + .setSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, "true") + // set this low so that if we ran with spill the query would fail + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") + .build(); + + assertQuery(session, + "SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey"); + + // the sum() is necessary so that the aggregation isn't optimized into multiple aggregation nodes + assertQuery(session, + "SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FROM orders GROUP BY custkey"); + } }