Patch overview
==============
Adds an opt-in switch that makes the remote exchange inserted below a
MarkDistinct node use a streaming exchange.

  * SystemSessionProperties: registers the boolean session property
    "use_stream_exchange_for_mark_distinct" (constant
    USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT). Its default comes from
    QueryManagerConfig.getUseStreamingExchangeForMarkDistinct(), and it is read
    through isUseStreamingExchangeForMarkDistinctEnabled(Session).
  * QueryManagerConfig: new config key
    "query.use-streaming-exchange-for-mark-distinct", backed by a boolean field
    that is left at its Java default (false).
  * AddExchanges.visitMarkDistinct: when the session property is on, the
    partitioned exchange is created with scope REMOTE_STREAMING; otherwise the
    scope still comes from
    selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false).
  * TestQueryManagerConfig / TestAddExchangesPlans: cover the config default,
    the explicit mapping, and the resulting plan shape with
    exchange_materialization_strategy=ALL.

Review changes applied to this patch
------------------------------------
  * QueryManagerConfig: removed the @NotNull annotation from
    getUseStreamingExchangeForMarkDistinct(). The getter returns a primitive
    boolean, which can never be null, so the constraint had no effect. The
    second hunk header of that file was recounted accordingly (+160,20 became
    +160,19).
  * NOTE(review): the post-image blob id on the QueryManagerConfig "index" line
    predates the change above; regenerate it if anything relies on it.
  * NOTE(review): the session property name says "stream" while the constant and
    the config key say "streaming". It is left untouched here because renaming
    it changes a user-facing name - confirm which spelling is intended.
  * NOTE(review): line breaks and indentation of this patch were restored by
    hand; if a hunk is rejected on whitespace, apply with
    "git apply --ignore-whitespace".

diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
index d2d83140f66ab..5fc0f89963de9 100644
--- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
+++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java
@@ -66,6 +66,7 @@ public final class SystemSessionProperties
     public static final String HASH_PARTITION_COUNT = "hash_partition_count";
     public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog";
     public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy";
+    public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct";
     public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation";
     public static final String GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS = "grouped_execution_for_eligible_table_scans";
     public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution";
@@ -228,6 +229,11 @@ public SystemSessionProperties(
                         false,
                         value -> ExchangeMaterializationStrategy.valueOf(((String) value).toUpperCase()),
                         ExchangeMaterializationStrategy::name),
+                booleanProperty(
+                        USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT,
+                        "Use streaming instead of materialization for mark distinct with materialized exchange enabled",
+                        queryManagerConfig.getUseStreamingExchangeForMarkDistinct(),
+                        false),
                 booleanProperty(
                         GROUPED_EXECUTION_FOR_AGGREGATION,
                         "Use grouped execution for aggregation when possible",
@@ -808,6 +814,11 @@ public static ExchangeMaterializationStrategy getExchangeMaterializationStrategy
         return session.getSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.class);
     }
 
+    public static boolean isUseStreamingExchangeForMarkDistinctEnabled(Session session)
+    {
+        return session.getSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, Boolean.class);
+    }
+
     public static boolean isGroupedExecutionForAggregationEnabled(Session session)
     {
         return session.getSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, Boolean.class);
diff --git a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java
index 1b7dfe6e35c46..d59cf8877f7d7 100644
--- a/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java
+++ b/presto-main/src/main/java/com/facebook/presto/execution/QueryManagerConfig.java
@@ -44,6 +44,7 @@ public class QueryManagerConfig
     private int hashPartitionCount = 100;
     private String partitioningProviderCatalog = GlobalSystemConnector.NAME;
     private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE;
+    private boolean useStreamingExchangeForMarkDistinct;
     private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES);
     private int maxQueryHistory = 100;
     private int maxQueryLength = 1_000_000;
@@ -159,6 +160,19 @@ public ExchangeMaterializationStrategy getExchangeMaterializationStrategy()
         return exchangeMaterializationStrategy;
     }
 
+    @Config("query.use-streaming-exchange-for-mark-distinct")
+    @ConfigDescription("Use streaming instead of materialization with mark distinct when materialized exchange is enabled")
+    public QueryManagerConfig setUseStreamingExchangeForMarkDistinct(boolean useStreamingExchangeForMarkDistinct)
+    {
+        this.useStreamingExchangeForMarkDistinct = useStreamingExchangeForMarkDistinct;
+        return this;
+    }
+
+    public boolean getUseStreamingExchangeForMarkDistinct()
+    {
+        return useStreamingExchangeForMarkDistinct;
+    }
+
     @Config("query.exchange-materialization-strategy")
     @ConfigDescription("The exchange materialization strategy to use")
     public QueryManagerConfig setExchangeMaterializationStrategy(ExchangeMaterializationStrategy exchangeMaterializationStrategy)
diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
index e5ac56ceac460..31a018005a929 100644
--- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
+++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java
@@ -107,6 +107,7 @@
 import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
 import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
 import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
+import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled;
 import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators;
 import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
 import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
