-
Notifications
You must be signed in to change notification settings - Fork 5.5k
Add Exchange before GroupId to improve Partial Aggregation #11741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Exchange before GroupId to improve Partial Aggregation #11741
Conversation
The idea was abandoned during #11267 review.
The rule brings significant improvement in TPC-DS Q22 and Q67 while not causing much regression in other TPC-H, TPC-DS queries. (The only observably regressing queries were still much better than non-CBO baseline.)
kokosing
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM because it already went through the internal review process.
Some nit comments.
| * GroupId (before multiplication) makes partial aggregation more effective, resulting in less data being | ||
| * exchanged afterwards. | ||
| */ | ||
| public class AddExchangesBelowPartialAggregationOverGroupIdRuleSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have some unit tests, but that would require a lot of plumbing.
| return transform(aggregation, groupId, context) | ||
| .map(newAggregation -> { | ||
| PlanNode newExchange = exchange.replaceChildren(ImmutableList.of(newAggregation)); | ||
| return Result.ofPlanNode(newExchange); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
inline newExchange?
|
|
||
| List<Symbol> desiredHashSymbols = groupingSetHistogram.entrySet().stream() | ||
| // Take only frequently used symbols | ||
| .filter(entry -> entry.getCount() >= groupId.getGroupingSets().size() * GROUPING_SETS_SYMBOL_REQUIRED_FREQUENCY) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if all symbols are less frequent, shouldn't we return Optional.empty() here (short circuit)?
|
|
||
| StreamPreferredProperties requiredProperties = fixedParallelism().withPartitioning(desiredHashSymbols); | ||
| StreamProperties sourceProperties = derivePropertiesRecursively(groupId.getSource(), context); | ||
| if (requiredProperties.isSatisfiedBy(sourceProperties)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be nice to extract this as a separate rule that would remove unnecessary partial aggregations
arhimondr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please provide some high level overview for the problem you are trying to solve?
| public static final String LEGACY_ROW_FIELD_ORDINAL_ACCESS = "legacy_row_field_ordinal_access"; | ||
| public static final String ITERATIVE_OPTIMIZER = "iterative_optimizer_enabled"; | ||
| public static final String ITERATIVE_OPTIMIZER_TIMEOUT = "iterative_optimizer_timeout"; | ||
| public static final String ENABLE_FORCED_EXCHANGE_BELOW_GROUP_ID = "enable_forced_exchange_below_group_id"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add_exchange_before_group_id
| private double spillMaxUsedSpaceThreshold = 0.9; | ||
| private boolean iterativeOptimizerEnabled = true; | ||
| private boolean enableStatsCalculator = true; | ||
| private boolean enableForcedExchangeBelowGroupId = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have it disabled by default?
| new PushPartialAggregationThroughJoin(), | ||
| new PushPartialAggregationThroughExchange(metadata.getFunctionRegistry()), | ||
| new PruneJoinColumns()))); | ||
| builder.add(new IterativeOptimizer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rule uses cost model. Cost model loses estimates when there are any partial aggregation in between.
| * GroupId (before multiplication) makes partial aggregation more effective, resulting in less data being | ||
| * exchanged afterwards. | ||
| */ | ||
| public class AddExchangesBelowPartialAggregationOverGroupIdRuleSet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just AddExchangesBeforeGroupId?
The rule brings significant improvement in TPC-DS Q22 and Q67 while not
causing much regression in other TPC-H, TPC-DS queries. (The only
observably regressing queries were still much better than non-CBO
baseline.)