Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public final class SystemSessionProperties
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";
public static final String COMBINE_SIMILAR_SUB_PLANS = "combine_similar_sub_plans";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -865,7 +866,12 @@ public SystemSessionProperties(
FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED,
"Enable event driven scheduler for fault tolerant execution",
queryManagerConfig.isFaultTolerantExecutionEventDrivenSchedulerEnabled(),
true));
true),
booleanProperty(
COMBINE_SIMILAR_SUB_PLANS,
"Enables optimizer rules that combine similar sub-plans",
optimizerConfig.isCombineSimilarSubPlans(),
false));
}

@Override
Expand Down Expand Up @@ -1548,4 +1554,9 @@ public static boolean isFaultTolerantExecutionEventDriverSchedulerEnabled(Sessio
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED, Boolean.class);
}

public static boolean isCombineSimilarSubPlans(Session session)
{
return session.getSystemProperty(COMBINE_SIMILAR_SUB_PLANS, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class OptimizerConfig
private double adaptivePartialAggregationUniqueRowsRatioThreshold = 0.8;
private long joinPartitionedBuildMinRowCount = 1_000_000L;

private boolean combineSimilarSubPlans = true;

public enum JoinReorderingStrategy
{
NONE,
Expand Down Expand Up @@ -756,4 +758,17 @@ public OptimizerConfig setUseExactPartitioning(boolean useExactPartitioning)
this.useExactPartitioning = useExactPartitioning;
return this;
}

public boolean isCombineSimilarSubPlans()
{
return combineSimilarSubPlans;
}

@Config("optimizer.combine-similar-sub-plans")
@ConfigDescription("Enables optimizer rules that combine similar sub-plans")
public OptimizerConfig setCombineSimilarSubPlans(boolean combineSimilarSubPlans)
{
this.combineSimilarSubPlans = combineSimilarSubPlans;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@
import io.trino.sql.planner.iterative.rule.UnwrapRowSubscript;
import io.trino.sql.planner.iterative.rule.UnwrapSingleColumnRowInApply;
import io.trino.sql.planner.iterative.rule.UseNonPartitionedJoinLookupSource;
import io.trino.sql.planner.iterative.rule.fuse.FuseCrossJoinedGlobalAggregations;
import io.trino.sql.planner.optimizations.AddExchanges;
import io.trino.sql.planner.optimizations.AddLocalExchanges;
import io.trino.sql.planner.optimizations.BeginTableWrite;
Expand Down Expand Up @@ -420,6 +421,18 @@ public PlanOptimizers(
.addAll(new CanonicalizeExpressions(plannerContext, typeAnalyzer).rules())
.add(new OptimizeRowPattern())
.build()),
// Run FuseCrossJoinedGlobalAggregations before columnPruningOptimizer, PushPredicateIntoTableScan, AddExchanges
Comment thread
lukasz-stec marked this conversation as resolved.
Outdated
// because columnPruningOptimizer and PushPredicateIntoTableScan can modify table scans in a way
// that two table scans on the same table could not be fused.
// Also fusing operation does not support exchanges and partial aggregation
new IterativeOptimizer(
plannerContext,
ruleStats,
statsCalculator,
costCalculator,
ImmutableSet.of(
new RemoveRedundantIdentityProjections(),
new FuseCrossJoinedGlobalAggregations())),
new IterativeOptimizer(
plannerContext,
ruleStats,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.sql.planner.iterative.rule.fuse;

import io.trino.Session;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.AggregationNode;
import io.trino.sql.planner.plan.JoinNode;

import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.SystemSessionProperties.isCombineSimilarSubPlans;
import static io.trino.matching.Capture.newCapture;
import static io.trino.sql.planner.plan.Patterns.Join.left;
import static io.trino.sql.planner.plan.Patterns.Join.right;
import static io.trino.sql.planner.plan.Patterns.aggregation;
import static io.trino.sql.planner.plan.Patterns.join;
import static io.trino.sql.tree.BooleanLiteral.TRUE_LITERAL;

/**
* Simplified "Computation Reuse via Fusion in Amazon Athena" paper JoinOnKeys rule
* that works only on a single cross join over global aggregations over the same table.
* It transforms
* <pre>
* cross join
* aggregation global
* agg_left <- aggLeft()
* filter(filter_left)
* table scan(table)
* aggregation global
* agg_right <- aggRight()
* filter(filter_right)
* table scan(table)
* </pre>
* into:
* <pre>
* aggregation global
* agg_left <- aggLeft(mask = agg_mask_left)
* agg_right <- aggRight(mask = agg_mask_right)
* project(agg_mask_left = filter_left, agg_mask_left = filter_right)
* filter(filter_left or filter_right)
* table scan
* </pre>
* <p>
* This makes the plan to read the source table only once,
* resulting in the potentially significant performance improvement.
*/
public class FuseCrossJoinedGlobalAggregations
implements Rule<JoinNode>
{
private static final Capture<AggregationNode> LEFT_AGGREGATION_NODE = newCapture();
private static final Capture<AggregationNode> RIGHT_AGGREGATION_NODE = newCapture();
private static final Pattern<JoinNode> PATTERN = join()
.matching(JoinNode::isCrossJoin)
.with(left()
.matching(aggregation()
.matching(AggregationNode::hasSingleGlobalAggregation)
.capturedAs(LEFT_AGGREGATION_NODE)))
.with(right()
.matching(aggregation()
.matching(AggregationNode::hasSingleGlobalAggregation)
.capturedAs(RIGHT_AGGREGATION_NODE)));

@Override
public Pattern<JoinNode> getPattern()
{
return PATTERN;
}

@Override
public boolean isEnabled(Session session)
{
return isCombineSimilarSubPlans(session);
}

@Override
public Result apply(JoinNode node, Captures captures, Context context)
{
AggregationNode left = captures.get(LEFT_AGGREGATION_NODE);
AggregationNode right = captures.get(RIGHT_AGGREGATION_NODE);

return PlanNodeFuser.fuse(context, left, right)
.map(fused -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of exposing FusedPlanNode in public API, I think this logic can move to PlanNodeFuser and we have that return a Optional<PlanNode> instead

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO FusedPlanNode is the public API for fusion.
Now, for this specific case, I could move the check but for the more general version of this rule that is coming (JoinOnKeys) and for the other rules, the filters can be there

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, are the filters going to be handled differently based on what is the parent node ?
Can we move the part about adding projections into PlanNodeFuser by adding Set<Symbol> resultOutputSymbols to API or is this part also specific to this rule ?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are the filters going to be handled differently based on what is the parent node ?

Yes, at least this is what I see in the paper (the implementation can vary)

Can we move the part about adding projections into PlanNodeFuser by adding Set resultOutputSymbols to API or is this part also specific to this rule ?

I don't think it's specific to this rule but also we don't have to add it to the fusion API.
I refactored this and moved this logic to the static io.trino.sql.planner.iterative.rule.fuse.PlanNodeFuser#fuse(io.trino.sql.planner.iterative.Rule.Context, io.trino.sql.planner.plan.PlanNode, io.trino.sql.planner.plan.PlanNode)

checkArgument(
fused.leftFilter().equals(TRUE_LITERAL) && fused.rightFilter().equals(TRUE_LITERAL),
"Expected both fused filter to be TRUE but got left %s, right %s",
fused.leftFilter(),
fused.rightFilter());
return Result.ofPlanNode(fused.plan());
})
.orElse(Result.empty());
}
}
Loading