diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index efbd8a51ccacb..6100e40054a71 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -20,6 +20,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.session.PropertyMetadata; import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType; import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy; import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy; @@ -135,6 +136,7 @@ public final class SystemSessionProperties public static final String OPTIMIZE_FULL_OUTER_JOIN_WITH_COALESCE = "optimize_full_outer_join_with_coalesce"; public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout"; public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning"; + public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy"; private final List> sessionProperties; @@ -664,7 +666,19 @@ public SystemSessionProperties( OPTIMIZED_REPARTITIONING_ENABLED, "Experimental: Use optimized repartitioning", featuresConfig.isOptimizedRepartitioningEnabled(), - false)); + false), + new PropertyMetadata<>( + AGGREGATION_PARTITIONING_MERGING_STRATEGY, + format("Strategy to merge partition preference in aggregation node. Options are %s", + Stream.of(AggregationPartitioningMergingStrategy.values()) + .map(AggregationPartitioningMergingStrategy::name) + .collect(joining(","))), + VARCHAR, + AggregationPartitioningMergingStrategy.class, + featuresConfig.getAggregationPartitioningMergingStrategy(), + false, + value -> AggregationPartitioningMergingStrategy.valueOf(((String) value).toUpperCase()), + AggregationPartitioningMergingStrategy::name)); } public List> getSessionProperties() @@ -1134,4 +1148,9 @@ public static boolean isOptimizedRepartitioningEnabled(Session session) { return session.getSystemProperty(OPTIMIZED_REPARTITIONING_ENABLED, Boolean.class); } + + public static AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy(Session session) + { + return session.getSystemProperty(AGGREGATION_PARTITIONING_MERGING_STRATEGY, AggregationPartitioningMergingStrategy.class); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 8f96c291c7aa5..aef43cdf7595d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -36,6 +36,7 @@ import java.nio.file.Paths; import java.util.List; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS; import static com.facebook.presto.sql.analyzer.RegexLibrary.JONI; @@ -132,6 +133,7 @@ public class FeaturesConfig private int filterAndProjectMinOutputPageRowCount = 256; private int maxGroupingSets = 2048; private boolean legacyUnnestArrayRows; + private AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = LEGACY; private boolean jsonSerdeCodeGenerationEnabled; private int maxConcurrentMaterializations = 3; @@ -173,6 +175,23 @@ public enum PartialMergePushdownStrategy PUSH_THROUGH_LOW_MEMORY_OPERATORS } + public enum AggregationPartitioningMergingStrategy + { + LEGACY, // merge partition preference with parent but apply current partition preference + TOP_DOWN, // merge partition preference with parent and apply the merged partition preference + BOTTOM_UP; // don't merge partition preference and apply current partition preference only + + public boolean isMergingWithParent() + { + return this == LEGACY || this == TOP_DOWN; + } + + public boolean isAdoptingMergedPreference() + { + return this == TOP_DOWN; + } + } + public double getCpuCostWeight() { return cpuCostWeight; @@ -487,6 +506,18 @@ public FeaturesConfig setMaxReorderedJoins(int maxReorderedJoins) return this; } + public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy() + { + return aggregationPartitioningMergingStrategy; + } + + @Config("optimizer.aggregation-partition-merging") + public FeaturesConfig setAggregationPartitioningMergingStrategy(AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy) + { + this.aggregationPartitioningMergingStrategy = requireNonNull(aggregationPartitioningMergingStrategy, "aggregationPartitioningMergingStrategy is null"); + return this; + } + public boolean isRedistributeWrites() { return redistributeWrites; diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java index 8445d65ab6598..5ab78e8bef8d1 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/Partitioning.java @@ -59,7 +59,7 @@ private Partitioning(PartitioningHandle handle, List arguments) this.arguments = ImmutableList.copyOf(requireNonNull(arguments, "arguments is null")); } - public static Partitioning create(PartitioningHandle handle, List columns) + public static Partitioning create(PartitioningHandle handle, Collection columns) { return new Partitioning(handle, columns.stream() .map(RowExpression.class::cast) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java index 7019ff58dfa92..a3ce8d99f27c9 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddExchanges.java @@ -36,6 +36,7 @@ import com.facebook.presto.spi.relation.RowExpression; import com.facebook.presto.spi.relation.VariableReferenceExpression; import com.facebook.presto.spi.type.Type; +import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy; import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.planner.ExpressionDomainTranslator; @@ -84,6 +85,7 @@ import com.google.common.collect.SetMultimap; import java.util.ArrayList; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -93,6 +95,7 @@ import java.util.Set; import java.util.function.Function; +import static com.facebook.presto.SystemSessionProperties.getAggregationPartitioningMergingStrategy; import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy; import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount; import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy; @@ -240,11 +243,20 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper Set partitioningRequirement = ImmutableSet.copyOf(node.getGroupingKeys()); boolean preferSingleNode = hasSingleNodeExecutionPreference(node, metadata.getFunctionManager()); + boolean hasMixedGroupingSets = node.hasEmptyGroupingSet() && node.hasNonEmptyGroupingSet(); PreferredProperties preferredProperties = preferSingleNode ? PreferredProperties.undistributed() : PreferredProperties.any(); - if (!node.getGroupingKeys().isEmpty()) { + // If aggregation has a mixed of non-global and global grouping set, an repartition exchange is any way needed to eliminate duplicate default outputs + // from partial aggregations (enforced in `ValidateAggregationWithDefaultValues.java`). Therefore, we don't have preference on what the child will return. + if (!node.getGroupingKeys().isEmpty() && !hasMixedGroupingSets) { + FeaturesConfig.AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = getAggregationPartitioningMergingStrategy(session); preferredProperties = PreferredProperties.partitionedWithLocal(partitioningRequirement, grouped(node.getGroupingKeys())) - .mergeWithParent(parentPreferredProperties); + .mergeWithParent(parentPreferredProperties, aggregationPartitioningMergingStrategy.isMergingWithParent()); + + if (aggregationPartitioningMergingStrategy.isAdoptingMergedPreference()) { + checkState(preferredProperties.getGlobalProperties().isPresent() && preferredProperties.getGlobalProperties().get().getPartitioningProperties().isPresent()); + partitioningRequirement = ImmutableSet.copyOf(preferredProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns()); + } } PlanWithProperties child = planChild(node, preferredProperties); @@ -259,13 +271,14 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child.getNode()), child.getProperties()); } - else if (!child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) { + else if (hasMixedGroupingSets + || !child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) { child = withDerivedProperties( partitionedExchange( idAllocator.getNextId(), selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false), child.getNode(), - createPartitioning(node.getGroupingKeys()), + createPartitioning(partitioningRequirement), node.getHashVariable()), child.getProperties()); } @@ -299,7 +312,7 @@ private Function partitionedExchange( idAllocator.getNextId(), selectExchangeScopeForPartitionedRemoteExchange(partial, false), @@ -988,7 +1001,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, PreferredProperties List> desiredLocalProperties = preferredProperties.getLocalProperties().isEmpty() ? grouped(joinColumns) : ImmutableList.of(); PlanWithProperties probeSource = node.getProbeSource().accept(this, PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(joinColumns), desiredLocalProperties) - .mergeWithParent(preferredProperties)); + .mergeWithParent(preferredProperties, true)); ActualProperties probeProperties = probeSource.getProperties(); PlanWithProperties indexSource = node.getIndexSource().accept(this, PreferredProperties.any()); @@ -1329,7 +1342,7 @@ private ActualProperties derivePropertiesRecursively(PlanNode result) return PropertyDerivations.derivePropertiesRecursively(result, metadata, session, types, parser); } - private Partitioning createPartitioning(List partitioningColumns) + private Partitioning createPartitioning(Collection partitioningColumns) { // TODO: Use SystemTablesMetadata instead of introducing a special case if (GlobalSystemConnector.NAME.equals(partitioningProviderCatalog)) { diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java index abfea1e5f83a1..03483a8ab7b19 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/PreferredProperties.java @@ -130,7 +130,7 @@ public List> getLocalProperties() return localProperties; } - public PreferredProperties mergeWithParent(PreferredProperties parent) + public PreferredProperties mergeWithParent(PreferredProperties parent, boolean mergePartitionPreference) { List> newLocal = ImmutableList.>builder() .addAll(localProperties) @@ -143,7 +143,7 @@ public PreferredProperties mergeWithParent(PreferredProperties parent) if (globalProperties.isPresent()) { Global currentGlobal = globalProperties.get(); Global newGlobal = parent.getGlobalProperties() - .map(currentGlobal::mergeWithParent) + .map(global -> currentGlobal.mergeWithParent(global, mergePartitionPreference)) .orElse(currentGlobal); builder.global(newGlobal); } @@ -249,7 +249,7 @@ public Optional getPartitioningProperties() return partitioningProperties; } - public Global mergeWithParent(Global parent) + public Global mergeWithParent(Global parent, boolean mergePartitionPreference) { if (distributed != parent.distributed) { return this; @@ -257,7 +257,7 @@ public Global mergeWithParent(Global parent) if (!partitioningProperties.isPresent()) { return parent; } - if (!parent.partitioningProperties.isPresent()) { + if (!parent.partitioningProperties.isPresent() || !mergePartitionPreference) { return this; } return new Global(distributed, Optional.of(partitioningProperties.get().mergeWithParent(parent.partitioningProperties.get()))); diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 892827345da1f..e8c3d2bec7b93 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -27,6 +27,8 @@ import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY; +import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.TOP_DOWN; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED; import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS; @@ -73,6 +75,7 @@ public void testDefaults() .setOptimizeHashGeneration(true) .setPushTableWriteThroughUnion(true) .setDictionaryAggregation(false) + .setAggregationPartitioningMergingStrategy(LEGACY) .setLegacyArrayAgg(false) .setGroupByUsesEqualTo(false) .setLegacyMapSubscript(false) @@ -167,6 +170,7 @@ public void testExplicitPropertyMappings() .put("optimizer.push-table-write-through-union", "false") .put("optimizer.dictionary-aggregation", "true") .put("optimizer.push-aggregation-through-join", "false") + .put("optimizer.aggregation-partition-merging", "top_down") .put("regex-library", "RE2J") .put("re2j.dfa-states-limit", "42") .put("re2j.dfa-retries", "42") @@ -236,6 +240,7 @@ public void testExplicitPropertyMappings() .setOptimizeMixedDistinctAggregations(true) .setPushTableWriteThroughUnion(false) .setDictionaryAggregation(true) + .setAggregationPartitioningMergingStrategy(TOP_DOWN) .setPushAggregationThroughJoin(false) .setLegacyArrayAgg(true) .setGroupByUsesEqualTo(true) diff --git a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java index 684bcdcf69ce9..e6bea2d209d8b 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/planner/optimizations/TestAddExchangesPlans.java @@ -14,21 +14,34 @@ package com.facebook.presto.sql.planner.optimizations; +import com.facebook.presto.Session; +import com.facebook.presto.spi.plan.AggregationNode; +import com.facebook.presto.sql.planner.Plan; import com.facebook.presto.sql.planner.assertions.BasePlanTest; +import com.facebook.presto.sql.planner.assertions.PlanMatchPattern; +import com.facebook.presto.sql.planner.plan.ExchangeNode; +import com.facebook.presto.sql.planner.plan.GroupIdNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; +import java.util.function.BiConsumer; + +import static com.facebook.presto.SystemSessionProperties.AGGREGATION_PARTITIONING_MERGING_STRATEGY; +import static com.facebook.presto.SystemSessionProperties.TASK_CONCURRENCY; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.exchange; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join; +import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.node; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.tableScan; import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values; +import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING; import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION; import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER; +import static org.testng.Assert.assertEquals; /** * These are plan tests similar to what we have for other optimizers (e.g. {@link com.facebook.presto.sql.planner.TestPredicatePushdown}) @@ -101,4 +114,70 @@ public void testRepartitionForUnionAllBeforeHashJoin() anyTree( tableScan("region", ImmutableMap.of("regionkey", "regionkey")))))))); } + + private void assertPlanWithMergePartitionStrategy( + String sql, + String partitionMergingStrategy, + int remoteRepartitionExchangeCount, + PlanMatchPattern pattern) + { + Session session = Session.builder(this.getQueryRunner().getDefaultSession()) + .setSystemProperty(AGGREGATION_PARTITIONING_MERGING_STRATEGY, partitionMergingStrategy) + .setSystemProperty(TASK_CONCURRENCY, "2") + .build(); + BiConsumer validateMultipleRemoteRepartitionExchange = (plan, count) -> assertEquals( + searchFrom(plan.getRoot()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope() == REMOTE_STREAMING && ((ExchangeNode) node).getType() == REPARTITION).count(), + count.intValue()); + + assertPlanWithSession(sql, session, false, pattern, plan -> validateMultipleRemoteRepartitionExchange.accept(plan, remoteRepartitionExchangeCount)); + } + + @Test + public void testMergePartitionWithGroupingSets() + { + String sql = "SELECT orderkey, count(distinct(custkey)) FROM orders GROUP BY GROUPING SETS((orderkey), ())"; + + assertPlanWithMergePartitionStrategy(sql, "bottom_up", 2, + anyTree(node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + node(AggregationNode.class, + anyTree(node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + node(AggregationNode.class, + anyTree(node(GroupIdNode.class, + tableScan("orders")))))))))))))); + assertPlanWithMergePartitionStrategy(sql, "top_down", 2, + anyTree(node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + node(AggregationNode.class, + anyTree(node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + node(AggregationNode.class, + anyTree(node(GroupIdNode.class, + tableScan("orders")))))))))))))); + } + + @Test + public void testMergePartitionWithAggregation() + { + String sql = "SELECT count(orderdate), custkey FROM (SELECT orderdate, custkey FROM orders GROUP BY orderdate, custkey) GROUP BY custkey"; + + // disable merging partition preference + assertPlanWithMergePartitionStrategy(sql, "bottom_up", 2, + anyTree(node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + anyTree(node(AggregationNode.class, + node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + node(AggregationNode.class, + anyTree(tableScan("orders"))))))))))))); + + // enable merging partition preference + assertPlanWithMergePartitionStrategy(sql, "top_down", 1, + anyTree(node(AggregationNode.class, + node(AggregationNode.class, + anyTree(exchange(REMOTE_STREAMING, REPARTITION, + anyTree(node(AggregationNode.class, + anyTree(tableScan("orders")))))))))); + } }