@@ -321,7 +322,9 @@ public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProp
                 child = withDerivedProperties(
                         partitionedExchange(
                                 idAllocator.getNextId(),
-                                selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
+                                isUseStreamingExchangeForMarkDistinctEnabled(session) ?
+                                        REMOTE_STREAMING :
+                                        selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
                                 child.getNode(),
                                 createPartitioning(node.getDistinctVariables()),
                                 node.getHashVariable()),
diff --git a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java
index d0bb97c68d134..c57bd1bea7f22 100644
--- a/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java
+++ b/presto-main/src/test/java/com/facebook/presto/execution/TestQueryManagerConfig.java
@@ -53,7 +53,8 @@ public void testDefaults()
                 .setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
                 .setRequiredWorkers(1)
                 .setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES))
-                .setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2));
+                .setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2)
+                .setUseStreamingExchangeForMarkDistinct(false));
     }
 
     @Test
@@ -83,6 +84,7 @@ public void testExplicitPropertyMappings()
                 .put("query.max-run-time", "2h")
                 .put("query.max-execution-time", "3h")
                 .put("query.max-cpu-time", "2d")
+                .put("query.use-streaming-exchange-for-mark-distinct", "true")
                 .put("query-manager.required-workers", "333")
                 .put("query-manager.required-workers-max-wait", "33m")
                 .put("query-manager.query-submission-max-threads", "5")
@@ -114,7 +116,8 @@ public void testExplicitPropertyMappings()
                 .setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS))
                 .setRequiredWorkers(333)
                 .setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES))
-                .setQuerySubmissionMaxThreads(5);
+                .setQuerySubmissionMaxThreads(5)
+                .setUseStreamingExchangeForMarkDistinct(true);
 
         ConfigAssertions.assertFullMapping(properties, expected);
     }
diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java
index e832b2b4ec4f9..e4f4a262bae8e 100644
--- a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java
+++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java
@@ -15,9 +15,7 @@
 package com.facebook.presto.sql.planner.optimizations;
 
 import com.facebook.presto.Session;
-import com.facebook.presto.SystemSessionProperties;
 import com.facebook.presto.spi.plan.AggregationNode;
-import com.facebook.presto.sql.analyzer.FeaturesConfig;
 import com.facebook.presto.sql.planner.Plan;
 import com.facebook.presto.sql.planner.assertions.BasePlanTest;
 import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider;
@@ -34,8 +32,13 @@
 import java.util.function.BiConsumer;
 
 import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY;
+import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
+import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY;
 import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY;
+import static com.facebook.presto.SystemSessionProperties.USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT;
+import static com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy.ALL;
 import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE;
+import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol;
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
@@ -49,6 +52,7 @@
 import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
 import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
+import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
 import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
 import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
@@ -415,6 +419,49 @@ public void testMarkDistinctIsExactlyPartitioned()
                                                                                 "orderdate", "orderdate"))))))))));
     }
 
+    @Test
+    public void testMarkDistinctStreamingExchange()
+    {
+        assertMaterializedWithStreamingMarkDistinctDistributedPlan(
+                " SELECT\n" +
+                        " orderkey,\n" +
+                        " orderstatus,\n" +
+                        " COUNT(DISTINCT orderdate),\n" +
+                        " COUNT(DISTINCT clerk)\n" +
+                        " FROM orders\n" +
+                        " WHERE\n" +
+                        " orderdate > CAST('2042-01-01' AS DATE)\n" +
+                        " GROUP BY\n" +
+                        " orderkey,\n" +
+                        " orderstatus\n",
+                anyTree(
+                        exchange(REMOTE_MATERIALIZED, REPARTITION,
+                                anyTree(
+                                        exchange(REMOTE_STREAMING, REPARTITION,
+                                                anyTree(
+                                                        exchange(REMOTE_STREAMING, REPARTITION,
+                                                                anyTree(
+                                                                        tableScan("orders", ImmutableMap.of(
+                                                                                "orderstatus", "orderstatus",
+                                                                                "orderkey", "orderkey",
+                                                                                "clerk", "clerk",
+                                                                                "orderdate", "orderdate"))))))))));
+    }
+
+    void assertMaterializedWithStreamingMarkDistinctDistributedPlan(String sql, PlanMatchPattern pattern)
+    {
+        assertDistributedPlan(
+                sql,
+                TestingSession.testSessionBuilder()
+                        .setCatalog("local")
+                        .setSchema("tiny")
+                        .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString())
+                        .setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ALL.toString())
+                        .setSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, "true")
+                        .build(),
+                pattern);
+    }
+
     void assertExactDistributedPlan(String sql, PlanMatchPattern pattern)
     {
         assertDistributedPlan(
@@ -422,9 +469,7 @@ void assertExactDistributedPlan(String sql, PlanMatchPattern pattern)
                 TestingSession.testSessionBuilder()
                         .setCatalog("local")
                         .setSchema("tiny")
-                        .setSystemProperty(
-                                SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY,
-                                FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING.toString())
+                        .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString())
                         .build(),
                 pattern);
     }