Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class SystemSessionProperties
public static final String HASH_PARTITION_COUNT = "hash_partition_count";
public static final String PARTITIONING_PROVIDER_CATALOG = "partitioning_provider_catalog";
public static final String EXCHANGE_MATERIALIZATION_STRATEGY = "exchange_materialization_strategy";
// NOTE(review): the session property name below says "stream" while the constant name and the
// matching config key ("query.use-streaming-exchange-for-mark-distinct") say "streaming".
// Renaming the literal would change the user-visible property name, so confirm intent before touching it.
public static final String USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT = "use_stream_exchange_for_mark_distinct";
public static final String GROUPED_EXECUTION_FOR_AGGREGATION = "grouped_execution_for_aggregation";
public static final String GROUPED_EXECUTION_FOR_ELIGIBLE_TABLE_SCANS = "grouped_execution_for_eligible_table_scans";
public static final String DYNAMIC_SCHEDULE_FOR_GROUPED_EXECUTION = "dynamic_schedule_for_grouped_execution";
Expand Down Expand Up @@ -228,6 +229,11 @@ public SystemSessionProperties(
false,
value -> ExchangeMaterializationStrategy.valueOf(((String) value).toUpperCase()),
ExchangeMaterializationStrategy::name),
booleanProperty(
USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT,
"Use streaming instead of materialization for mark distinct with materialized exchange enabled",
queryManagerConfig.getUseStreamingExchangeForMarkDistinct(),
false),
booleanProperty(
GROUPED_EXECUTION_FOR_AGGREGATION,
"Use grouped execution for aggregation when possible",
Expand Down Expand Up @@ -808,6 +814,11 @@ public static ExchangeMaterializationStrategy getExchangeMaterializationStrategy
return session.getSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.class);
}

/**
 * Reads the {@code USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT} system property from the session.
 *
 * @param session session whose system properties are consulted
 * @return the boolean value the session reports for that property
 */
public static boolean isUseStreamingExchangeForMarkDistinctEnabled(Session session)
{
    boolean streamingForMarkDistinct = session.getSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, Boolean.class);
    return streamingForMarkDistinct;
}

public static boolean isGroupedExecutionForAggregationEnabled(Session session)
{
return session.getSystemProperty(GROUPED_EXECUTION_FOR_AGGREGATION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class QueryManagerConfig
private int hashPartitionCount = 100;
private String partitioningProviderCatalog = GlobalSystemConnector.NAME;
private ExchangeMaterializationStrategy exchangeMaterializationStrategy = ExchangeMaterializationStrategy.NONE;
// Backing field for "query.use-streaming-exchange-for-mark-distinct"; it has no initializer,
// so it starts out as false (the Java default for a boolean field).
private boolean useStreamingExchangeForMarkDistinct;
private Duration minQueryExpireAge = new Duration(15, TimeUnit.MINUTES);
private int maxQueryHistory = 100;
private int maxQueryLength = 1_000_000;
Expand Down Expand Up @@ -159,6 +160,20 @@ public ExchangeMaterializationStrategy getExchangeMaterializationStrategy()
return exchangeMaterializationStrategy;
}

/**
 * Stores the flag bound to the {@code query.use-streaming-exchange-for-mark-distinct} config key.
 *
 * @param useStreamingExchangeForMarkDistinct new value of the flag
 * @return this config instance, so setter calls can be chained
 */
@Config("query.use-streaming-exchange-for-mark-distinct")
@ConfigDescription("Use streaming instead of materialization with mark distinct when materialized exchange is enabled")
public QueryManagerConfig setUseStreamingExchangeForMarkDistinct(boolean useStreamingExchangeForMarkDistinct)
{
this.useStreamingExchangeForMarkDistinct = useStreamingExchangeForMarkDistinct;
return this;
}

/**
 * Returns the flag stored by {@code setUseStreamingExchangeForMarkDistinct}.
 * <p>
 * The return type is a primitive {@code boolean}, which can never be null, so no
 * {@code @NotNull} constraint is declared here: it could never fail and would only
 * suggest validation that does not exist.
 *
 * @return the current value of the flag
 */
public boolean getUseStreamingExchangeForMarkDistinct()
{
    return useStreamingExchangeForMarkDistinct;
}

@Config("query.exchange-materialization-strategy")
@ConfigDescription("The exchange materialization strategy to use")
public QueryManagerConfig setExchangeMaterializationStrategy(ExchangeMaterializationStrategy exchangeMaterializationStrategy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites;
import static com.facebook.presto.SystemSessionProperties.isScaleWriters;
import static com.facebook.presto.SystemSessionProperties.isUseStreamingExchangeForMarkDistinctEnabled;
import static com.facebook.presto.SystemSessionProperties.preferStreamingOperators;
import static com.facebook.presto.expressions.LogicalRowExpressions.TRUE_CONSTANT;
import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference;
Expand Down Expand Up @@ -321,7 +322,9 @@ public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProp
child = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
isUseStreamingExchangeForMarkDistinctEnabled(session) ?
REMOTE_STREAMING :
selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
child.getNode(),
createPartitioning(node.getDistinctVariables()),
node.getHashVariable()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void testDefaults()
.setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
.setRequiredWorkers(1)
.setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES))
.setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2));
.setQuerySubmissionMaxThreads(Runtime.getRuntime().availableProcessors() * 2)
.setUseStreamingExchangeForMarkDistinct(false));
}

