From 10bb97162169d44ad8269a3f5be3346736cb5468 Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Wed, 4 Mar 2020 11:54:57 -0500 Subject: [PATCH 1/2] Use static imports in TestAddExchangePlans --- .../sql/planner/optimizations/TestAddExchangesPlans.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java index e832b2b4ec4f9..55c411dc56fbc 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java @@ -15,9 +15,7 @@ package com.facebook.presto.sql.planner.optimizations; import com.facebook.presto.Session; -import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.spi.plan.AggregationNode; -import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.assertions.BasePlanTest; import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider; @@ -34,7 +32,9 @@ import java.util.function.BiConsumer; import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY; +import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING; import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol; @@ -422,9 +422,7 @@ void assertExactDistributedPlan(String sql, PlanMatchPattern pattern) TestingSession.testSessionBuilder() .setCatalog("local") .setSchema("tiny") - .setSystemProperty( - SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY, - FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING.toString()) + .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString()) .build(), pattern); } From 1e124c4aa1de49345d33c8df382a089d7f5a710b Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Thu, 5 Mar 2020 12:05:46 -0500 Subject: [PATCH 2/2] Allow forcing streaming exchange for Mark Distinct When MarkDistinctOperator executes queries with distincted aggregation columns it requires an exchange per column, which is which is prohibitively expensive for materialized exchange. Add "query.use-stream-exchange-for-mark-distinct" to force streaming for mark distinct even if materialized exchange is enabled. --- .../presto/SystemSessionProperties.java | 11 +++++ .../presto/execution/QueryManagerConfig.java | 15 ++++++ .../planner/optimizations/AddExchanges.java | 5 +- .../execution/TestQueryManagerConfig.java | 7 ++- .../optimizations/TestAddExchangesPlans.java | 49 ++++++++++++++++++- 5 files changed, 83 insertions(+), 4 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index d2d83140f66ab..5fc0f89963de9 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -66,6 +66,7 @@ public final class SystemSessionProperties public static final String HASH_PARTITION_COUNT = "hash_partition_count"; public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog"; public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy"; + public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct"; public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation"; public static final String GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS = "grouped_execution_for_eligible_table_scans"; public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution"; @@ -228,6 +229,11 @@ public SystemSessionProperties( false, value -> ExchangeMaterializationStrategy.valueOf(((String) value).toUpperCase()), ExchangeMaterializationStrategy::name), + booleanProperty( + USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, + "Use streaming instead of materialization for mark distinct with materialized exchange enabled", + queryManagerConfig.getUseStreamingExchangeForMarkDistinct(), + false), booleanProperty( GROUPED_EXECUTION_FOR_AGGREGATION, "Use grouped execution for aggregation when possible", @@ -808,6 +814,11 @@ public static ExchangeMaterializationStrategy getExchangeMaterializationStrategy return session.getSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.class); } + public static boolean isUseStreamingExchangeForMarkDistinctEnabled(Session session) + { + return session.getSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, Boolean.class); + } + public static boolean isGroupedExecutionForAggregationEnabled(Session session) { return session.getSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, Boolean.class); diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java index 1b7dfe6e35c46..d59cf8877f7d7 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java @@ -44,6 +44,7 @@ public class QueryManagerConfig private int hashPartitionCount = 100; private String partitioningProviderCatalog = GlobalSystemConnector.NAME; private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE; + private boolean useStreamingExchangeForMarkDistinct; private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES); private int maxQueryHistory = 100; private int maxQueryLength = 1_000_000; @@ -159,6 +160,20 @@ public ExchangeMaterializationStrategy getExchangeMaterializationStrategy() return exchangeMaterializationStrategy; } + @Config("query.use-streaming-exchange-for-mark-distinct") + @ConfigDescription("Use streaming instead of materialization with mark distinct when materialized exchange is enabled") + public QueryManagerConfig setUseStreamingExchangeForMarkDistinct(boolean useStreamingExchangeForMarkDistinct) + { + this.useStreamingExchangeForMarkDistinct = useStreamingExchangeForMarkDistinct; + return this; + } + + @NotNull + public boolean getUseStreamingExchangeForMarkDistinct() + { + return useStreamingExchangeForMarkDistinct; + } + @Config("query.exchange-materialization-strategy") @ConfigDescription("The exchange materialization strategy to use") public QueryManagerConfig setExchangeMaterializationStrategy(ExchangeMaterializationStrategy exchangeMaterializationStrategy) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index e5ac56ceac460..31a018005a929 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -107,6 +107,7 @@ import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; +import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled; import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators; import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT; import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference; @@ -321,7 +322,9 @@ public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProp child = withDerivedProperties( partitionedExchange( idAllocator.getNextId(), - selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false), + isUseStreamingExchangeForMarkDistinctEnabled(session) ? + REMOTE_STREAMING : + selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false), child.getNode(), createPartitioning(node.getDistinctVariables()), node.getHashVariable()), diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java index d0bb97c68d134..c57bd1bea7f22 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java @@ -53,7 +53,8 @@ public void testDefaults() .setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS)) .setRequiredWorkers(1) .setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES)) - .setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2)); + .setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2) + .setUseStreamingExchangeForMarkDistinct(false)); } @Test @@ -83,6 +84,7 @@ public void testExplicitPropertyMappings() .put("query.max-run-time", "2h") .put("query.max-execution-time", "3h") .put("query.max-cpu-time", "2d") + .put("query.use-streaming-exchange-for-mark-distinct", "true") .put("query-manager.required-workers", "333") .put("query-manager.required-workers-max-wait", "33m") .put("query-manager.query-submission-max-threads", "5") @@ -114,7 +116,8 @@ public void testExplicitPropertyMappings() .setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS)) .setRequiredWorkers(333) .setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES)) - .setQuerySubmissionMaxThreads(5); + .setQuerySubmissionMaxThreads(5) + .setUseStreamingExchangeForMarkDistinct(true); ConfigAssertions.assertFullMapping(properties, expected); } diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java index 55c411dc56fbc..e4f4a262bae8e 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java @@ -32,10 +32,13 @@ import java.util.function.BiConsumer; import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY; +import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY; import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY; -import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING; +import static com.facebook.presto.SystemSessionProperties.USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT; +import static com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy.ALL; import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; @@ -49,6 +52,7 @@ import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; @@ -415,6 +419,49 @@ public void testMarkDistinctIsExactlyPartitioned() "orderdate", "orderdate")))))))))); } + @Test + public void testMarkDistinctStreamingExchange() + { + assertMaterializedWithStreamingMarkDistinctDistributedPlan( + " SELECT\n" + + " orderkey,\n" + + " orderstatus,\n" + + " COUNT(DISTINCT orderdate),\n" + + " COUNT(DISTINCT clerk)\n" + + " FROM orders\n" + + " WHERE\n" + + " orderdate > CAST('2042-01-01' AS DATE)\n" + + " GROUP BY\n" + + " orderkey,\n" + + " orderstatus\n", + anyTree( + exchange(REMOTE_MATERIALIZED, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "orderstatus", "orderstatus", + "orderkey", "orderkey", + "clerk", "clerk", + "orderdate", "orderdate")))))))))); + } + + void assertMaterializedWithStreamingMarkDistinctDistributedPlan(String sql, PlanMatchPattern pattern) + { + assertDistributedPlan( + sql, + TestingSession.testSessionBuilder() + .setCatalog("local") + .setSchema("tiny") + .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString()) + .setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ALL.toString()) + .setSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, "true") + .build(), + pattern); + } + void assertExactDistributedPlan(String sql, PlanMatchPattern pattern) { assertDistributedPlan(