perf(optimizer): Make cost-based strategy of using parent preference in AddLocalExchanges #26960
Reviewer's Guide
Introduces a configurable, cost-based strategy for whether AddLocalExchanges should honor parent partitioning preferences for aggregations, wiring it through session/config properties, using stats when in AUTOMATIC mode, and adding planner tests to validate the behavior.
Sequence diagram for AddLocalExchanges optimization with AUTOMATIC strategy
sequenceDiagram
participant Planner as PlanOptimizers
participant Optimizer as AddLocalExchanges
participant Session as Session
participant StatsCalc as StatsCalculator
participant StatsProv as CachingStatsProvider
participant Plan as PlanNode
participant Rew as Rewriter
Planner->>Optimizer: optimize(Plan, Session, TypeProvider, VariableAllocator, PlanNodeIdAllocator, WarningCollector)
Optimizer->>Session: getLocalExchangeParentPreferenceStrategy(Session)
Session-->>Optimizer: LocalExchangeParentPreferenceStrategy strategy
alt strategy == AUTOMATIC
Optimizer->>StatsCalc: construct CachingStatsProvider(Session, TypeProvider)
StatsCalc-->>Optimizer: StatsProv
Optimizer->>Optimizer: preComputeStats(Plan, StatsProv)
loop preComputeStats recursion
Optimizer->>Plan: getSources()
Plan-->>Optimizer: child PlanNodes
Optimizer->>StatsProv: getStats(node)
StatsProv-->>Optimizer: PlanNodeStatsEstimate
end
Optimizer->>Rew: new Rewriter(VariableAllocator, PlanNodeIdAllocator, Session, strategy, Optional StatsProv, nativeExecution)
else strategy == ALWAYS or NEVER
Optimizer->>Rew: new Rewriter(VariableAllocator, PlanNodeIdAllocator, Session, strategy, Optional empty, nativeExecution)
end
Optimizer->>Rew: accept(Plan, any StreamPreferredProperties)
Rew-->>Optimizer: PlanWithProperties result
Optimizer->>Optimizer: check for local ExchangeNode
Optimizer-->>Planner: PlanOptimizerResult
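The branching in the sequence diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `StatsSource`, `RewriterConfig`, `LocalExchangeSetup`, and `configure` are hypothetical stand-ins for `CachingStatsProvider`, the `Rewriter` constructor arguments, and the body of `optimize`; only the enum values come from the PR. It leaves out the `preComputeStats` step shown in the diagram, which the author later reports removing.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Enum values taken from the PR; everything else in this sketch is hypothetical.
enum LocalExchangeParentPreferenceStrategy { ALWAYS, NEVER, AUTOMATIC }

// Stand-in for a stats provider such as CachingStatsProvider.
interface StatsSource {}

// Stand-in for the arguments handed to the Rewriter.
final class RewriterConfig
{
    final LocalExchangeParentPreferenceStrategy strategy;
    final Optional<StatsSource> statsSource;

    RewriterConfig(LocalExchangeParentPreferenceStrategy strategy, Optional<StatsSource> statsSource)
    {
        this.strategy = strategy;
        this.statsSource = statsSource;
    }
}

final class LocalExchangeSetup
{
    private LocalExchangeSetup() {}

    // Build a stats source only for AUTOMATIC; ALWAYS and NEVER never consult stats,
    // so the factory is not invoked for them.
    static RewriterConfig configure(LocalExchangeParentPreferenceStrategy strategy, Supplier<StatsSource> statsFactory)
    {
        Optional<StatsSource> statsSource = strategy == LocalExchangeParentPreferenceStrategy.AUTOMATIC
                ? Optional.of(statsFactory.get())
                : Optional.empty();
        return new RewriterConfig(strategy, statsSource);
    }
}
```

Passing the factory as a `Supplier` keeps the stats machinery from being constructed at all under ALWAYS and NEVER, which matches the commit message's note that estimated stats are only calculated in AUTOMATIC mode.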
Class diagram for new local exchange parent preference strategy integration
classDiagram
class AddLocalExchanges {
- Metadata metadata
- StatsCalculator statsCalculator
- boolean nativeExecution
+ AddLocalExchanges(Metadata metadata, StatsCalculator statsCalculator, boolean nativeExecution)
+ PlanOptimizerResult optimize(PlanNode plan, Session session, TypeProvider types, VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, WarningCollector warningCollector)
- void preComputeStats(PlanNode node, StatsProvider statsProvider)
}
class Rewriter {
- VariableAllocator variableAllocator
- PlanNodeIdAllocator idAllocator
- Session session
- TypeProvider types
- LocalExchangeParentPreferenceStrategy parentPreferenceStrategy
- Optional~StatsProvider~ statsProvider
- boolean nativeExecution
+ Rewriter(VariableAllocator variableAllocator, PlanNodeIdAllocator idAllocator, Session session, LocalExchangeParentPreferenceStrategy parentPreferenceStrategy, Optional~StatsProvider~ statsProvider, boolean nativeExecution)
+ PlanWithProperties visitAggregation(AggregationNode node, StreamPreferredProperties parentPreferences)
}
class LocalExchangeParentPreferenceStrategy {
<<enum>>
+ ALWAYS
+ NEVER
+ AUTOMATIC
}
class FeaturesConfig {
- LocalExchangeParentPreferenceStrategy localExchangeParentPreferenceStrategy
+ LocalExchangeParentPreferenceStrategy getLocalExchangeParentPreferenceStrategy()
+ FeaturesConfig setLocalExchangeParentPreferenceStrategy(LocalExchangeParentPreferenceStrategy localExchangeParentPreferenceStrategy)
}
class SystemSessionProperties {
+ String LOCAL_EXCHANGE_PARENT_PREFERENCE_STRATEGY
+ LocalExchangeParentPreferenceStrategy getLocalExchangeParentPreferenceStrategy(Session session)
}
class PlanOptimizers {
+ PlanOptimizers(Metadata metadata, StatsCalculator statsCalculator, FeaturesConfig featuresConfig)
}
AddLocalExchanges *-- Rewriter
Rewriter --> LocalExchangeParentPreferenceStrategy
FeaturesConfig --> LocalExchangeParentPreferenceStrategy
SystemSessionProperties --> LocalExchangeParentPreferenceStrategy
PlanOptimizers --> AddLocalExchanges
AddLocalExchanges --> StatsCalculator
AddLocalExchanges --> StatsProvider
Rewriter --> StatsProvider
Rewriter --> AggregationNode
Rewriter --> StreamPreferredProperties
Flow diagram for AUTOMATIC local exchange parent preference decision
flowchart TD
A[Start visitAggregation] --> B{parentPreferenceStrategy}
B -->|ALWAYS| C[useParentPreferences = true]
B -->|NEVER| D[useParentPreferences = false]
B -->|AUTOMATIC| E[Initialize parentPartitionCardinality = 1]
E --> F{Has partitioningColumns and statsProvider}
F -->|No| G[useParentPreferences = parentPartitionCardinality >= taskConcurrency]
F -->|Yes| H[Get stats for source node]
H --> I[For each partitionColumn]
I --> J{distinctCount is NaN}
J -->|Yes| K[Set parentPartitionCardinality = 0 and break]
J -->|No| L[Multiply parentPartitionCardinality by distinctCount]
L --> M{More partitionColumns?}
M -->|Yes| I
M -->|No| G
K --> G
C --> N[Derive childRequirements from parentPreferences]
D --> O[Derive childRequirements from any StreamPreferredProperties]
G --> P{parentPartitionCardinality >= taskConcurrency}
P -->|true| N
P -->|false| O
N --> Q[Proceed with local exchange using parent preferences]
O --> R[Proceed with local exchange using grouping keys only]
Q --> S[End]
R --> S[End]
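The decision in the flow diagram can be written out as a small function. This is a sketch under assumptions rather than the PR's implementation: `ParentPreferenceStrategy`, `ParentPreferenceDecision`, and the `Optional<List<Double>>` parameter (one estimated distinct count per parent partitioning column, empty when no stats provider exists) are hypothetical simplifications of what `visitAggregation` works with.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical mirror of the PR's three strategy values.
enum ParentPreferenceStrategy { ALWAYS, NEVER, AUTOMATIC }

final class ParentPreferenceDecision
{
    private ParentPreferenceDecision() {}

    // distinctCounts: estimated distinct count per parent partitioning column;
    // Optional.empty() models "no stats provider", NaN models "unknown estimate".
    static boolean useParentPreferences(
            ParentPreferenceStrategy strategy,
            Optional<List<Double>> distinctCounts,
            int taskConcurrency)
    {
        switch (strategy) {
            case ALWAYS:
                return true;
            case NEVER:
                return false;
            default:
                break;
        }
        // AUTOMATIC: multiply the per-column distinct counts, as in the flow diagram.
        double parentPartitionCardinality = 1;
        if (distinctCounts.isPresent() && !distinctCounts.get().isEmpty()) {
            for (double distinctCount : distinctCounts.get()) {
                if (Double.isNaN(distinctCount)) {
                    // Unknown estimate: force the comparison below to fail.
                    parentPartitionCardinality = 0;
                    break;
                }
                parentPartitionCardinality *= distinctCount;
            }
        }
        return parentPartitionCardinality >= taskConcurrency;
    }
}
```

With an estimated distinct count of 3, this returns false for a task concurrency of 4 and true for 2, matching the planner test quoted in the review. With no stats provider and a task concurrency of 1, the cardinality stays at its initial value of 1 and the check passes, which is the edge case the automated review raises.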
feilong-liu
left a comment
Please add some comments, and consider adding a unit test for it.
{
    PlanWithProperties result = new Rewriter(variableAllocator, idAllocator, session, nativeExecution).accept(plan, any());
    StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types);
    preComputeStats(plan, statsProvider);
Why is the precomputation needed here?
Hi @feilong-liu, this is because visitAggregation() is called top-down from the root node recursively. The parent preference in visitAggregation() is applied during the winding phase of the recursion (i.e., the parent preference is passed from root to leaf), so the decision of whether to use it must be made during the winding phase too. On the other hand, CachingStatsProvider produces stats while its recursion unwinds (i.e., a parent's stats are based on its children's stats). If we call CachingStatsProvider::getStats() during the winding phase of visitAggregation(), the stats would be unknown because the children's stats haven't been calculated yet.
Update: I looked deeper and found that precomputing the stats is indeed unnecessary. The precomputation was added earlier, when I saw that a test query got no stats in visitAggregation(). After stepping through the execution, I found that this was actually caused by an internal plan node blocking the stats propagation. So I removed preComputeStats here, since it is unrelated to that problem and unnecessary.
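Why on-demand lookup suffices can be illustrated with a toy memoizing provider. This is only a sketch under assumptions: `Node`, `MemoizingStats`, and the row-count rule (a node's own count plus the sum of its sources) are hypothetical and do not mirror `CachingStatsProvider`'s implementation. The point is that a lazy `getStats` call on a parent computes and caches its children's estimates itself, so a separate pre-pass over the plan is not required.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plan node: a row count of its own plus zero or more sources.
final class Node
{
    final double ownRows;
    final List<Node> sources = new ArrayList<>();

    Node(double ownRows, Node... children)
    {
        this.ownRows = ownRows;
        for (Node child : children) {
            sources.add(child);
        }
    }
}

// Hypothetical memoizing stats provider.
final class MemoizingStats
{
    private final Map<Node, Double> cache = new IdentityHashMap<>();
    int computations; // how many nodes were actually computed (cache misses)

    double getStats(Node node)
    {
        Double cached = cache.get(node);
        if (cached != null) {
            return cached;
        }
        computations++;
        // The recursion descends to the sources first, so their estimates
        // exist by the time the parent's estimate is assembled.
        double rows = node.ownRows;
        for (Node source : node.sources) {
            rows += getStats(source);
        }
        cache.put(node, rows);
        return rows;
    }
}
```

Asking for the root's stats first, as a top-down visitor would, still yields a fully derived estimate, and later lookups on the children hit the cache.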
.constrainTo(node.getSource().getOutputVariables())
.withDefaultParallelism(session)
.withPartitioning(groupingKeys);
double parentPartitionCardinality = 1;
In addition to relying on stats, perhaps also add a session property to control the behavior. It can be always_enabled, disabled, or cost_based. This will help in cases where stats are unavailable or inaccurate.
Added. Thanks for the suggestion.
4710a54 to 12d596d
12d596d to e99a9c8
Hey - I've found 2 issues, and left some high level feedback:
- In the AUTOMATIC strategy, `parentPartitionCardinality` defaults to `1` when parent partitioning columns are absent or stats are unavailable, which can still enable parent preference for `task_concurrency = 1`; if the intent is to skip parent preference when stats are missing (as described in the PR), consider explicitly treating the "no stats / no partitioning" case as `useParentPreferences = false`.
- The `preComputeStats` method eagerly traverses the entire plan tree and computes stats for every node; given that `CachingStatsProvider` already memoizes, you might want to rely on lazy `getStats` calls instead to avoid unnecessary work on large plans.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In the AUTOMATIC strategy, `parentPartitionCardinality` defaults to `1` when parent partitioning columns are absent or stats are unavailable, which can still enable parent preference for `task_concurrency = 1`; if the intent is to skip parent preference when stats are missing (as described in the PR), consider explicitly treating the "no stats / no partitioning" case as `useParentPreferences = false`.
- The `preComputeStats` method eagerly traverses the entire plan tree and computes stats for every node; given that `CachingStatsProvider` already memoizes, you might want to rely on lazy `getStats` calls instead to avoid unnecessary work on large plans.
## Individual Comments
### Comment 1
<location> `presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java:159-164` </location>
<code_context>
return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
}
+ private void preComputeStats(PlanNode node, StatsProvider statsProvider)
+ {
+ for (PlanNode child : node.getSources()) {
+ preComputeStats(child, statsProvider);
+ }
+ statsProvider.getStats(node);
+ }
+
</code_context>
<issue_to_address>
**suggestion (performance):** Consider limiting or guarding the full-plan pre-computation of stats to avoid unnecessary work on large plans.
preComputeStats() recursively walks the full plan and calls statsProvider.getStats() on every node when the strategy is AUTOMATIC. For large plans this can be quite expensive, especially since AddLocalExchanges runs for every query and relatively late in optimization. If the decision logic only needs stats for specific node types or subtrees, consider narrowing the traversal or adding a cheap guard (e.g., early exit when stats are disabled/unavailable). Alternatively, prefer on-demand stats with caching instead of a full upfront traversal.
Suggested implementation:
```java
            StatsProvider provider = new CachingStatsProvider(statsCalculator, session, types);
            if (shouldPreComputeStats(plan, provider)) {
                preComputeStats(plan, provider, MAX_PRECOMPUTE_STATS_NODES);
            }
            statsProvider = Optional.of(provider);
        }

        PlanWithProperties result = new Rewriter(variableAllocator, idAllocator, session, strategy, statsProvider, nativeExecution).accept(plan, any());
        boolean optimizerTriggered = PlanNodeSearcher.searchFrom(result.getNode()).where(node -> node instanceof ExchangeNode && ((ExchangeNode) node).getScope().isLocal()).findFirst().isPresent();
        return PlanOptimizerResult.optimizerResult(result.getNode(), optimizerTriggered);
    }

    private static final int MAX_PRECOMPUTE_STATS_NODES = 10_000;

    private boolean shouldPreComputeStats(PlanNode root, StatsProvider statsProvider)
    {
        // Cheap guard: if we can't get meaningful stats for the root, skip the full traversal.
        PlanNodeStatsEstimate rootStats = statsProvider.getStats(root);
        return !rootStats.isOutputRowCountUnknown();
    }

    private void preComputeStats(PlanNode node, StatsProvider statsProvider, int remainingBudget)
    {
        if (remainingBudget <= 0) {
            return;
        }
        int budget = remainingBudget - 1;
        for (PlanNode child : node.getSources()) {
            if (budget <= 0) {
                break;
            }
            int before = budget;
            preComputeStats(child, statsProvider, budget);
            // Heuristic: assume each recursive call consumes at least one unit of budget.
            // For safety, decrement by one after each child to avoid unbounded traversal.
            budget = before - 1;
        }
        statsProvider.getStats(node);
    }
```
1. If the actual `PlanNodeStatsEstimate` API differs (e.g., no `isOutputRowCountUnknown()`), adjust `shouldPreComputeStats` to use the correct way of checking whether stats are unavailable, such as comparing to `PlanNodeStatsEstimate.unknown()`.
2. The `MAX_PRECOMPUTE_STATS_NODES` value (10_000) can be tuned or made configurable via a session property if your codebase typically uses a different limit for “large” plans.
3. If you prefer a more precise budget accounting, you can rework `preComputeStats` to return the remaining budget instead of using the simple decrement heuristic shown here, but keep the same guard and budget pattern to avoid full-plan walks.
</issue_to_address>
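The more precise accounting mentioned in point 3 above could look roughly like the following. This is a sketch under assumptions, using a hypothetical `PlanTreeNode` type and a `visit` callback in place of `PlanNode` and `statsProvider.getStats`; it is not code from the PR. Each call returns the budget left after visiting its subtree, so the cap applies to the total number of nodes visited rather than to the number of children per node.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for PlanNode.
final class PlanTreeNode
{
    final String name;
    final List<PlanTreeNode> sources = new ArrayList<>();

    PlanTreeNode(String name, PlanTreeNode... children)
    {
        this.name = name;
        for (PlanTreeNode child : children) {
            sources.add(child);
        }
    }
}

final class BudgetedTraversal
{
    private BudgetedTraversal() {}

    // Visits nodes bottom-up (sources before parent) and returns the unused budget.
    // At most remainingBudget nodes are visited in total.
    static int preCompute(PlanTreeNode node, Consumer<PlanTreeNode> visit, int remainingBudget)
    {
        if (remainingBudget <= 0) {
            return 0;
        }
        int budget = remainingBudget - 1; // reserve one unit for this node
        for (PlanTreeNode child : node.sources) {
            if (budget <= 0) {
                break;
            }
            // The child reports how much budget its whole subtree left over.
            budget = preCompute(child, visit, budget);
        }
        visit.accept(node);
        return budget;
    }
}
```

Because the budget is threaded through the return value, a deep subtree under the first child correctly reduces what remains for its siblings.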
### Comment 2
<location> `presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java:2162-2165` </location>
<code_context>
+ assertLocalExchangeWithParentPreference(query, "NEVER", "4", false);
+ assertLocalExchangeWithParentPreference(query, "NEVER", "2", false);
+
+ // Test AUTOMATIC strategy: cost-based decision.
+ // When task concurrency (4) > parent cardinality (3), don't use parent preferences.
+ assertLocalExchangeWithParentPreference(query, "AUTOMATIC", "4", false);
+ // When task concurrency (2) <= parent cardinality (3), use parent preferences.
+ assertLocalExchangeWithParentPreference(query, "AUTOMATIC", "2", true);
+ }
</code_context>
<issue_to_address>
**issue (testing):** Add a test for AUTOMATIC strategy when stats are not available to assert we fall back to not using parent preference
The current test only covers AUTOMATIC when stats are available. Please add a case where stats are missing (e.g., a plan/table without stats or configuration that disables stats) and assert that AUTOMATIC behaves like NEVER (no parent preference). This ensures regressions in the `statsProvider.isPresent()` / pre-computation path are caught.
</issue_to_address>
...main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java
Outdated
// Test AUTOMATIC strategy: cost-based decision.
// When task concurrency (4) > parent cardinality (3), don't use parent preferences.
assertLocalExchangeWithParentPreference(query, "AUTOMATIC", "4", false);
// When task concurrency (2) <= parent cardinality (3), use parent preferences.
**issue (testing):** Add a test for AUTOMATIC strategy when stats are not available to assert we fall back to not using parent preference
The current test only covers AUTOMATIC when stats are available. Please add a case where stats are missing (e.g., a plan/table without stats or configuration that disables stats) and assert that AUTOMATIC behaves like NEVER (no parent preference). This ensures regressions in the `statsProvider.isPresent()` / pre-computation path are caught.
steveburnett
left a comment
Please include documentation for the new property, as described in Designing Your Code in CONTRIBUTING.md:
"All new language features, new functions, session and config properties, and major features have documentation added"
e99a9c8 to 23c4f64
Hi @feilong-liu, do you have a recommendation for where to document the new config added in FeaturesConfig.java? Thanks!
Use presto-docs/src/main/sphinx/admin/properties-session.rst for the session property. You can also try using AI to write the doc; it's very good at it.
StatsProvider provider = new CachingStatsProvider(statsCalculator, session, types);
statsProvider = Optional.of(provider);
- StatsProvider provider = new CachingStatsProvider(statsCalculator, session, types);
- statsProvider = Optional.of(provider);
+ statsProvider = Optional.of(new CachingStatsProvider(statsCalculator, session, types));
23c4f64 to fcdbe23
…n limit parallelism when the cardinality of the partition column of parent preference is low. In a setup where a query is allowed to use many cores, limiting the parallelism significantly affects the query latency. More details can be found in prestodb#26961.

This PR makes three changes:
* This PR introduces a new feature config `localExchangeParentPreferenceStrategy` that has three values: ALWAYS, NEVER, and AUTOMATIC. The default value is ALWAYS (i.e., current behavior).
* This PR makes AddLocalExchanges use parent preference according to `localExchangeParentPreferenceStrategy`. When the strategy is ALWAYS, it always uses parent preference. When the strategy is NEVER, it never uses parent preference. When the strategy is AUTOMATIC, it uses parent preference only when the estimated cardinality is larger than the task concurrency. (If estimated stats are not available, parent preference is not used.)
  - Note that the estimated stats are only calculated when `localExchangeParentPreferenceStrategy` is AUTOMATIC.
* This PR adds unit tests for the new config and the change to local exchange.
fcdbe23 to 553a31a
steveburnett
left a comment
LGTM! (docs)
Pull branch, local doc build, looks good.
Thank you for the documentation!
We observed that the use of parent preference in AddLocalExchanges can limit parallelism when the cardinality of the partition column of parent preference is low. In a setup where a query is allowed to use many cores, limiting the parallelism significantly affects the query latency. More details can be found in #26961.
This PR makes three changes:
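* This PR introduces a new feature config `localExchangeParentPreferenceStrategy` that has three values: ALWAYS, NEVER, and AUTOMATIC. The default value is ALWAYS (i.e., current behavior).
* This PR makes AddLocalExchanges use parent preference according to `localExchangeParentPreferenceStrategy`. When the strategy is ALWAYS, it always uses parent preference. When the strategy is NEVER, it never uses parent preference. When the strategy is AUTOMATIC, it uses parent preference only when the estimated cardinality is larger than the task concurrency. (If estimated stats are not available, parent preference is not used.)
  - Note that the estimated stats are only calculated when `localExchangeParentPreferenceStrategy` is AUTOMATIC.
* This PR adds unit tests for the new config and the change to local exchange.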
Summary by Sourcery
Introduce a configurable strategy for using parent preferences in AddLocalExchanges and make local exchange partitioning for aggregations cost-aware based on estimated cardinality and task concurrency.