Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType;
import com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
Expand Down Expand Up @@ -135,6 +136,7 @@ public final class SystemSessionProperties
public static final String OPTIMIZE_FULL_OUTER_JOIN_WITH_COALESCE = "optimize_full_outer_join_with_coalesce";
public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout";
public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning";
public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -664,7 +666,19 @@ public SystemSessionProperties(
OPTIMIZED_REPARTITIONING_ENABLED,
"Experimental: Use optimized repartitioning",
featuresConfig.isOptimizedRepartitioningEnabled(),
false));
false),
new PropertyMetadata<>(
AGGREGATION_PARTITIONING_MERGING_STRATEGY,
format("Strategy to merge partition preference in aggregation node. Options are %s",
Stream.of(AggregationPartitioningMergingStrategy.values())
.map(AggregationPartitioningMergingStrategy::name)
.collect(joining(","))),
VARCHAR,
AggregationPartitioningMergingStrategy.class,
featuresConfig.getAggregationPartitioningMergingStrategy(),
false,
value -> AggregationPartitioningMergingStrategy.valueOf(((String) value).toUpperCase()),
AggregationPartitioningMergingStrategy::name));
}

public List<PropertyMetadata<?>> getSessionProperties()
Expand Down Expand Up @@ -1134,4 +1148,9 @@ public static boolean isOptimizedRepartitioningEnabled(Session session)
{
return session.getSystemProperty(OPTIMIZED_REPARTITIONING_ENABLED, Boolean.class);
}

