feat(optimizer): Pre-aggregate before GroupId to reduce row multiplication #27290
Reviewer's Guide
Implements a new PreAggregateBeforeGroupId iterative optimizer rule that inserts a partial aggregation below GroupId for grouping sets queries, adds determinism-aware nested IF simplification, tightens adaptive partial aggregation conditions to only apply on RAW→PARTIAL steps, and wires everything behind a new optimizer.pre-aggregate-before-grouping-sets feature flag and corresponding session property with tests.
Class diagram for PreAggregateBeforeGroupId optimizer rule and related planner components
classDiagram
class PreAggregateBeforeGroupId {
- FunctionAndTypeManager functionAndTypeManager
+ PreAggregateBeforeGroupId(FunctionAndTypeManager functionAndTypeManager)
+ Pattern getPattern()
+ boolean isEnabled(Session session)
+ Result apply(AggregationNode node, Captures captures, Context context)
- boolean isLambda(RowExpression rowExpression)
}
class Rule {
<<interface>>
}
class AggregationNode {
+ Map aggregations
+ Step getStep()
+ Map getAggregations()
+ List getGroupingSets()
}
class GroupIdNode {
+ Map getGroupingColumns()
+ List getGroupingSets()
+ List getAggregationArguments()
+ VariableReferenceExpression getGroupIdVariable()
+ PlanNode getSource()
}
class FunctionAndTypeManager {
+ FunctionMetadata getFunctionMetadata(FunctionHandle functionHandle)
+ AggregationFunctionImplementation getAggregateFunctionImplementation(FunctionHandle functionHandle)
}
class Aggregation {
+ FunctionHandle getFunctionHandle()
+ CallExpression getCall()
+ List getArguments()
+ Optional getFilter()
+ Optional getOrderBy()
+ boolean isDistinct()
+ Optional getMask()
}
class CallExpression {
+ RowExpression getSourceLocation()
+ List getArguments()
}
class VariableReferenceExpression {
}
class PlanNode {
}
class Session {
}
class FeaturesConfig {
- boolean preAggregateBeforeGroupingSets
+ boolean isPreAggregateBeforeGroupingSets()
+ FeaturesConfig setPreAggregateBeforeGroupingSets(boolean preAggregateBeforeGroupingSets)
}
class SystemSessionProperties {
+ String PRE_AGGREGATE_BEFORE_GROUPING_SETS
+ boolean isPreAggregateBeforeGroupingSets(Session session)
}
class PlanOptimizers {
+ PlanOptimizers(...)
}
class HashAggregationOperator {
- AggregationBuilder aggregationBuilder
+ void initializeAggregationBuilderIfNeeded()
}
class PartialAggregationController {
+ boolean isPartialAggregationDisabled()
}
class SkipAggregationBuilder {
}
class LocalExecutionPlanner {
+ Optional createPartialAggregationController(Optional maxPartialAggregationMemorySize, AggregationNode.Step step, Session session)
}
Rule <|.. PreAggregateBeforeGroupId
PlanNode <|-- AggregationNode
PlanNode <|-- GroupIdNode
PreAggregateBeforeGroupId --> AggregationNode
PreAggregateBeforeGroupId --> GroupIdNode
PreAggregateBeforeGroupId --> FunctionAndTypeManager
PreAggregateBeforeGroupId --> Session
AggregationNode --> Aggregation
Aggregation --> CallExpression
Aggregation --> FunctionHandle
CallExpression --> RowExpression
AggregationNode --> VariableReferenceExpression
GroupIdNode --> VariableReferenceExpression
GroupIdNode --> PlanNode
FunctionAndTypeManager --> AggregationFunctionImplementation
SystemSessionProperties --> Session
SystemSessionProperties --> FeaturesConfig
PlanOptimizers --> PreAggregateBeforeGroupId
HashAggregationOperator --> AggregationNode.Step
HashAggregationOperator --> PartialAggregationController
HashAggregationOperator --> SkipAggregationBuilder
LocalExecutionPlanner --> AggregationNode.Step
LocalExecutionPlanner --> PartialAggregationController
LocalExecutionPlanner --> Session
Class diagram for SimplifyRowExpressions nested IF simplification
classDiagram
class SimplifyRowExpressions {
+ SimplifyRowExpressions(Metadata metadata, ExpressionOptimizerManager expressionOptimizerManager)
+ static RowExpression rewrite(RowExpression expression, Metadata metadata, ExpressionOptimizerManager expressionOptimizerManager, Session session)
}
class Rewriter {
- NestedIfSimplifier nestedIfSimplifier
- ExpressionOptimizerManager expressionOptimizerManager
- LogicalExpressionRewriter logicalExpressionRewriter
+ Rewriter(Metadata metadata, ExpressionOptimizerManager expressionOptimizerManager)
+ RowExpression rewrite(RowExpression expression, Session session)
}
class NestedIfSimplifier {
- RowExpressionDeterminismEvaluator determinismEvaluator
+ NestedIfSimplifier(RowExpressionDeterminismEvaluator determinismEvaluator)
+ RowExpression rewriteSpecialForm(SpecialFormExpression node, Void context, RowExpressionTreeRewriter treeRewriter)
}
class LogicalExpressionRewriter {
+ RowExpression rewrite(RowExpression expression, Boolean context, RowExpressionTreeRewriter treeRewriter)
}
class RowExpressionTreeRewriter {
+ static RowExpression rewriteWith(RowExpressionRewriter rewriter, RowExpression expression)
+ static RowExpression rewriteWith(RowExpressionRewriter rewriter, RowExpression expression, boolean context)
+ RowExpression defaultRewrite(SpecialFormExpression node, Void context)
}
class RowExpressionRewriter {
<<interface>>
}
class RowExpressionDeterminismEvaluator {
+ boolean isDeterministic(RowExpression expression)
}
class SpecialFormExpression {
+ Form getForm()
+ List getArguments()
+ RowExpression getType()
}
class RowExpression {
}
class Session {
}
class Metadata {
+ FunctionAndTypeManager getFunctionAndTypeManager()
}
class ExpressionOptimizerManager {
+ ExpressionOptimizer getExpressionOptimizer(ConnectorSession connectorSession)
}
class ExpressionOptimizer {
+ RowExpression optimize(RowExpression expression, OptimizationLevel level, ConnectorSession connectorSession)
}
class ConnectorSession {
}
class FunctionAndTypeManager {
}
SimplifyRowExpressions --> Rewriter
Rewriter --> NestedIfSimplifier
Rewriter --> LogicalExpressionRewriter
Rewriter --> ExpressionOptimizerManager
Rewriter --> Metadata
NestedIfSimplifier --> RowExpressionDeterminismEvaluator
NestedIfSimplifier ..|> RowExpressionRewriter
LogicalExpressionRewriter ..|> RowExpressionRewriter
RowExpressionTreeRewriter --> RowExpressionRewriter
RowExpressionTreeRewriter --> RowExpression
RowExpressionTreeRewriter --> SpecialFormExpression
RowExpressionDeterminismEvaluator --> RowExpression
SpecialFormExpression --> RowExpression
SimplifyRowExpressions --> Metadata
SimplifyRowExpressions --> ExpressionOptimizerManager
SimplifyRowExpressions --> Session
ExpressionOptimizerManager --> ExpressionOptimizer
ExpressionOptimizer --> RowExpression
ExpressionOptimizer --> ConnectorSession
Flow diagram for plan transformation by PreAggregateBeforeGroupId rule
flowchart LR
subgraph OriginalPlan
A1[Source]
B1[GroupIdNode<br/>grouping_sets]
C1[AggregationNode<br/>step SINGLE<br/>group_by grouping_set_keys + groupId]
A1 --> B1 --> C1
end
subgraph TransformedPlanWithPreAggregation
A2[Source]
B2[AggregationNode<br/>PARTIAL<br/>group_by union_of_all_grouping_set_columns]
C2[GroupIdNode<br/>grouping_sets]
D2[AggregationNode<br/>INTERMEDIATE<br/>group_by grouping_set_keys + groupId]
A2 --> B2 --> C2 --> D2
end
subgraph RuleApplicationConditions
R0[Session property<br/>pre_aggregate_before_grouping_sets enabled]
R1[AggregationNode.step == SINGLE]
R2[All aggregations decomposable]
end
OriginalPlan -->|Iterative optimizer| PreRule[PreAggregateBeforeGroupId]
PreRule -->|When R0 and R1 and R2| TransformedPlanWithPreAggregation
Hey - I've found 5 issues, and left some high level feedback:
- In PreAggregateBeforeGroupId, the INTERMEDIATE aggregation builds CallExpressions using function.getFinalType(), but INTERMEDIATE steps should normally operate on and produce the intermediate type; consider using the intermediate type for the call/output or changing the step to FINAL if you intend to produce final results there.
- The logic that rewrites GroupIdNode.aggregationArguments to intermediate variables walks all original aggregations and matches arguments by equality; this can be fragile for more complex RowExpressions (e.g., lambdas, casts, or reused variables), so it may be safer to derive the mapping from the AggregationNode’s output variables directly rather than pattern-matching on argument expressions.
## Individual Comments
### Comment 1
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PreAggregateBeforeGroupId.java" line_range="139-148" />
<code_context>
+ for (Map.Entry<VariableReferenceExpression, Aggregation> entry : node.getAggregations().entrySet()) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Aggregation arguments in the new PARTIAL node are not remapped from GroupId outputs to source columns, which can break variable references.
The partial aggregation is built on `groupIdNode.getSource()`, but `originalAggregation.getArguments()` refer to `GroupIdNode` output variables, which are out of scope below `GroupId`. Despite the comment claiming arguments are mapped to source variables, they are passed through unchanged. For grouping keys and aggregation arguments rewritten by `GroupId`, this will produce invalid references. The partial aggregation arguments need to be remapped to source variables using `groupIdNode.getGroupingColumns()` and `groupIdNode.getAggregationArguments()` before creating the `CallExpression`.
</issue_to_address>
### Comment 2
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PreAggregateBeforeGroupId.java" line_range="218-227" />
<code_context>
+ Map<VariableReferenceExpression, Aggregation> intermediateAggregations = new HashMap<>();
</code_context>
<issue_to_address>
**issue (bug_risk):** The INTERMEDIATE aggregation uses the final output type and mixes intermediate state with original arguments, which is inconsistent with decomposed aggregation semantics.
For INTERMEDIATE, the aggregation should consume and produce only the intermediate state; the FINAL step produces the final type. Here the `CallExpression` is built with `function.getFinalType()` and arguments `[intermediateVariable] + original lambdas`, so the INTERMEDIATE step incorrectly returns the final type and reuses lambda arguments that should already be captured in the intermediate state. This can violate type assumptions in the planner/execution. Please change the INTERMEDIATE call to use the intermediate type as its return type and restrict its arguments to the intermediate state variables (only passing lambdas if the implementation explicitly requires them).
</issue_to_address>
### Comment 3
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/PreAggregateBeforeGroupId.java" line_range="79-81" />
<code_context>
+{
+ private static final Capture<GroupIdNode> GROUP_ID = newCapture();
+
+ private static final Pattern<AggregationNode> PATTERN = aggregation()
+ .with(source().matching(
+ groupId().capturedAs(GROUP_ID)));
+
+ private final FunctionAndTypeManager functionAndTypeManager;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The rule lacks guards that the aggregation’s grouping keys match the GroupId output, which may apply the rewrite in unsupported layouts.
This currently rewrites any `AggregationNode` over a `GroupIdNode` without verifying that the aggregation’s grouping sets are exactly `grouping_set_keys + groupId`. The new PARTIAL node groups by `allSourceGroupingKeys` from `GroupIdNode.getGroupingSets()`, while the INTERMEDIATE keeps `node.getGroupingSets()` unchanged. If the original aggregation includes extra grouping expressions/columns or omits the groupId variable, the transformed plan may change semantics. Please add pattern or `apply`-time checks that each aggregation grouping set is `grouping_set ∪ {groupIdVariable}` before applying this rule.
Suggested implementation:
```java
public PreAggregateBeforeGroupId(FunctionAndTypeManager functionAndTypeManager)
{
    this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
}

/**
 * Verifies that the aggregation's grouping sets are exactly the GroupIdNode's
 * source grouping sets extended with the groupId variable, i.e. for each
 * aggregation grouping set G_agg there exists a source grouping set G_src such that
 * G_agg = G_src ∪ {groupIdVariable}.
 */
private static boolean aggregationGroupingMatchesGroupId(AggregationNode aggregation, GroupIdNode groupId)
{
    // GroupIdNode exposes the group id as a VariableReferenceExpression, not a Symbol.
    VariableReferenceExpression groupIdVariable = groupId.getGroupIdVariable();
    // Build the set of all valid grouping sets the aggregation is allowed to use:
    // for each source grouping set, add the groupId variable.
    java.util.Set<java.util.Set<VariableReferenceExpression>> validGroupingSets = new java.util.HashSet<>();
    for (java.util.List<VariableReferenceExpression> sourceGroupingSet : groupId.getGroupingSets()) {
        java.util.Set<VariableReferenceExpression> groupingSetWithGroupId = new java.util.HashSet<>(sourceGroupingSet);
        groupingSetWithGroupId.add(groupIdVariable);
        // Use an unmodifiable set so equals()/hashCode() are stable and we don't accidentally mutate later
        validGroupingSets.add(java.util.Collections.unmodifiableSet(groupingSetWithGroupId));
    }
    // Every aggregation grouping set must match one of the expected sets.
    for (java.util.Set<VariableReferenceExpression> aggregationGroupingSet : aggregation.getGroupingSets()) {
        if (!validGroupingSets.contains(aggregationGroupingSet)) {
            return false;
        }
    }
    return true;
}

@Override
public Pattern<AggregationNode> getPattern()
{
    return PATTERN;
}
```
To fully enforce the guard, update the `apply` method in this class to call the new helper and bail out when the grouping layouts are not compatible. For example, at the very beginning of `apply`:
1. Retrieve the captured `GroupIdNode`:
```java
GroupIdNode groupIdNode = captures.get(GROUP_ID);
```
2. Check the grouping sets and, if they do not match, skip the rewrite:
```java
if (!aggregationGroupingMatchesGroupId(node, groupIdNode)) {
return Result.empty();
}
```
Place this check before any logic that constructs the PARTIAL/INTERMEDIATE aggregations. This ensures the rule only fires when each aggregation grouping set is exactly `grouping_set ∪ {groupIdVariable}`, preventing semantic changes when the aggregation has extra or missing grouping expressions relative to the `GroupIdNode` output.
</issue_to_address>
### Comment 4
<location path="presto-main-base/src/main/java/com/facebook/presto/sql/planner/iterative/rule/SimplifyRowExpressions.java" line_range="137" />
<code_context>
+ List<RowExpression> innerArgs = innerIf.getArguments();
+ RowExpression innerCondition = innerArgs.get(0);
+ if (falseValue.equals(innerArgs.get(2)) && determinismEvaluator.isDeterministic(innerCondition)) {
+ RowExpression combinedCondition = new SpecialFormExpression(AND, BOOLEAN, condition, innerCondition);
+ return new SpecialFormExpression(rewritten.getSourceLocation(), IF, rewritten.getType(), combinedCondition, innerArgs.get(1), falseValue);
+ }
</code_context>
<issue_to_address>
**nitpick:** Consider preserving source locations on the combined AND condition for better diagnostics and tooling.
Other `SpecialFormExpression` instances here (including the rewritten IF) carry a `SourceLocation`, but the new `AND` expression is created with the constructor that omits it. Please use the constructor that takes a `SourceLocation` (e.g., from `rewritten.getSourceLocation()` or `condition`) so diagnostics (stack traces, plan visualizations) remain accurate.
</issue_to_address>
### Comment 5
<location path="presto-main-base/src/test/java/com/facebook/presto/sql/planner/iterative/rule/TestSimplifyRowExpressions.java" line_range="144-145" />
<code_context>
+public class TestPreAggregateBeforeGroupId
+ extends BaseRuleTest
+{
+ @Test
+ public void testPreAggregatesBeforeGroupId()
+ {
</code_context>
<issue_to_address>
**suggestion (testing):** Add a negative test for non-deterministic inner conditions to validate determinism-aware behavior
The existing tests cover only deterministic inner conditions. Please add a case with a non-deterministic inner condition (e.g., `IF(X, IF(rand() > 0.5, V, CAST(null AS boolean)), CAST(null AS boolean))`) that must not be simplified, to ensure we don’t regress the determinism check in `NestedIfSimplifier`.
</issue_to_address>
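To make the nested IF rewrite discussed in Comments 4 and 5 concrete, here is a minimal sketch of why `IF(c1, IF(c2, V, F), F)` and `IF(c1 AND c2, V, F)` agree when both conditions are deterministic. It models SQL's three-valued logic with `Boolean` (where `null` stands for SQL NULL). `NestedIfSketch` and its methods are hypothetical names for illustration only, not part of `SimplifyRowExpressions`, and the determinism check itself is not modeled.

```java
// Hypothetical three-valued model of SQL IF and AND; Boolean null stands for SQL NULL.
final class NestedIfSketch
{
    private NestedIfSketch() {}

    // SQL IF takes the true branch only when the condition is TRUE (not FALSE, not NULL).
    static String sqlIf(Boolean condition, String trueValue, String falseValue)
    {
        return Boolean.TRUE.equals(condition) ? trueValue : falseValue;
    }

    // SQL AND under three-valued logic: FALSE dominates, otherwise NULL propagates.
    static Boolean sqlAnd(Boolean left, Boolean right)
    {
        if (Boolean.FALSE.equals(left) || Boolean.FALSE.equals(right)) {
            return false;
        }
        if (left == null || right == null) {
            return null;
        }
        return true;
    }

    // Shape before the rewrite: IF(outer, IF(inner, V, F), F)
    static String nested(Boolean outer, Boolean inner)
    {
        return sqlIf(outer, sqlIf(inner, "V", "F"), "F");
    }

    // Shape after the rewrite: IF(outer AND inner, V, F)
    static String flattened(Boolean outer, Boolean inner)
    {
        return sqlIf(sqlAnd(outer, inner), "V", "F");
    }

    // Checks all nine combinations of TRUE, FALSE and NULL.
    static boolean equivalentForAllInputs()
    {
        Boolean[] values = {Boolean.TRUE, Boolean.FALSE, null};
        for (Boolean outer : values) {
            for (Boolean inner : values) {
                if (!nested(outer, inner).equals(flattened(outer, inner))) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args)
    {
        System.out.println("equivalent for all inputs: " + equivalentForAllInputs());
    }
}
```

The sketch only covers conditions whose value is fixed per row, which is the case the rule accepts; a negative test like the one requested in Comment 5 would cover the non-deterministic case that this model deliberately leaves out.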
a3ee32d to
eeb80c6
Thanks for the release note! Please add a row of three (`) above and below it to format it, like this: |
steveburnett left a comment:
The session property pre_aggregate_before_grouping_sets does not appear to be documented. Could you show me where this property is documented in Presto? If not, please add documentation.
9f52b5f to
a1176bb
steveburnett left a comment:
Responding to a review request, but I don't find any documentation or .rst files in the PR for me to review. Please re-request when doc is added and I am happy to take another look anytime.
 * <p>This rule runs after exchange placement, when the plan has already been
 * split into PARTIAL and FINAL aggregations. It matches:
 * <pre>
 * AggregationNode(PARTIAL) -> GroupIdNode -> Source
 * </pre>
 * and transforms it to:
 * <pre>
 * AggregationNode(INTERMEDIATE) -> GroupIdNode -> AggregationNode(PARTIAL, GROUP BY all_keys) -> Source
 * </pre>
Can you rewrite this to use the tree notation we use in other rules?
Did you try enabling add_exchange_below_partial_aggregation_over_group_id to see whether it helps your use case? It should have a similar impact to this rule: reducing the rows that GroupId multiplies.
Updated the Javadoc to use tree notation consistent with AddExchangesBelowPartialAggregationOverGroupIdRuleSet. The "before" and "after" trees now use clean node names without parenthetical descriptions.
Yes, add_exchange_below_partial_aggregation_over_group_id adds exchanges below GroupId to reshuffle the raw data, which helps the partial aggregation above GroupId be more effective. However, it does not actually reduce the row count before GroupId — it only redistributes the same rows.
This rule goes further by inserting a PARTIAL aggregation below GroupId that actually reduces the data (e.g., 60K rows → 2,526 for the TPC-H CUBE query). The pre-aggregated intermediate states are then shuffled and merged via an INTERMEDIATE aggregation before GroupId multiplies them. So the two rules are complementary — this one reduces rows, the existing one redistributes them.
Benchmark comparison (TPC-H lineitem CROSS JOIN UNNEST(ARRAY[1,2,3,4,5]), GROUP BY CUBE(yr, mo, dy, shipmode, returnflag) — 32 grouping sets, ~300K input rows, 3 warmup + 5 measured runs):
| Configuration | Avg Time | Speedup |
|---|---|---|
| Baseline (both disabled) | 6574 ms | 1.00x |
| pre_aggregate_before_grouping_sets = true | 1686 ms | 3.90x |
| add_exchange_below_partial_aggregation_over_group_id = true | 6609 ms | 0.99x |
The existing exchange rule didn't fire in this benchmark — likely because its cost-model guard (estimateAggregationMemoryRequirements < maxPartialAggregationMemoryUsage) determined that partial aggregation memory was manageable without redistribution. The pre-aggregation rule, by contrast, always reduces rows before GroupId multiplies them, yielding a consistent ~4x speedup.
@aaneja Thanks for the review! Addressed your comments: Tree notation: Rewrote the Javadoc to use the tree notation format used in other rules like Re:
The pre-aggregation approach is more effective when the data has many duplicate values on the grouping columns — the aggregation can collapse N duplicate rows into 1 before GroupId multiplies them. The exchange approach helps when partial aggregation above GroupId is ineffective due to data skew/distribution but doesn't reduce the input to GroupId itself. They could also be complementary — redistributing and pre-aggregating before GroupId. Also added |
@kaikalur The exchange is done on exactly the variables that the partial agg needs; see this coderef. The impact is exactly the same as adding the partial agg that you have in this rule, but it can do better because it fetches all data across workers. That is why I wanted to know if there was a real query that benefited from this rule, but not from
a1176bb to
e349066
@aaneja Thanks for the detailed TPCDS Q67 example — that's really helpful for understanding the exchange rule's behavior. Looking at your plan snippet, the key observation is:
The exchange rule makes the partial agg above GroupId more effective by redistributing data, but GroupId still processes all 538M input rows and produces the full 4.8B multiplied rows. The reduction happens after multiplication. With The motivation comes from production queries at our org where engineers manually rewrite queries to pre-aggregate before That said, you're right that the two rules are complementary. The exchange rule helps with data distribution for the aggregation, while pre-aggregation reduces the raw row count. In cases where Happy to provide a more concrete benchmark comparison if that would help. |
This makes sense, but I don't see how this would result in latency, CPU, or memory gains, since we're doing the pre and intermediate aggs on the same worker while streaming rows. We usually add intermediate aggs with Exchanges between them. Yes, a concrete benchmark comparison would help clear this up!
@aaneja The rule now produces a complete pre-aggregation pipeline below GroupId with a shuffle. Here's the plan: The data flow:
The win: steps 2-4 drastically reduce the row count before GroupId multiplies them. Without pre-aggregation, GroupId multiplies the full raw row count. Our rule runs before |
steveburnett left a comment:
Thank you for the documentation! Just a nit of phrasing.
@aaneja Good point. The rule now includes a proper Exchange (shuffle) between the PARTIAL and INTERMEDIATE, addressing your concern about same-worker streaming. Here's a concrete example using TPC-H:

    SELECT sum(extendedprice), count(extendedprice),
           day(shipdate), month(shipdate), shipdate
    FROM lineitem
    GROUP BY CUBE (day(shipdate), month(shipdate), shipdate)

Without optimization (current behavior): GroupId multiplies all 60K raw rows by 8, producing 480K rows that flow through aggregation.
With optimization (this PR):
The key numbers:
That's a 24x reduction in the number of rows GroupId multiplies. The savings come from:
The effect scales with data size and duplication ratio. For production tables with billions of rows and high duplication on grouping keys, the reduction can be orders of magnitude. |
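As a rough check of the arithmetic in this comment, the sketch below recomputes the GroupId output sizes. It assumes the 2,526 pre-aggregated row count quoted earlier in the thread for the TPC-H CUBE query applies to this example; `GroupIdRowCountSketch` and its methods are hypothetical helpers, not Presto code.

```java
// Back-of-the-envelope model: GroupId emits one copy of every input row per grouping set.
final class GroupIdRowCountSketch
{
    private GroupIdRowCountSketch() {}

    // CUBE over n columns produces 2^n grouping sets.
    static int cubeGroupingSetCount(int columns)
    {
        return 1 << columns;
    }

    static long groupIdOutputRows(long inputRows, int groupingSetCount)
    {
        return inputRows * groupingSetCount;
    }

    public static void main(String[] args)
    {
        int sets = cubeGroupingSetCount(3);                   // day, month, shipdate
        long withoutPreAgg = groupIdOutputRows(60_000, sets); // raw rows quoted in the thread
        long withPreAgg = groupIdOutputRows(2_526, sets);     // assumed pre-aggregated row count
        System.out.println(sets + " grouping sets");
        System.out.println("without pre-aggregation: " + withoutPreAgg + " rows");
        System.out.println("with pre-aggregation:    " + withPreAgg + " rows");
        System.out.println("reduction: ~" + Math.round((double) withoutPreAgg / withPreAgg) + "x");
    }
}
```

Because GroupId scales its input by a constant factor, the reduction ratio is the same before and after multiplication; what changes is the absolute number of rows that have to be materialized and aggregated downstream.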
e57c82b to
46c252c
steveburnett left a comment:
LGTM! (docs)
Pull updated branch, new local doc build, looks good. Thanks!
@kaikalur Can you run the above query on, say, a 2-4 worker setup for TPCH SF 100 and share the I'll try to do it on our setup as well
@aaneja All CI checks are green and we've addressed all review feedback (tree notation Javadoc, comparison with |
Looks like you missed my message above - #27290 (comment) |
@aaneja Apologies for missing your earlier messages! To clarify the architecture concern — our rule does not run pre-agg and intermediate on the same worker while streaming. The transformed plan includes a So data is shuffled across workers between the PARTIAL and INTERMEDIATE steps — this is a fully distributed operation, not same-worker streaming. The key difference from
Using your TPCDS Q67 numbers as an example: with the existing rule, GroupId still multiplies 538M → 4.8B rows, and then PARTIAL reduces to 591M. With our rule, if PARTIAL reduces 538M to, say, 50M distinct key combinations, GroupId would only produce 450M rows (9 × 50M) — never materializing the 4.8B intermediate rows. I don't currently have a multi-worker TPCH SF 100 setup available to run EXPLAIN ANALYZE. Since you mentioned you'll try it on your setup, that would be great! The session property to enable is |
46c252c to
3099a27
@kaikalur I can't sign off until we get a clear signal about the rule's applicability.
Well, it's a rule derived from manual application on our prod workloads. Also see the latest benchmark numbers that Claude posted on TPC-H.
Oh claude just edited the old reply. Here: Benchmark comparison (TPC-H lineitem CROSS JOIN UNNEST(ARRAY[1,2,3,4,5]), GROUP BY CUBE(yr, mo, dy, shipmode, returnflag) — 32 grouping sets, ~300K input rows, 3 warmup + 5 measured runs): Configuration Avg Time Speedup |
@feilong-liu can you review this? There is no question about its applicability. I don't want to delay this any further, as it's off by default and people who don't want it don't need to use it.
EXPLAIN does not show the power of this. See the benchmark we posted.
3099a27 to
fe6f33e
@aaneja Added a reusable You can run it directly: It compares three scenarios (baseline, Latest results: The |
b21ea89 to
c3a35f8
…ation

Add a new iterative optimizer rule PreAggregateBeforeGroupId that inserts a PARTIAL aggregation below GroupIdNode to reduce the number of rows that GroupId multiplies across grouping sets.

Transforms:
  Agg(SINGLE) -> GroupId -> Source
into:
  Agg(INTERMEDIATE) -> GroupId -> Agg(PARTIAL, GROUP BY all_keys) -> Source

The PARTIAL aggregation groups by the union of all grouping set columns, which drastically reduces row count before GroupId multiplies them. The INTERMEDIATE aggregation above GroupId merges partial states within each grouping set.

Also fixes a bug in HashAggregationOperator where SkipAggregationBuilder could be incorrectly used for INTERMEDIATE aggregations, which expect intermediate-format input rather than raw input.

Gated behind session property pre_aggregate_before_grouping_sets (disabled by default).
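The transformation described in this commit message can be illustrated with a small, self-contained model. This is a sketch under simplifying assumptions, not the rule's implementation: rows are plain lists, the only aggregate is SUM (whose partial state is itself a sum, so merging is just another SUM), and `PreAggSketch` with all its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: class and method names are hypothetical, not Presto APIs.
final class PreAggSketch
{
    private PreAggSketch() {}

    // Each row is [key0, key1, value]; the trailing column is the value being summed.
    static List<List<Object>> sampleRows()
    {
        return Arrays.asList(
                Arrays.<Object>asList("a", "x", 1L),
                Arrays.<Object>asList("a", "x", 2L),
                Arrays.<Object>asList("a", "y", 3L),
                Arrays.<Object>asList("b", "x", 4L),
                Arrays.<Object>asList("b", "x", 5L),
                Arrays.<Object>asList("a", "x", 6L));
    }

    // CUBE(key0, key1) written as the key-column indexes kept by each grouping set.
    static List<List<Integer>> cubeOverTwoKeys()
    {
        return Arrays.asList(
                Arrays.asList(0, 1),
                Arrays.asList(0),
                Arrays.asList(1),
                Arrays.<Integer>asList());
    }

    // GroupId: one copy of every row per grouping set, with keys outside the set
    // replaced by null and the grouping set id inserted before the value.
    static List<List<Object>> groupId(List<List<Object>> rows, int keyCount, List<List<Integer>> groupingSets)
    {
        List<List<Object>> out = new ArrayList<>();
        for (int setId = 0; setId < groupingSets.size(); setId++) {
            for (List<Object> row : rows) {
                List<Object> copy = new ArrayList<>();
                for (int k = 0; k < keyCount; k++) {
                    copy.add(groupingSets.get(setId).contains(k) ? row.get(k) : null);
                }
                copy.add(setId);
                copy.add(row.get(keyCount));
                out.add(copy);
            }
        }
        return out;
    }

    // SUM of the last column, grouped by all preceding columns.
    static Map<List<Object>, Long> sumByKeys(List<List<Object>> rows)
    {
        Map<List<Object>, Long> sums = new HashMap<>();
        for (List<Object> row : rows) {
            List<Object> key = new ArrayList<>(row.subList(0, row.size() - 1));
            sums.merge(key, (Long) row.get(row.size() - 1), Long::sum);
        }
        return sums;
    }

    // Turns aggregation output back into rows so it can feed the next operator.
    static List<List<Object>> toRows(Map<List<Object>, Long> sums)
    {
        List<List<Object>> rows = new ArrayList<>();
        for (Map.Entry<List<Object>, Long> entry : sums.entrySet()) {
            List<Object> row = new ArrayList<>(entry.getKey());
            row.add(entry.getValue());
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args)
    {
        List<List<Object>> source = sampleRows();
        List<List<Integer>> sets = cubeOverTwoKeys();

        // Original shape: Agg -> GroupId -> Source
        List<List<Object>> expandedRaw = groupId(source, 2, sets);
        Map<List<Object>, Long> direct = sumByKeys(expandedRaw);

        // Rewritten shape: Agg(merge) -> GroupId -> Agg(group by all keys) -> Source
        List<List<Object>> preAggregated = toRows(sumByKeys(source));
        List<List<Object>> expandedPre = groupId(preAggregated, 2, sets);
        Map<List<Object>, Long> merged = sumByKeys(expandedPre);

        System.out.println("rows out of GroupId without pre-aggregation: " + expandedRaw.size());
        System.out.println("rows out of GroupId with pre-aggregation:    " + expandedPre.size());
        System.out.println("same result: " + direct.equals(merged));
    }
}
```

In this toy data set six source rows collapse to three distinct key combinations, so GroupId emits 12 rows instead of 24 while the final sums are unchanged. The gain depends entirely on how much duplication exists on the union of grouping columns.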
c3a35f8 to
a039db3
@aaneja We now have a distributed benchmark runner, and it clearly shows the power of this optimization. Also, in general, any optimization I implement is vetted with our prod workloads, so we add it disabled and enable it here based on need and sometimes stats. Hope that helps in your reviews.
@feilong-liu we now even have a benchmark and also this optimization is off by default - so let's merge it asap |
feilong-liu left a comment:
Approve since this optimization is off by default and also has shown benchmark numbers required in the review.
Thanks @feilong-liu @aaneja if you have any other comments/reviews, we can still iterate and fix them as needed |
Summary
- Adds a new iterative optimizer rule `PreAggregateBeforeGroupId` that inserts a PARTIAL aggregation below `GroupIdNode` to reduce the number of rows that GroupId multiplies across grouping sets
- Fixes a bug in `HashAggregationOperator` where `SkipAggregationBuilder` could be incorrectly used for INTERMEDIATE aggregations
- Gated behind session property `pre_aggregate_before_grouping_sets` (disabled by default)

Test plan
- `orders` table
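The `HashAggregationOperator` fix listed in the summary restricts the skip path to aggregations that read raw input and emit partial state. A minimal sketch of that guard follows; the `Step` enum here is a standalone stand-in modeled on `AggregationNode.Step`, and `maySkipAggregation` is a hypothetical helper, so the real condition in the PR may be structured differently.

```java
// Standalone stand-in for the planner's aggregation steps; names are illustrative assumptions.
final class StepGuardSketch
{
    private StepGuardSketch() {}

    enum Step
    {
        PARTIAL(true, true),        // raw input, partial output
        FINAL(false, false),        // partial input, final output
        INTERMEDIATE(false, true),  // partial input, partial output
        SINGLE(true, false);        // raw input, final output

        private final boolean inputRaw;
        private final boolean outputPartial;

        Step(boolean inputRaw, boolean outputPartial)
        {
            this.inputRaw = inputRaw;
            this.outputPartial = outputPartial;
        }

        boolean isInputRaw()
        {
            return inputRaw;
        }

        boolean isOutputPartial()
        {
            return outputPartial;
        }
    }

    // Skipping aggregation passes raw rows through as single-row partial states, so it is
    // only meaningful for RAW -> PARTIAL steps. INTERMEDIATE already consumes
    // intermediate-format input and must keep merging.
    static boolean maySkipAggregation(Step step, boolean partialAggregationDisabled)
    {
        return partialAggregationDisabled && step.isInputRaw() && step.isOutputPartial();
    }

    public static void main(String[] args)
    {
        for (Step step : Step.values()) {
            System.out.println(step + " may skip: " + maySkipAggregation(step, true));
        }
    }
}
```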