@Test
Expand Down Expand Up @@ -83,6 +84,7 @@ public void testExplicitPropertyMappings()
.put("query.max-run-time", "2h")
.put("query.max-execution-time", "3h")
.put("query.max-cpu-time", "2d")
.put("query.use-streaming-exchange-for-mark-distinct", "true")
.put("query-manager.required-workers", "333")
.put("query-manager.required-workers-max-wait", "33m")
.put("query-manager.query-submission-max-threads", "5")
Expand Down Expand Up @@ -114,7 +116,8 @@ public void testExplicitPropertyMappings()
.setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES))
.setQuerySubmissionMaxThreads(5);
.setQuerySubmissionMaxThreads(5)
.setUseStreamingExchangeForMarkDistinct(true);

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider;
Expand All @@ -34,8 +32,13 @@
import java.util.function.BiConsumer;

import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY;
import static com.facebook.presto.SystemSessionProperties.USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT;
import static com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy.ALL;
import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand All @@ -49,6 +52,7 @@
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_MATERIALIZED;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
Expand Down Expand Up @@ -415,16 +419,57 @@ public void testMarkDistinctIsExactlyPartitioned()
"orderdate", "orderdate"))))))))));
}

/**
 * Plans a query with two DISTINCT aggregates under the "materialized exchange with streaming
 * mark distinct" session and checks the scope of the repartitioning exchanges in the plan.
 */
@Test
public void testMarkDistinctStreamingExchange()
{
    String sql = " SELECT\n" +
            " orderkey,\n" +
            " orderstatus,\n" +
            " COUNT(DISTINCT orderdate),\n" +
            " COUNT(DISTINCT clerk)\n" +
            " FROM orders\n" +
            " WHERE\n" +
            " orderdate > CAST('2042-01-01' AS DATE)\n" +
            " GROUP BY\n" +
            " orderkey,\n" +
            " orderstatus\n";

    // Expected shape: one REMOTE_MATERIALIZED repartition on the outside with two
    // REMOTE_STREAMING repartitions nested beneath it, down to the scan of "orders".
    // NOTE(review): presumably one streaming exchange per DISTINCT aggregate - confirm.
    PlanMatchPattern expectedPlan = anyTree(
            exchange(REMOTE_MATERIALIZED, REPARTITION,
                    anyTree(
                            exchange(REMOTE_STREAMING, REPARTITION,
                                    anyTree(
                                            exchange(REMOTE_STREAMING, REPARTITION,
                                                    anyTree(
                                                            tableScan("orders", ImmutableMap.of(
                                                                    "orderstatus", "orderstatus",
                                                                    "orderkey", "orderkey",
                                                                    "clerk", "clerk",
                                                                    "orderdate", "orderdate")))))))));

    assertMaterializedWithStreamingMarkDistinctDistributedPlan(sql, expectedPlan);
}

/**
 * Asserts the distributed plan for {@code sql} against {@code pattern}, using a session on
 * catalog "local" / schema "tiny" with exact partitioning preferred, exchange materialization
 * set to ALL, and the streaming-exchange-for-mark-distinct property set to "true".
 *
 * @param sql query to plan
 * @param pattern expected plan shape
 */
void assertMaterializedWithStreamingMarkDistinctDistributedPlan(String sql, PlanMatchPattern pattern)
{
    Session materializedWithStreamingMarkDistinct = TestingSession.testSessionBuilder()
            .setCatalog("local")
            .setSchema("tiny")
            .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString())
            .setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ALL.toString())
            .setSystemProperty(USE_STREAMING_EXCHANGE_FOR_MARK_DISTINCT, "true")
            .build();

    assertDistributedPlan(sql, materializedWithStreamingMarkDistinct, pattern);
}

/**
 * Asserts the distributed plan for {@code sql} against {@code pattern}, using a session on
 * catalog "local" / schema "tiny" with the partitioning precision strategy set to
 * PREFER_EXACT_PARTITIONING.
 * <p>
 * The property is set exactly once, through the statically imported names; a second,
 * fully qualified call that set the same key to the same value was redundant and has been dropped.
 *
 * @param sql query to plan
 * @param pattern expected plan shape
 */
void assertExactDistributedPlan(String sql, PlanMatchPattern pattern)
{
    assertDistributedPlan(
            sql,
            TestingSession.testSessionBuilder()
                    .setCatalog("local")
                    .setSchema("tiny")
                    .setSystemProperty(PARTITIONING_PRECISION_STRATEGY, PREFER_EXACT_PARTITIONING.toString())
                    .build(),
            pattern);
}
Expand Down