Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public final class SystemSessionProperties
public static final String LEGACY_ROW_FIELD_ORDINAL_ACCESS = "legacy_row_field_ordinal_access";
public static final String ITERATIVE_OPTIMIZER = "iterative_optimizer_enabled";
public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout";
public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add_exchange_before_group_id

public static final String EXCHANGE_COMPRESSION = "exchange_compression";
public static final String LEGACY_TIMESTAMP = "legacy_timestamp";
public static final String ENABLE_INTERMEDIATE_AGGREGATIONS = "enable_intermediate_aggregations";
Expand Down Expand Up @@ -427,6 +428,11 @@ public SystemSessionProperties(
false,
value -> Duration.valueOf((String) value),
Duration::toString),
booleanProperty(
ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID,
"Enable a stats-based rule adding exchanges below GroupId",
featuresConfig.isEnableForcedExchangeBelowGroupId(),
true),
booleanProperty(
EXCHANGE_COMPRESSION,
"Enable compression in exchanges",
Expand Down Expand Up @@ -765,6 +771,11 @@ public static Duration getOptimizerTimeout(Session session)
return session.getSystemProperty(ITERATIVE_OPTIMIZER_TIMEOUT, Duration.class);
}

public static boolean isEnableForcedExchangeBelowGroupId(Session session)
{
return session.getSystemProperty(ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID, Boolean.class);
}

public static boolean isExchangeCompressionEnabled(Session session)
{
return session.getSystemProperty(EXCHANGE_COMPRESSION, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public CostCalculatorUsingExchanges(NodeSchedulerConfig nodeSchedulerConfig, Int
this(currentNumberOfWorkerNodes(nodeSchedulerConfig.isIncludeCoordinator(), nodeManager));
}

static IntSupplier currentNumberOfWorkerNodes(boolean includeCoordinator, InternalNodeManager nodeManager)
public static IntSupplier currentNumberOfWorkerNodes(boolean includeCoordinator, InternalNodeManager nodeManager)
{
requireNonNull(nodeManager, "nodeManager is null");
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class FeaturesConfig
private double spillMaxUsedSpaceThreshold = 0.9;
private boolean iterativeOptimizerEnabled = true;
private boolean enableStatsCalculator = true;
private boolean enableForcedExchangeBelowGroupId = true;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have it disabled by default?

private boolean pushAggregationThroughJoin = true;
private double memoryRevokingTarget = 0.5;
private double memoryRevokingThreshold = 0.9;
Expand Down Expand Up @@ -593,6 +594,18 @@ public FeaturesConfig setEnableStatsCalculator(boolean enableStatsCalculator)
return this;
}

public boolean isEnableForcedExchangeBelowGroupId()
{
return enableForcedExchangeBelowGroupId;
}

@Config("enable-forced-exchange-below-group-id")
public FeaturesConfig setEnableForcedExchangeBelowGroupId(boolean enableForcedExchangeBelowGroupId)
{
this.enableForcedExchangeBelowGroupId = enableForcedExchangeBelowGroupId;
return this;
}

public DataSize getAggregationOperatorUnspillMemoryLimit()
{
return aggregationOperatorUnspillMemoryLimit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
import com.facebook.presto.cost.CostCalculator.EstimatedExchanges;
import com.facebook.presto.cost.CostComparator;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.execution.TaskManagerConfig;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
import com.facebook.presto.metadata.InternalNodeManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.iterative.IterativeOptimizer;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.iterative.rule.AddExchangesBelowPartialAggregationOverGroupIdRuleSet;
import com.facebook.presto.sql.planner.iterative.rule.AddIntermediateAggregations;
import com.facebook.presto.sql.planner.iterative.rule.CanonicalizeExpressions;
import com.facebook.presto.sql.planner.iterative.rule.CreatePartialTopN;
Expand Down Expand Up @@ -127,6 +131,9 @@

import java.util.List;
import java.util.Set;
import java.util.function.IntSupplier;

import static com.facebook.presto.cost.CostCalculatorUsingExchanges.currentNumberOfWorkerNodes;

public class PlanOptimizers
{
Expand All @@ -140,6 +147,9 @@ public PlanOptimizers(
Metadata metadata,
SqlParser sqlParser,
FeaturesConfig featuresConfig,
NodeSchedulerConfig nodeSchedulerConfig,
InternalNodeManager nodeManager,
TaskManagerConfig taskManagerConfig,
MBeanExporter exporter,
SplitManager splitManager,
PageSourceManager pageSourceManager,
Expand All @@ -151,6 +161,8 @@ public PlanOptimizers(
this(metadata,
sqlParser,
featuresConfig,
currentNumberOfWorkerNodes(nodeSchedulerConfig.isIncludeCoordinator(), nodeManager),
taskManagerConfig,
false,
exporter,
splitManager,
Expand Down Expand Up @@ -179,6 +191,8 @@ public PlanOptimizers(
Metadata metadata,
SqlParser sqlParser,
FeaturesConfig featuresConfig,
IntSupplier numberOfNodes,
TaskManagerConfig taskManagerConfig,
boolean forceSingleNode,
MBeanExporter exporter,
SplitManager splitManager,
Expand Down Expand Up @@ -484,6 +498,11 @@ public PlanOptimizers(
new PushPartialAggregationThroughJoin(),
new PushPartialAggregationThroughExchange(metadata.getFunctionRegistry()),
new PruneJoinColumns())));
builder.add(new IterativeOptimizer(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This rule uses cost model. Cost model loses estimates when there are any partial aggregation in between.

ruleStats,
statsCalculator,
costCalculator,
new AddExchangesBelowPartialAggregationOverGroupIdRuleSet(metadata, sqlParser, numberOfNodes, taskManagerConfig).rules()));
builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
Expand Down
Loading