public static AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy(Session session)
{
return session.getSystemProperty(AGGREGATION_PARTITIONING_MERGING_STRATEGY, AggregationPartitioningMergingStrategy.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.nio.file.Paths;
import java.util.List;

import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static com.facebook.presto.sql.analyzer.RegexLibrary.JONI;
Expand Down Expand Up @@ -132,6 +133,7 @@ public class FeaturesConfig
private int filterAndProjectMinOutputPageRowCount = 256;
private int maxGroupingSets = 2048;
private boolean legacyUnnestArrayRows;
private AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = LEGACY;

private boolean jsonSerdeCodeGenerationEnabled;
private int maxConcurrentMaterializations = 3;
Expand Down Expand Up @@ -173,6 +175,23 @@ public enum PartialMergePushdownStrategy
PUSH_THROUGH_LOW_MEMORY_OPERATORS
}

public enum AggregationPartitioningMergingStrategy
{
LEGACY, // merge partition preference with parent but apply current partition preference
TOP_DOWN, // merge partition preference with parent and apply the merged partition preference
BOTTOM_UP; // don't merge partition preference and apply current partition preference only

public boolean isMergingWithParent()
{
return this == LEGACY || this == TOP_DOWN;
}

public boolean isAdoptingMergedPreference()
{
return this == TOP_DOWN;
}
}

public double getCpuCostWeight()
{
return cpuCostWeight;
Expand Down Expand Up @@ -487,6 +506,18 @@ public FeaturesConfig setMaxReorderedJoins(int maxReorderedJoins)
return this;
}

public AggregationPartitioningMergingStrategy getAggregationPartitioningMergingStrategy()
{
return aggregationPartitioningMergingStrategy;
}

@Config("optimizer.aggregation-partition-merging")
public FeaturesConfig setAggregationPartitioningMergingStrategy(AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy)
{
this.aggregationPartitioningMergingStrategy = requireNonNull(aggregationPartitioningMergingStrategy, "aggregationPartitioningMergingStrategy is null");
return this;
}

public boolean isRedistributeWrites()
{
return redistributeWrites;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private Partitioning(PartitioningHandle handle, List<RowExpression> arguments)
this.arguments = ImmutableList.copyOf(requireNonNull(arguments, "arguments is null"));
}

public static <T extends RowExpression> Partitioning create(PartitioningHandle handle, List<T> columns)
public static <T extends RowExpression> Partitioning create(PartitioningHandle handle, Collection<T> columns)
{
return new Partitioning(handle, columns.stream()
.map(RowExpression.class::cast)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.facebook.presto.spi.relation.RowExpression;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.PartialMergePushdownStrategy;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.ExpressionDomainTranslator;
Expand Down Expand Up @@ -84,6 +85,7 @@
import com.google.common.collect.SetMultimap;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -93,6 +95,7 @@
import java.util.Set;
import java.util.function.Function;

import static com.facebook.presto.SystemSessionProperties.getAggregationPartitioningMergingStrategy;
import static com.facebook.presto.SystemSessionProperties.getExchangeMaterializationStrategy;
import static com.facebook.presto.SystemSessionProperties.getHashPartitionCount;
import static com.facebook.presto.SystemSessionProperties.getPartialMergePushdownStrategy;
Expand Down Expand Up @@ -240,11 +243,20 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper
Set<VariableReferenceExpression> partitioningRequirement = ImmutableSet.copyOf(node.getGroupingKeys());

boolean preferSingleNode = hasSingleNodeExecutionPreference(node, metadata.getFunctionManager());
boolean hasMixedGroupingSets = node.hasEmptyGroupingSet() && node.hasNonEmptyGroupingSet();
PreferredProperties preferredProperties = preferSingleNode ? PreferredProperties.undistributed() : PreferredProperties.any();

if (!node.getGroupingKeys().isEmpty()) {
// If aggregation has a mixed of non-global and global grouping set, an repartition exchange is any way needed to eliminate duplicate default outputs
// from partial aggregations (enforced in `ValidateAggregationWithDefaultValues.java`). Therefore, we don't have preference on what the child will return.
if (!node.getGroupingKeys().isEmpty() && !hasMixedGroupingSets) {
FeaturesConfig.AggregationPartitioningMergingStrategy aggregationPartitioningMergingStrategy = getAggregationPartitioningMergingStrategy(session);
preferredProperties = PreferredProperties.partitionedWithLocal(partitioningRequirement, grouped(node.getGroupingKeys()))
.mergeWithParent(parentPreferredProperties);
.mergeWithParent(parentPreferredProperties, aggregationPartitioningMergingStrategy.isMergingWithParent());

if (aggregationPartitioningMergingStrategy.isAdoptingMergedPreference()) {
checkState(preferredProperties.getGlobalProperties().isPresent() && preferredProperties.getGlobalProperties().get().getPartitioningProperties().isPresent());
partitioningRequirement = ImmutableSet.copyOf(preferredProperties.getGlobalProperties().get().getPartitioningProperties().get().getPartitioningColumns());
}
}

PlanWithProperties child = planChild(node, preferredProperties);
Expand All @@ -259,13 +271,14 @@ public PlanWithProperties visitAggregation(AggregationNode node, PreferredProper
gatheringExchange(idAllocator.getNextId(), REMOTE_STREAMING, child.getNode()),
child.getProperties());
}
else if (!child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) {
else if (hasMixedGroupingSets
|| !child.getProperties().isStreamPartitionedOn(partitioningRequirement) && !child.getProperties().isNodePartitionedOn(partitioningRequirement)) {
child = withDerivedProperties(
partitionedExchange(
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(child.getNode(), false),
child.getNode(),
createPartitioning(node.getGroupingKeys()),
createPartitioning(partitioningRequirement),
node.getHashVariable()),
child.getProperties());
}
Expand Down Expand Up @@ -299,7 +312,7 @@ private Function<VariableReferenceExpression, Optional<VariableReferenceExpressi
public PlanWithProperties visitMarkDistinct(MarkDistinctNode node, PreferredProperties preferredProperties)
{
PreferredProperties preferredChildProperties = PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getDistinctVariables()), grouped(node.getDistinctVariables()))
.mergeWithParent(preferredProperties);
.mergeWithParent(preferredProperties, true);
PlanWithProperties child = node.getSource().accept(this, preferredChildProperties);

if (child.getProperties().isSingleNode() ||
Expand Down Expand Up @@ -332,7 +345,7 @@ public PlanWithProperties visitWindow(WindowNode node, PreferredProperties prefe
PlanWithProperties child = planChild(
node,
PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), desiredProperties)
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));

if (!child.getProperties().isStreamPartitionedOn(node.getPartitionBy()) &&
!child.getProperties().isNodePartitionedOn(node.getPartitionBy())) {
Expand Down Expand Up @@ -374,7 +387,7 @@ public PlanWithProperties visitRowNumber(RowNumberNode node, PreferredProperties
PlanWithProperties child = planChild(
node,
PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()))
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));

// TODO: add config option/session property to force parallel plan if child is unpartitioned and window has a PARTITION BY clause
if (!child.getProperties().isStreamPartitionedOn(node.getPartitionBy())
Expand Down Expand Up @@ -406,7 +419,7 @@ public PlanWithProperties visitTopNRowNumber(TopNRowNumberNode node, PreferredPr
}
else {
preferredChildProperties = PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(node.getPartitionBy()), grouped(node.getPartitionBy()))
.mergeWithParent(preferredProperties);
.mergeWithParent(preferredProperties, true);
addExchange = partial -> partitionedExchange(
idAllocator.getNextId(),
selectExchangeScopeForPartitionedRemoteExchange(partial, false),
Expand Down Expand Up @@ -988,7 +1001,7 @@ public PlanWithProperties visitIndexJoin(IndexJoinNode node, PreferredProperties
List<LocalProperty<VariableReferenceExpression>> desiredLocalProperties = preferredProperties.getLocalProperties().isEmpty() ? grouped(joinColumns) : ImmutableList.of();

PlanWithProperties probeSource = node.getProbeSource().accept(this, PreferredProperties.partitionedWithLocal(ImmutableSet.copyOf(joinColumns), desiredLocalProperties)
.mergeWithParent(preferredProperties));
.mergeWithParent(preferredProperties, true));
ActualProperties probeProperties = probeSource.getProperties();

PlanWithProperties indexSource = node.getIndexSource().accept(this, PreferredProperties.any());
Expand Down Expand Up @@ -1329,7 +1342,7 @@ private ActualProperties derivePropertiesRecursively(PlanNode result)
return PropertyDerivations.derivePropertiesRecursively(result, metadata, session, types, parser);
}

private Partitioning createPartitioning(List<VariableReferenceExpression> partitioningColumns)
private Partitioning createPartitioning(Collection<VariableReferenceExpression> partitioningColumns)
{
// TODO: Use SystemTablesMetadata instead of introducing a special case
if (GlobalSystemConnector.NAME.equals(partitioningProviderCatalog)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public List<LocalProperty<VariableReferenceExpression>> getLocalProperties()
return localProperties;
}

public PreferredProperties mergeWithParent(PreferredProperties parent)
public PreferredProperties mergeWithParent(PreferredProperties parent, boolean mergePartitionPreference)
{
List<LocalProperty<VariableReferenceExpression>> newLocal = ImmutableList.<LocalProperty<VariableReferenceExpression>>builder()
.addAll(localProperties)
Expand All @@ -143,7 +143,7 @@ public PreferredProperties mergeWithParent(PreferredProperties parent)
if (globalProperties.isPresent()) {
Global currentGlobal = globalProperties.get();
Global newGlobal = parent.getGlobalProperties()
.map(currentGlobal::mergeWithParent)
.map(global -> currentGlobal.mergeWithParent(global, mergePartitionPreference))
.orElse(currentGlobal);
builder.global(newGlobal);
}
Expand Down Expand Up @@ -249,15 +249,15 @@ public Optional<PartitioningProperties> getPartitioningProperties()
return partitioningProperties;
}

public Global mergeWithParent(Global parent)
public Global mergeWithParent(Global parent, boolean mergePartitionPreference)
{
if (distributed != parent.distributed) {
return this;
}
if (!partitioningProperties.isPresent()) {
return parent;
}
if (!parent.partitioningProperties.isPresent()) {
if (!parent.partitioningProperties.isPresent() || !mergePartitionPreference) {
return this;
}
return new Global(distributed, Optional.of(partitioningProperties.get().mergeWithParent(parent.partitioningProperties.get())));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.LEGACY;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.AggregationPartitioningMergingStrategy.TOP_DOWN;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.BROADCAST;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
Expand Down Expand Up @@ -73,6 +75,7 @@ public void testDefaults()
.setOptimizeHashGeneration(true)
.setPushTableWriteThroughUnion(true)
.setDictionaryAggregation(false)
.setAggregationPartitioningMergingStrategy(LEGACY)
.setLegacyArrayAgg(false)
.setGroupByUsesEqualTo(false)
.setLegacyMapSubscript(false)
Expand Down Expand Up @@ -167,6 +170,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.push-table-write-through-union", "false")
.put("optimizer.dictionary-aggregation", "true")
.put("optimizer.push-aggregation-through-join", "false")
.put("optimizer.aggregation-partition-merging", "top_down")
.put("regex-library", "RE2J")
.put("re2j.dfa-states-limit", "42")
.put("re2j.dfa-retries", "42")
Expand Down Expand Up @@ -236,6 +240,7 @@ public void testExplicitPropertyMappings()
.setOptimizeMixedDistinctAggregations(true)
.setPushTableWriteThroughUnion(false)
.setDictionaryAggregation(true)
.setAggregationPartitioningMergingStrategy(TOP_DOWN)
.setPushAggregationThroughJoin(false)
.setLegacyArrayAgg(true)
.setGroupByUsesEqualTo(true)
Expand Down
Loading