From 6f8a09efe15a6526c4e5a056bd0b59529ade7e25 Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Fri, 6 Sep 2019 17:08:23 -0400 Subject: [PATCH 1/3] Add partitioning_precision_strategy session property --- .../presto/SystemSessionProperties.java | 22 +- .../presto/sql/analyzer/FeaturesConfig.java | 23 ++ .../presto/sql/planner/Partitioning.java | 21 ++ .../optimizations/ActualProperties.java | 36 ++- .../planner/optimizations/AddExchanges.java | 68 +++-- .../sql/analyzer/TestFeaturesConfig.java | 8 +- .../optimizations/TestAddExchangesPlans.java | 248 ++++++++++++++++++ 7 files changed, 391 insertions(+), 35 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index a524bc8f92ff6..01f01adcd0236 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -24,6 +24,7 @@ import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy; +import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy; import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -138,6 +139,7 @@ public final class SystemSessionProperties public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning"; public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy"; public static final String LIST_BUILT_IN_FUNCTIONS_ONLY = "list_built_in_functions_only"; + public static final String PARTITIONING_PRECISION_STRATEGY = "partitioning_precision_strategy"; private final List> sessionProperties; @@ -684,7 +686,19 @@ public 
SystemSessionProperties( LIST_BUILT_IN_FUNCTIONS_ONLY, "Only List built-in functions in SHOW FUNCTIONS", featuresConfig.isListBuiltInFunctionsOnly(), - false)); + false), + new PropertyMetadata<>( + PARTITIONING_PRECISION_STRATEGY, + format("The strategy to use to pick when to repartition. Options are %s", + Stream.of(PartitioningPrecisionStrategy.values()) + .map(PartitioningPrecisionStrategy::name) + .collect(joining(","))), + VARCHAR, + PartitioningPrecisionStrategy.class, + featuresConfig.getPartitioningPrecisionStrategy(), + false, + value -> PartitioningPrecisionStrategy.valueOf(((String) value).toUpperCase()), + PartitioningPrecisionStrategy::name)); } public List> getSessionProperties() @@ -1164,4 +1178,10 @@ public static boolean isListBuiltInFunctionsOnly(Session session) { return session.getSystemProperty(LIST_BUILT_IN_FUNCTIONS_ONLY, Boolean.class); } + + public static boolean isExactPartitioningPreferred(Session session) + { + return session.getSystemProperty(PARTITIONING_PRECISION_STRATEGY, PartitioningPrecisionStrategy.class) + == PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 89af54f7fd6dd..e61da7adf0e50 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -147,6 +147,16 @@ public class FeaturesConfig private boolean listBuiltInFunctionsOnly = true; + private PartitioningPrecisionStrategy partitioningPrecisionStrategy = PartitioningPrecisionStrategy.AUTOMATIC; + + public enum PartitioningPrecisionStrategy + { + // Let Presto decide when to repartition + AUTOMATIC, + // Use exact partitioning until Presto becomes smarter with respect to picking when to repartition + PREFER_EXACT_PARTITIONING + } + public enum JoinReorderingStrategy { NONE, @@ 
-1146,4 +1156,17 @@ public FeaturesConfig setListBuiltInFunctionsOnly(boolean listBuiltInFunctionsOn this.listBuiltInFunctionsOnly = listBuiltInFunctionsOnly; return this; } + + public PartitioningPrecisionStrategy getPartitioningPrecisionStrategy() + { + return partitioningPrecisionStrategy; + } + + @Config("partitioning-precision-strategy") + @ConfigDescription("Set strategy used to determine whether to repartition (AUTOMATIC, PREFER_EXACT_PARTITIONING)") + public FeaturesConfig setPartitioningPrecisionStrategy(PartitioningPrecisionStrategy partitioningPrecisionStrategy) + { + this.partitioningPrecisionStrategy = partitioningPrecisionStrategy; + return this; + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java index 5ab78e8bef8d1..8bf1c78a41a4f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java @@ -30,6 +30,7 @@ import javax.annotation.concurrent.Immutable; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -241,6 +242,26 @@ public boolean isPartitionedOn(Collection columns, return true; } + public boolean isPartitionedOnExactly(Collection columns, Set knownConstants) + { + Set toCheck = new HashSet<>(); + for (RowExpression argument : arguments) { + // the non-constant partitioning arguments must match the requested columns exactly, but we 
+ // can safely ignore all constant columns when comparing partition properties + if (argument instanceof ConstantExpression) { + continue; + } + if (!(argument instanceof VariableReferenceExpression)) { + return false; + } + if (knownConstants.contains(argument)) { + continue; + } + toCheck.add((VariableReferenceExpression) argument); + } + return ImmutableSet.copyOf(columns).equals(toCheck); + } + public boolean isEffectivelySinglePartition(Set knownConstants) { return isPartitionedOn(ImmutableSet.of(), knownConstants); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java index 7e807c80c7f0e..8166018c33790 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java @@ -105,24 +105,34 @@ public boolean isNullsAndAnyReplicated() return global.isNullsAndAnyReplicated(); } - public boolean isStreamPartitionedOn(Collection columns) + public boolean isStreamPartitionedOn(Collection columns, boolean exactly) { - return isStreamPartitionedOn(columns, false); + return isStreamPartitionedOn(columns, false, exactly); } - public boolean isStreamPartitionedOn(Collection columns, boolean nullsAndAnyReplicated) + public boolean isStreamPartitionedOn(Collection columns, boolean nullsAndAnyReplicated, boolean exactly) { - return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated); + if (exactly) { + return global.isStreamPartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated); + } + else { + return global.isStreamPartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated); + } } - public boolean isNodePartitionedOn(Collection columns) + public boolean isNodePartitionedOn(Collection columns, boolean exactly) { - return 
isNodePartitionedOn(columns, false); + return isNodePartitionedOn(columns, false, exactly); } - public boolean isNodePartitionedOn(Collection columns, boolean nullsAndAnyReplicated) + public boolean isNodePartitionedOn(Collection columns, boolean nullsAndAnyReplicated, boolean exactly) { - return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated); + if (exactly) { + return global.isNodePartitionedOnExactly(columns, constants.keySet(), nullsAndAnyReplicated); + } + else { + return global.isNodePartitionedOn(columns, constants.keySet(), nullsAndAnyReplicated); + } } @Deprecated @@ -471,6 +481,11 @@ private boolean isNodePartitionedOn(Collection colu return nodePartitioning.isPresent() && nodePartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated; } + private boolean isNodePartitionedOnExactly(Collection columns, Set constants, boolean nullsAndAnyReplicated) + { + return nodePartitioning.isPresent() && nodePartitioning.get().isPartitionedOnExactly(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated; + } + private boolean isCompatibleTablePartitioningWith(Partitioning partitioning, boolean nullsAndAnyReplicated, Metadata metadata, Session session) { return nodePartitioning.isPresent() && nodePartitioning.get().isCompatibleWith(partitioning, metadata, session) && this.nullsAndAnyReplicated == nullsAndAnyReplicated; @@ -531,6 +546,11 @@ private boolean isStreamPartitionedOn(Collection co return streamPartitioning.isPresent() && streamPartitioning.get().isPartitionedOn(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated; } + private boolean isStreamPartitionedOnExactly(Collection columns, Set constants, boolean nullsAndAnyReplicated) + { + return streamPartitioning.isPresent() && streamPartitioning.get().isPartitionedOnExactly(columns, constants) && this.nullsAndAnyReplicated == nullsAndAnyReplicated; + } + /** * @return true if all the data 
will effectively land in a single stream */ diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index a3ce8d99f27c9..a80aa4e2380a2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -36,7 +36,7 @@ import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.spi.type.Type; -import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy; import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.planner.ExpressionDomainTranslator; @@ -103,6 +103,7 @@ import static com.facebook.presto.SystemSessionProperties.isColocatedJoinEnabled; import static com.facebook.presto.SystemSessionProperties.isDistributedIndexJoinEnabled; import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled; +import static com.facebook.presto.SystemSessionProperties.isExactPartitioningPreferred; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRedistributeWrites; import static com.facebook.presto.SystemSessionProperties.isScaleWriters; @@ -249,9 +250,9 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper // If aggregation has a mixed of non-global and global grouping set, an repartition exchange is any way needed to eliminate duplicate default outputs // from partial aggregations (enforced in `ValidateAggregationWithDefaultValues.java`). 
Therefore, we don't have preference on what the child will return. if (!node.getGroupingKeys().isEmpty() && !hasMixedGroupingSets) { - FeaturesConfig.AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = getAggregationPartitioningMergingStrategy(session); + AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = getAggregationPartitioningMergingStrategy(session); preferredProperties = PreferredProperties.partitionedWithLocal(partitioningRequirement, grouped(node.getGroupingKeys())) - .mergeWithParent(parentPreferredProperties, aggregationPartitioningMergingStrategy.isMergingWithParent()); + .mergeWithParent(parentPreferredProperties, shouldAggregationMergePartitionPreferences(aggregationPartitioningMergingStrategy)); if (aggregationPartitioningMergingStrategy.isAdoptingMergedPreference()) { checkState(preferredProperties.getGlobalProperties().isPresent() && preferredProperties.getGlobalProperties().get().getPartitioningProperties().isPresent()); @@ -272,7 +273,7 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper child.getProperties()); } else if (hasMixedGroupingSets - || !child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) { + || !isStreamPartitionedOn(child.getProperties(), partitioningRequirement) && !isNodePartitionedOn(child.getProperties(), partitioningRequirement)) { child = withDerivedProperties( partitionedExchange( idAllocator.getNextId(), @@ -312,11 +313,11 @@ private Function partitionedExchange( idAllocator.getNextId(), selectExchangeScopeForPartitionedRemoteExchange(partial, false), @@ -429,8 +430,8 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr } PlanWithProperties child = planChild(node, preferredChildProperties); - if (!child.getProperties().isStreamPartitionedOn(node.getPartitionBy()) - && 
!child.getProperties().isNodePartitionedOn(node.getPartitionBy())) { + if (!isStreamPartitionedOn(child.getProperties(), node.getPartitionBy()) + && !isNodePartitionedOn(child.getProperties(), node.getPartitionBy())) { // add exchange + push function to child child = withDerivedProperties( new TopNRowNumberNode( @@ -719,7 +720,7 @@ public PlanWithProperties visitJoin(JoinNode node, PreferredProperties preferred // use partitioned join if probe side is naturally partitioned on join symbols (e.g: because of aggregation) if (!node.getCriteria().isEmpty() - && left.getProperties().isNodePartitionedOn(leftVariables) && !left.getProperties().isSingleNode()) { + && isNodePartitionedOn(left.getProperties(), leftVariables) && !left.getProperties().isSingleNode()) { return planPartitionedJoin(node, leftVariables, rightVariables, left); } @@ -742,7 +743,7 @@ private PlanWithProperties planPartitionedJoin(JoinNode node, List // Disable repartitioning if it would disrupt a parent's partitioning preference when streaming is enabled boolean parentAlreadyPartitionedOnChild = parentPartitioningPreferences - .map(partitioning -> probeProperties.isStreamPartitionedOn(partitioning.getPartitioningColumns())) + .map(partitioning -> probeProperties.isStreamPartitionedOn(partitioning.getPartitioningColumns(), false)) .orElse(false); if (preferStreamingOperators && parentAlreadyPartitionedOnChild) { return false; } // Otherwise, repartition if we need to align with the join columns - if (!probeProperties.isStreamPartitionedOn(joinColumns)) { + if (!probeProperties.isStreamPartitionedOn(joinColumns, false)) { return true; } @@ -1086,7 +1087,7 @@ private Partitioning selectUnionPartitioning(UnionNode node, PartitioningPropert // Don't select a single node partitioning so that we maintain query parallelism // Theoretically, if all children are single partitioned on the same node we could choose a single // partitioning, but as this only applies to a union of two values nodes, it isn't worth 
the added complexity - if (child.getProperties().isNodePartitionedOn(childPartitioning.getPartitioningColumns(), nullsAndAnyReplicated) && !child.getProperties().isSingleNode()) { + if (child.getProperties().isNodePartitionedOn(childPartitioning.getPartitioningColumns(), nullsAndAnyReplicated, isExactPartitioningPreferred(session)) && !child.getProperties().isSingleNode()) { Function> childToParent = createTranslator(createMapping( node.sourceOutputLayout(sourceIndex), node.getOutputVariables())); @@ -1390,6 +1391,25 @@ private Scope selectExchangeScopeForPartitionedRemoteExchange(PlanNode exchangeS throw new IllegalStateException("Unexpected exchange materialization strategy: " + exchangeMaterializationStrategy); } } + + private boolean isNodePartitionedOn(ActualProperties properties, Collection columns) + { + return properties.isNodePartitionedOn(columns, isExactPartitioningPreferred(session)); + } + + private boolean isStreamPartitionedOn(ActualProperties properties, Collection columns) + { + return properties.isStreamPartitionedOn(columns, isExactPartitioningPreferred(session)); + } + + private boolean shouldAggregationMergePartitionPreferences(AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy) + { + if (isExactPartitioningPreferred(session)) { + return false; + } + + return aggregationPartitioningMergingStrategy.isMergingWithParent(); + } } private boolean canPushdownPartialMerge(PlanNode node, PartialMergePushdownStrategy strategy) @@ -1477,7 +1497,7 @@ private static boolean meetsPartitioningRequirements(PreferredProperties preferr if (!preferredGlobal.getPartitioningProperties().isPresent()) { return !actual.isSingleNode(); } - return actual.isStreamPartitionedOn(preferredGlobal.getPartitioningProperties().get().getPartitioningColumns()); + return actual.isStreamPartitionedOn(preferredGlobal.getPartitioningProperties().get().getPartitioningColumns(), false); } // Prefer the match result that satisfied the most requirements diff 
--git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index f0a0a4dc9b806..637f561d016d6 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -18,6 +18,7 @@ import com.facebook.presto.operator.aggregation.arrayagg.ArrayAggGroupImplementation; import com.facebook.presto.operator.aggregation.histogram.HistogramGroupImplementation; import com.facebook.presto.operator.aggregation.multimapagg.MultimapAggGroupImplementation; +import com.facebook.presto.sql.analyzer.FeaturesConfig.PartitioningPrecisionStrategy; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -125,7 +126,8 @@ public void testDefaults() .setOptimizeFullOuterJoinWithCoalesce(true) .setIndexLoaderTimeout(new Duration(20, SECONDS)) .setOptimizedRepartitioningEnabled(false) - .setListBuiltInFunctionsOnly(true)); + .setListBuiltInFunctionsOnly(true) + .setPartitioningPrecisionStrategy(PartitioningPrecisionStrategy.AUTOMATIC)); } @Test @@ -208,6 +210,7 @@ public void testExplicitPropertyMappings() .put("index-loader-timeout", "10s") .put("experimental.optimized-repartitioning", "true") .put("list-built-in-functions-only", "false") + .put("partitioning-precision-strategy", "PREFER_EXACT_PARTITIONING") .build(); FeaturesConfig expected = new FeaturesConfig() @@ -286,7 +289,8 @@ public void testExplicitPropertyMappings() .setOptimizeFullOuterJoinWithCoalesce(false) .setIndexLoaderTimeout(new Duration(10, SECONDS)) .setOptimizedRepartitioningEnabled(true) - .setListBuiltInFunctionsOnly(false); + .setListBuiltInFunctionsOnly(false) + .setPartitioningPrecisionStrategy(PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING); assertFullMapping(properties, expected); } diff --git 
a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java index e6bea2d209d8b..e832b2b4ec4f9 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java @@ -15,29 +15,40 @@ package com.facebook.presto.sql.planner.optimizations; import com.facebook.presto.Session; +import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.spi.plan.AggregationNode; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.assertions.BasePlanTest; +import com.facebook.presto.sql.planner.assertions.ExpectedValueProvider; import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; import com.facebook.presto.sql.planner.plan.ExchangeNode; import com.facebook.presto.sql.planner.plan.GroupIdNode; +import com.facebook.presto.sql.tree.FunctionCall; +import com.facebook.presto.testing.TestingSession; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; +import java.util.Optional; import java.util.function.BiConsumer; import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY; import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY; +import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anySymbol; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; import 
static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.semiJoin; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; +import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; @@ -180,4 +191,241 @@ public void testMergePartitionWithAggregation() anyTree(node(AggregationNode.class, anyTree(tableScan("orders")))))))))); } + + @Test + public void testAggregateIsExactlyPartitioned() + { + assertExactDistributedPlan( + "SELECT\n" + + " AVG(1)\n" + + "FROM (\n" + + " SELECT\n" + + " orderkey,\n" + + " orderstatus,\n" + + " COUNT(*)\n" + + " FROM orders\n" + + " WHERE\n" + + " orderdate > CAST('2042-01-01' AS DATE)\n" + + " GROUP BY\n" + + " orderkey,\n" + + " orderstatus\n" + + ")\n" + + "GROUP BY\n" + + " orderkey", + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "ordertatus", "orderstatus", + "orderkey", "orderkey", + "orderdate", "orderdate")))))))); + } + + @Test + public void testWindowIsExactlyPartitioned() + { + assertExactDistributedPlan( + "SELECT\n" + + " AVG(otherwindow) OVER (\n" + + " PARTITION BY\n" + + " orderkey\n" + + " )\n" + + "FROM (\n" + + " 
SELECT\n" + + " orderkey,\n" + + " orderstatus,\n" + + " COUNT(*) OVER (\n" + + " PARTITION BY\n" + + " orderkey,\n" + + " orderstatus\n" + + " ) AS otherwindow\n" + + " FROM orders\n" + + " WHERE\n" + + " orderdate > CAST('2042-01-01' AS DATE)\n" + + ")", + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "orderkey", "orderkey", + "orderdate", "orderdate")))))))); + } + + @Test + public void testRowNumberIsExactlyPartitioned() + { + assertExactDistributedPlan( + "SELECT\n" + + " *\n" + + "FROM (\n" + + " SELECT\n" + + " a,\n" + + " ROW_NUMBER() OVER (\n" + + " PARTITION BY\n" + + " a\n" + + " ) rn\n" + + " FROM (\n" + + " VALUES\n" + + " (1)\n" + + " ) t (a)\n" + + ") t", + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + values("a"))))); + } + + @Test + public void testTopNRowNumberIsExactlyPartitioned() + { + assertExactDistributedPlan( + "SELECT\n" + + " a,\n" + + " ROW_NUMBER() OVER (\n" + + " PARTITION BY\n" + + " a\n" + + " ORDER BY\n" + + " a\n" + + " ) rn\n" + + "FROM (\n" + + " SELECT\n" + + " a,\n" + + " b,\n" + + " COUNT(*)\n" + + " FROM (\n" + + " VALUES\n" + + " (1, 2)\n" + + " ) t (a, b)\n" + + " GROUP BY\n" + + " a,\n" + + " b\n" + + ")\n" + + "LIMIT\n" + + " 2", + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + values("a", "b"))))); + } + + @Test + public void testJoinExactlyPartitioned() + { + ExpectedValueProvider arbitrary = PlanMatchPattern.functionCall("arbitrary", false, ImmutableList.of(anySymbol())); + assertExactDistributedPlan( + "SELECT\n" + + " orders.orderkey,\n" + + " orders.orderstatus\n" + + "FROM (\n" + + " SELECT\n" + + " orderkey,\n" + + " ARBITRARY(orderstatus) AS orderstatus,\n" + + " COUNT(*)\n" + + " FROM orders\n" + + " GROUP BY\n" + + " orderkey\n" + + ") t,\n" + + "orders\n" + + "WHERE\n" + + " orders.orderkey = t.orderkey\n" + + " AND orders.orderstatus = t.orderstatus", + anyTree( + 
join(INNER, ImmutableList.of( + equiJoinClause("ORDERKEY_LEFT", "ORDERKEY_RIGHT"), + equiJoinClause("orderstatus", "ORDERSTATUS_RIGHT")), + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + aggregation( + singleGroupingSet("ORDERKEY_LEFT"), + ImmutableMap.of(Optional.of("orderstatus"), arbitrary), + ImmutableList.of("ORDERKEY_LEFT"), + ImmutableMap.of(), + Optional.empty(), + SINGLE, + tableScan("orders", ImmutableMap.of( + "ORDERKEY_LEFT", "orderkey", + "ORDERSTATUS_LEFT", "orderstatus"))))), + exchange(LOCAL, REPARTITION, + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "ORDERKEY_RIGHT", "orderkey", + "ORDERSTATUS_RIGHT", "orderstatus")))))))); + } + + @Test + public void testSemiJoinExactlyPartitioned() + { + assertExactDistributedPlan( + "SELECT\n" + + " orderkey\n" + + "FROM orders\n" + + "WHERE\n" + + " orderkey IN (\n" + + " SELECT\n" + + " orderkey\n" + + " FROM orders\n" + + " WHERE\n" + + " orderkey IS NULL\n" + + " AND orderstatus IS NULL\n" + + " )", + anyTree( + semiJoin("ORDERKEY_OK", "VALUE_ORDERKEY", "S", + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "ORDERKEY_OK", "orderkey")))), + anyTree( + + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + values("VALUE_ORDERKEY"))))))); + } + + @Test + public void testMarkDistinctIsExactlyPartitioned() + { + assertExactDistributedPlan( + " SELECT\n" + + " orderkey,\n" + + " orderstatus,\n" + + " COUNT(DISTINCT orderdate),\n" + + " COUNT(DISTINCT clerk)\n" + + " FROM orders\n" + + " WHERE\n" + + " orderdate > CAST('2042-01-01' AS DATE)\n" + + " GROUP BY\n" + + " orderkey,\n" + + " orderstatus\n", + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + exchange(REMOTE_STREAMING, REPARTITION, + anyTree( + tableScan("orders", ImmutableMap.of( + "orderstatus", "orderstatus", + "orderkey", "orderkey", + "clerk", "clerk", + "orderdate", 
"orderdate")))))))))); + } + + void assertExactDistributedPlan(String sql, PlanMatchPattern pattern) + { + assertDistributedPlan( + sql, + TestingSession.testSessionBuilder() + .setCatalog("local") + .setSchema("tiny") + .setSystemProperty( + SystemSessionProperties.PARTITIONING_PRECISION_STRATEGY, + FeaturesConfig.PartitioningPrecisionStrategy.PREFER_EXACT_PARTITIONING.toString()) + .build(), + pattern); + } } From 01c486930af593f7389187200adfa95170c40d0a Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Fri, 20 Dec 2019 15:40:03 -0500 Subject: [PATCH 2/3] Make PreferredProperties.mergeWithParent methods private --- .../optimizations/PreferredProperties.java | 74 +++++++++---------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java index 03483a8ab7b19..bc05fa1780587 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java @@ -249,20 +249,6 @@ public Optional getPartitioningProperties() return partitioningProperties; } - public Global mergeWithParent(Global parent, boolean mergePartitionPreference) - { - if (distributed != parent.distributed) { - return this; - } - if (!partitioningProperties.isPresent()) { - return parent; - } - if (!parent.partitioningProperties.isPresent() || !mergePartitionPreference) { - return this; - } - return new Global(distributed, Optional.of(partitioningProperties.get().mergeWithParent(parent.partitioningProperties.get()))); - } - public Global translate(Function> translator) { if (!isDistributed()) { @@ -299,6 +285,20 @@ public String toString() .add("partitioningProperties", partitioningProperties) .toString(); } + + private Global mergeWithParent(Global parent, boolean 
mergePartitionPreference) + { + if (distributed != parent.distributed) { + return this; + } + if (!partitioningProperties.isPresent()) { + return parent; + } + if (!parent.partitioningProperties.isPresent() || !mergePartitionPreference) { + return this; + } + return new Global(distributed, Optional.of(partitioningProperties.get().mergeWithParent(parent.partitioningProperties.get()))); + } } @Immutable @@ -352,29 +352,6 @@ public boolean isNullsAndAnyReplicated() return nullsAndAnyReplicated; } - public PartitioningProperties mergeWithParent(PartitioningProperties parent) - { - // Non-negotiable if we require a specific partitioning - if (partitioning.isPresent()) { - return this; - } - - // Partitioning with different replication cannot be compared - if (nullsAndAnyReplicated != parent.nullsAndAnyReplicated) { - return this; - } - - if (parent.partitioning.isPresent()) { - // If the parent has a partitioning preference, propagate parent only if the parent's partitioning columns satisfies our preference. - // Otherwise, ignore the parent since the parent will have to repartition anyways. - return partitioningColumns.containsAll(parent.partitioningColumns) ? parent : this; - } - - // Otherwise partition on any common columns if available - Set common = Sets.intersection(partitioningColumns, parent.partitioningColumns); - return common.isEmpty() ? 
this : partitioned(common).withNullsAndAnyReplicated(nullsAndAnyReplicated); - } - public Optional translateVariable(Function> translator) { Set newPartitioningColumns = partitioningColumns.stream() @@ -430,5 +407,28 @@ public String toString() .add("nullsAndAnyReplicated", nullsAndAnyReplicated) .toString(); } + + private PartitioningProperties mergeWithParent(PartitioningProperties parent) + { + // Non-negotiable if we require a specific partitioning + if (partitioning.isPresent()) { + return this; + } + + // Partitioning with different replication cannot be compared + if (nullsAndAnyReplicated != parent.nullsAndAnyReplicated) { + return this; + } + + if (parent.partitioning.isPresent()) { + // If the parent has a partitioning preference, propagate parent only if the parent's partitioning columns satisfies our preference. + // Otherwise, ignore the parent since the parent will have to repartition anyways. + return partitioningColumns.containsAll(parent.partitioningColumns) ? parent : this; + } + + // Otherwise partition on any common columns if available + Set common = Sets.intersection(partitioningColumns, parent.partitioningColumns); + return common.isEmpty() ? 
this : partitioned(common).withNullsAndAnyReplicated(nullsAndAnyReplicated); + } } } From 67449a1a77a084f816ed21bfd7356f0e7851365a Mon Sep 17 00:00:00 2001 From: Ariel Weisberg Date: Mon, 23 Dec 2019 17:20:03 -0500 Subject: [PATCH 3/3] Add comment to ActualProperties.Global.nullsAndAnyReplicated --- .../presto/sql/planner/optimizations/ActualProperties.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java index 8166018c33790..e99882c87b11f 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/ActualProperties.java @@ -378,6 +378,9 @@ public static final class Global // will be executed on multiple servers, but only one server will get all the data. // Description of whether rows with nulls in partitioning columns or some arbitrary rows have been replicated to all *nodes* + // For an IN predicate, "NULL IN (empty set)" is false while "NULL IN (non-empty set)" is NULL. Likewise, a non-NULL value A (say 1) tested + // against a set that does not contain A (say 2, 3) is false, but "A IN (2, 3, NULL)" is NULL. + // This is because IN is equivalent to "a = b OR a = c OR a = d ...". private final boolean nullsAndAnyReplicated; private Global(Optional nodePartitioning, Optional streamPartitioning, boolean nullsAndAnyReplicated)