diff --git a/presto-docs/src/main/sphinx/admin/properties.rst b/presto-docs/src/main/sphinx/admin/properties.rst index 3b39429f84f18..6067a6ecd8479 100644 --- a/presto-docs/src/main/sphinx/admin/properties.rst +++ b/presto-docs/src/main/sphinx/admin/properties.rst @@ -162,13 +162,24 @@ Spilling Properties This config property can be overridden by the ``join_spill_enabled`` session property. +``experimental.aggregation-spill-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``boolean`` + * **Default value:** ``true`` + + When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for aggregations to + avoid exceeding memory limits for the query. + + This config property can be overridden by the ``aggregation_spill_enabled`` session property. + ``experimental.distinct-aggregation-spill-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * **Type:** ``boolean`` * **Default value:** ``true`` - When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for distinct + When ``aggregation_spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for distinct aggregations to avoid exceeding memory limits for the query. This config property can be overridden by the ``distinct_aggregation_spill_enabled`` session property. @@ -179,11 +190,33 @@ Spilling Properties * **Type:** ``boolean`` * **Default value:** ``true`` - When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by + When ``aggregation_spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by aggregations to avoid exceeding memory limits for the query. This config property can be overridden by the ``order_by_aggregation_spill_enabled`` session property. +``experimental.window-spill-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``boolean`` + * **Default value:** ``true`` + + When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for window functions to + avoid exceeding memory limits for the query. + + This config property can be overridden by the ``window_spill_enabled`` session property. + +``experimental.order-by-spill-enabled`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + * **Type:** ``boolean`` + * **Default value:** ``true`` + + When ``spill_enabled`` is ``true``, this determines whether Presto will try spilling memory to disk for order by to + avoid exceeding memory limits for the query. + + This config property can be overridden by the ``order_by_spill_enabled`` session property. + ``experimental.spiller.task-spilling-strategy`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ * **Type:** ``string`` diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 8ffd8f0391108..9b2996a9912f6 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -120,8 +120,11 @@ public final class SystemSessionProperties public static final String QUERY_PRIORITY = "query_priority"; public static final String SPILL_ENABLED = "spill_enabled"; public static final String JOIN_SPILL_ENABLED = "join_spill_enabled"; + public static final String AGGREGATION_SPILL_ENABLED = "aggregation_spill_enabled"; public static final String DISTINCT_AGGREGATION_SPILL_ENABLED = "distinct_aggregation_spill_enabled"; public static final String ORDER_BY_AGGREGATION_SPILL_ENABLED = "order_by_aggregation_spill_enabled"; + public static final String WINDOW_SPILL_ENABLED = "window_spill_enabled"; + public static final String ORDER_BY_SPILL_ENABLED = "order_by_spill_enabled"; public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit"; public static final String QUERY_MAX_REVOCABLE_MEMORY_PER_NODE = "query_max_revocable_memory_per_node"; public static final String TEMP_STORAGE_SPILLER_BUFFER_SIZE = "temp_storage_spiller_buffer_size"; @@ -602,16 +605,31 @@ public SystemSessionProperties( "Enable join spilling", featuresConfig.isJoinSpillingEnabled(), false), + booleanProperty( + AGGREGATION_SPILL_ENABLED, + "Enable aggregate spilling if spill_enabled", + featuresConfig.isAggregationSpillEnabled(), + false), booleanProperty( DISTINCT_AGGREGATION_SPILL_ENABLED, - "Enable spill for distinct aggregations if spill_enabled", + "Enable spill for distinct aggregations if spill_enabled and aggregation_spill_enabled", featuresConfig.isDistinctAggregationSpillEnabled(), false), booleanProperty( ORDER_BY_AGGREGATION_SPILL_ENABLED, - "Enable spill for order-by aggregations if spill_enabled", + "Enable spill for order-by aggregations if spill_enabled and aggregation_spill_enabled", featuresConfig.isOrderByAggregationSpillEnabled(), false), + booleanProperty( + WINDOW_SPILL_ENABLED, + "Enable window spilling if spill_enabled", + featuresConfig.isWindowSpillEnabled(), + false), + booleanProperty( + ORDER_BY_SPILL_ENABLED, + "Enable order by spilling if spill_enabled", + featuresConfig.isOrderBySpillEnabled(), + false), new PropertyMetadata<>( AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, "Experimental: How much memory can should be allocated per aggragation operator in unspilling process", @@ -1394,14 +1412,29 @@ public static boolean isJoinSpillingEnabled(Session session) return session.getSystemProperty(JOIN_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); } + public static boolean isAggregationSpillEnabled(Session session) + { + return session.getSystemProperty(AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); + } + public static boolean isDistinctAggregationSpillEnabled(Session session) { - return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); + return session.getSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session); } public static boolean isOrderByAggregationSpillEnabled(Session session) { - return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); + return session.getSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, Boolean.class) && isAggregationSpillEnabled(session); + } + + public static boolean isWindowSpillEnabled(Session session) + { + return session.getSystemProperty(WINDOW_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); + } + + public static boolean isOrderBySpillEnabled(Session session) + { + return session.getSystemProperty(ORDER_BY_SPILL_ENABLED, Boolean.class) && isSpillEnabled(session); } public static DataSize getAggregationOperatorUnspillMemoryLimit(Session session) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index ca04ca3801cbe..23984f548c854 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -124,8 +124,11 @@ public class FeaturesConfig private MultimapAggGroupImplementation multimapAggGroupImplementation = MultimapAggGroupImplementation.NEW; private boolean spillEnabled; private boolean joinSpillingEnabled = true; + private boolean aggregationSpillEnabled = true; private boolean distinctAggregationSpillEnabled = true; private boolean orderByAggregationSpillEnabled = true; + private boolean windowSpillEnabled = true; + private boolean orderBySpillEnabled = true; private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE); private List spillerSpillPaths = ImmutableList.of(); private int spillerThreads = 4; @@ -903,8 +906,21 @@ public FeaturesConfig setJoinSpillingEnabled(boolean joinSpillingEnabled) return this; } + @Config("experimental.aggregation-spill-enabled") + @ConfigDescription("Spill aggregations if spill is enabled") + public FeaturesConfig setAggregationSpillEnabled(boolean aggregationSpillEnabled) + { + this.aggregationSpillEnabled = aggregationSpillEnabled; + return this; + } + + public boolean isAggregationSpillEnabled() + { + return aggregationSpillEnabled; + } + @Config("experimental.distinct-aggregation-spill-enabled") - @ConfigDescription("Spill distinct aggregations if spill is enabled") + @ConfigDescription("Spill distinct aggregations if aggregation spill is enabled") public FeaturesConfig setDistinctAggregationSpillEnabled(boolean distinctAggregationSpillEnabled) { this.distinctAggregationSpillEnabled = distinctAggregationSpillEnabled; @@ -917,7 +933,7 @@ public boolean isDistinctAggregationSpillEnabled() } @Config("experimental.order-by-aggregation-spill-enabled") - @ConfigDescription("Spill order-by aggregations if spill is enabled") + @ConfigDescription("Spill order-by aggregations if aggregation spill is enabled") public FeaturesConfig setOrderByAggregationSpillEnabled(boolean orderByAggregationSpillEnabled) { this.orderByAggregationSpillEnabled = orderByAggregationSpillEnabled; @@ -929,6 +945,32 @@ public boolean isOrderByAggregationSpillEnabled() return orderByAggregationSpillEnabled; } + @Config("experimental.window-spill-enabled") + @ConfigDescription("Enable Window Operator Spilling if spill is enabled") + public FeaturesConfig setWindowSpillEnabled(boolean windowSpillEnabled) + { + this.windowSpillEnabled = windowSpillEnabled; + return this; + } + + public boolean isWindowSpillEnabled() + { + return windowSpillEnabled; + } + + @Config("experimental.order-by-spill-enabled") + @ConfigDescription("Enable Order-by Operator Spilling if spill is enabled") + public FeaturesConfig setOrderBySpillEnabled(boolean orderBySpillEnabled) + { + this.orderBySpillEnabled = orderBySpillEnabled; + return this; + } + + public boolean isOrderBySpillEnabled() + { + return orderBySpillEnabled; + } + public boolean isIterativeOptimizerEnabled() { return iterativeOptimizerEnabled; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java index 760454bdefdac..a0efb67ada8a1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/LocalExecutionPlanner.java @@ -253,6 +253,7 @@ import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency; import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount; +import static com.facebook.presto.SystemSessionProperties.isAggregationSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isDistinctAggregationSpillEnabled; import static com.facebook.presto.SystemSessionProperties.isEnableDynamicFiltering; import static com.facebook.presto.SystemSessionProperties.isExchangeChecksumEnabled; @@ -261,7 +262,9 @@ import static com.facebook.presto.SystemSessionProperties.isOptimizeCommonSubExpressions; import static com.facebook.presto.SystemSessionProperties.isOptimizedRepartitioningEnabled; import static com.facebook.presto.SystemSessionProperties.isOrderByAggregationSpillEnabled; +import static com.facebook.presto.SystemSessionProperties.isOrderBySpillEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; +import static com.facebook.presto.SystemSessionProperties.isWindowSpillEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; import static com.facebook.presto.common.type.TypeUtils.writeNativeValue; @@ -1085,7 +1088,7 @@ public PhysicalOperation visitWindow(WindowNode node, LocalExecutionPlanContext node.getPreSortedOrderPrefix(), 10_000, pagesIndexFactory, - isSpillEnabled(session), + isWindowSpillEnabled(session), spillerFactory, orderingCompiler); @@ -1136,7 +1139,7 @@ public PhysicalOperation visitSort(SortNode node, LocalExecutionPlanContext cont outputChannels.add(i); } - boolean spillEnabled = isSpillEnabled(context.getSession()); + boolean spillEnabled = isOrderBySpillEnabled(context.getSession()); OperatorFactory operator = new OrderByOperatorFactory( context.getNextOperatorId(), @@ -1247,7 +1250,7 @@ public PhysicalOperation visitAggregation(AggregationNode node, LocalExecutionPl return planGroupByAggregation( node, source, - spillEnabled, + isAggregationSpillEnabled(session), isDistinctAggregationSpillEnabled(session), isOrderByAggregationSpillEnabled(session), unspillMemoryLimit, @@ -3043,7 +3046,7 @@ private AggregationOperatorFactory createAggregationOperatorFactory( private PhysicalOperation planGroupByAggregation( AggregationNode node, PhysicalOperation source, - boolean spillEnabled, + boolean aggregationSpillEnabled, boolean distinctAggregationSpillEnabled, boolean orderByAggregationSpillEnabled, DataSize unspillMemoryLimit, @@ -3060,7 +3063,7 @@ private PhysicalOperation planGroupByAggregation( node.getGroupIdVariable(), source, node.hasDefaultOutput(), - spillEnabled, + aggregationSpillEnabled, distinctAggregationSpillEnabled, orderByAggregationSpillEnabled, node.isStreamable(), @@ -3084,7 +3087,7 @@ private OperatorFactory createHashAggregationOperatorFactory( Optional groupIdVariable, PhysicalOperation source, boolean hasDefaultOutput, - boolean spillEnabled, + boolean aggregationSpillEnabled, boolean distinctSpillEnabled, boolean orderBySpillEnabled, boolean isStreamable, @@ -3098,7 +3101,7 @@ private OperatorFactory createHashAggregationOperatorFactory( { List aggregationOutputVariables = new ArrayList<>(); List accumulatorFactories = new ArrayList<>(); - boolean useSpill = spillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled); + boolean useSpill = aggregationSpillEnabled && !isStreamable && (!hasDistinct(aggregations) || distinctSpillEnabled) && (!hasOrderBy(aggregations) || orderBySpillEnabled); for (Map.Entry entry : aggregations.entrySet()) { VariableReferenceExpression variable = entry.getKey(); Aggregation aggregation = entry.getValue(); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 5a9f350420564..21de9871eae72 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -90,8 +90,11 @@ public void testDefaults() .setRe2JDfaRetries(5) .setSpillEnabled(false) .setJoinSpillingEnabled(true) + .setAggregationSpillEnabled(true) .setDistinctAggregationSpillEnabled(true) .setOrderByAggregationSpillEnabled(true) + .setWindowSpillEnabled(true) + .setOrderBySpillEnabled(true) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB")) .setSpillerSpillPaths("") .setSpillerThreads(4) @@ -236,8 +239,11 @@ public void testExplicitPropertyMappings() .put("re2j.dfa-retries", "42") .put("experimental.spill-enabled", "true") .put("experimental.join-spill-enabled", "false") + .put("experimental.aggregation-spill-enabled", "false") .put("experimental.distinct-aggregation-spill-enabled", "false") .put("experimental.order-by-aggregation-spill-enabled", "false") + .put("experimental.window-spill-enabled", "false") + .put("experimental.order-by-spill-enabled", "false") .put("experimental.aggregation-operator-unspill-memory-limit", "100MB") .put("experimental.spiller-spill-path", "/tmp/custom/spill/path1,/tmp/custom/spill/path2") .put("experimental.spiller-threads", "42") @@ -355,8 +361,11 @@ public void testExplicitPropertyMappings() .setRe2JDfaRetries(42) .setSpillEnabled(true) .setJoinSpillingEnabled(false) + .setAggregationSpillEnabled(false) .setDistinctAggregationSpillEnabled(false) .setOrderByAggregationSpillEnabled(false) + .setWindowSpillEnabled(false) + .setOrderBySpillEnabled(false) .setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB")) .setSpillerSpillPaths("/tmp/custom/spill/path1,/tmp/custom/spill/path2") .setSpillerThreads(42) diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java index 44b4713496e91..8e3277048e468 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledAggregations.java @@ -17,6 +17,7 @@ import com.facebook.presto.testing.QueryRunner; import org.testng.annotations.Test; +import static com.facebook.presto.SystemSessionProperties.AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.DISTINCT_AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.ORDER_BY_AGGREGATION_SPILL_ENABLED; import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE; @@ -113,4 +114,24 @@ public void testMultipleDistinctAggregations() { assertQuery("SELECT custkey, count(DISTINCT orderpriority), count(DISTINCT orderstatus), count(DISTINCT totalprice), count(DISTINCT clerk) FROM orders GROUP BY custkey"); } + + @Test + public void testDoesNotSpillWhenAggregationSpillDisabled() + { + Session session = Session.builder(getSession()) + .setSystemProperty(AGGREGATION_SPILL_ENABLED, "false") + // This will not spill even when distinct/orderBy Spill is enabled since aggregationSpill is disabled above + .setSystemProperty(ORDER_BY_AGGREGATION_SPILL_ENABLED, "true") + .setSystemProperty(DISTINCT_AGGREGATION_SPILL_ENABLED, "true") + // set this low so that if we ran with spill the query would fail + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") + .build(); + + assertQuery(session, + "SELECT orderpriority, custkey, array_agg(orderstatus ORDER BY orderstatus) FROM orders GROUP BY orderpriority, custkey"); + + // the sum() is necessary so that the aggregation isn't optimized into multiple aggregation nodes + assertQuery(session, + "SELECT custkey, sum(custkey), count(DISTINCT orderpriority) FROM orders GROUP BY custkey"); + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledOrderByQueries.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledOrderByQueries.java index 962da3c2a26fd..5c05a027fe63c 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledOrderByQueries.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledOrderByQueries.java @@ -14,7 +14,12 @@ package com.facebook.presto.tests; +import com.facebook.presto.Session; import com.facebook.presto.testing.QueryRunner; +import org.testng.annotations.Test; + +import static com.facebook.presto.SystemSessionProperties.ORDER_BY_SPILL_ENABLED; +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE; public class TestSpilledOrderByQueries extends AbstractTestOrderByQueries @@ -25,4 +30,16 @@ protected QueryRunner createQueryRunner() { return TestDistributedSpilledQueries.localCreateQueryRunner(); } + + @Test + public void testDoesNotSpillWhenOrderBySpillDisabled() + { + Session session = Session.builder(getSession()) + .setSystemProperty(ORDER_BY_SPILL_ENABLED, "false") + // set this low so that if we ran with spill the query would fail + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") + .build(); + + assertQuery(session, "SELECT orderstatus FROM orders ORDER BY orderkey DESC"); + } } diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledWindowQueries.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledWindowQueries.java index 78bc5a2122af3..9ec23774445f8 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledWindowQueries.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestSpilledWindowQueries.java @@ -13,7 +13,12 @@ */ package com.facebook.presto.tests; +import com.facebook.presto.Session; import com.facebook.presto.testing.QueryRunner; +import org.testng.annotations.Test; + +import static com.facebook.presto.SystemSessionProperties.QUERY_MAX_REVOCABLE_MEMORY_PER_NODE; +import static com.facebook.presto.SystemSessionProperties.WINDOW_SPILL_ENABLED; public class TestSpilledWindowQueries extends AbstractTestWindowQueries @@ -24,4 +29,27 @@ protected QueryRunner createQueryRunner() { return TestDistributedSpilledQueries.localCreateQueryRunner(); } + + @Test + public void testDoesNotSpillWhenWindowSpillDisabled() + { + Session session = Session.builder(getSession()) + .setSystemProperty(WINDOW_SPILL_ENABLED, "false") + // set this low so that if we ran with spill the query would fail + .setSystemProperty(QUERY_MAX_REVOCABLE_MEMORY_PER_NODE, "1B") + .build(); + + assertQuery(session, + "SELECT orderkey, orderstatus " + + ", row_number() OVER (ORDER BY orderkey * 2) * " + + " row_number() OVER (ORDER BY orderkey DESC) + 100 " + + "FROM (SELECT * FROM orders ORDER BY orderkey LIMIT 10) x " + + "ORDER BY orderkey LIMIT 5", + "VALUES " + + "(1, 'O', 110), " + + "(2, 'O', 118), " + + "(3, 'F', 124), " + + "(4, 'O', 128), " + + "(5, 'F', 130)"); + } }