Skip to content

Track result sizes of partial aggregate evaluation in HBO#21160

Merged
arhimondr merged 1 commit intoprestodb:masterfrom
mlyublena:hbo-partial-agg
Nov 15, 2023
Merged

Track result sizes of partial aggregate evaluation in HBO#21160
arhimondr merged 1 commit intoprestodb:masterfrom
mlyublena:hbo-partial-agg

Conversation

@mlyublena
Copy link
Contributor

@mlyublena mlyublena commented Oct 16, 2023

Until now HBO tracked only the final input and output sizes of evaluating the aggregate, but not the result of partial agg evaluation. This may lead to incorrect estimation of data size reduction when the final aggregate reduces data size, but the partial aggregate does not. This changelist adds the ability to track input and output sizes of the partial aggregate to help make a better decision of when to split an aggregate into partial and final.

Example partial agg stats:

      "partialAggregationStatsEstimate" : {
        "inputBytes" : 0.0,
        "outputBytes" : 23976.0,
        "inputRowCount" : 5.6717574E7,
        "outputRowCount" : 2664.0
      }

This PR also adds a new flag use_partial_aggregation_history to enable using the new statistics during optimization.

Description

Most of the non-trivial changes are in the following classes:

  • PartialAggregationStatistics, PlanStatistics, PartialAggregationStatsEstimate, PlanNodeStatsEstimate
    • add partial aggregation statistics/estimate
  • PushPartialAggregationThroughExchange
    • when splitting an aggregate into partial and final, make sure they are assigned the same aggregation id
    • if available, use partial aggregation statistics from the stats estimate instead of the final node input/output bytes
  • HistoryBasedPlanStatisticsTracker.java
    • implement logic for storing partial aggregation statistics inside the final aggregation node

Most of the other changes are due to changing the signature of the AggregationNode constructor to take an optional aggregationId argument.

Motivation and Context

Impact

Test Plan

Unit tests and verification on production queries

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Extends HBO to track statistics of execution partial aggregation nodes
* Introduces a new flag use_partial_aggregation_history, which controls whether or not partial aggregation histories are used in deciding whether to split aggregates into partial and final
* Change PushPartialAggregationThroughExchange optimization to use partial aggregation statistics when available, and when use_partial_aggregation_history=true
* When partial aggregation statistics are used in PushPartialAggregationThroughExchange, the optimization also applies to multi-key aggregations (as opposed to single-key only when CBO is used)


@mlyublena mlyublena marked this pull request as ready for review October 17, 2023 23:12
@mlyublena mlyublena requested review from a team and shrinidhijoshi as code owners October 17, 2023 23:12
@mlyublena mlyublena requested a review from presto-oss October 17, 2023 23:12
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove debug printf

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the inputBytes here the same as the inputBytes above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think you are right, the inputs are the same and only the outputs are different (one is the output of partial agg, and the other output of the final agg)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input bytes is available by checking the output of the aggregation input, hence not necessary here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually these are different: in one case it's the input to the final agg (available from the child node), in the other case it's the input to the partial agg (which is a node several levels down the query tree: we'll cache it at the level of the final agg)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the output of the partial aggregation node? Maybe either rename this to partialAggregationOutputBytes or rename this class to PartialAggregationNodeStatsEstimate to make it clearer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, will rename

Comment on lines 157 to 166
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole block can be moved to the beginning of the for loop?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The for loop can exit early once a match is found?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to have two aggregations with the same grouping sets hence match a different one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a better way to find which partial aggregate corresponds to which final one - I found out that there are cases where the grouping columns get renamed in the middle (perhaps by a different transformation) and we no longer can match the aggregation nodes based on grouping columns.
I'm thinking matching them either by planNode.sourceLocation or introducing another variable in aggregation nodes aggregationId to know which 2 nodes came from the same original aggregate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced a new field aggregationId in AggregationNode: it's normally empty, but when an aggregate is split into partial and final, the new nodes get the same id so they can be matched later

@mlyublena mlyublena force-pushed the hbo-partial-agg branch 3 times, most recently from 81aa6ab to 434afbb Compare October 20, 2023 23:59
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove

@mlyublena mlyublena requested a review from arhimondr October 27, 2023 20:33
@mlyublena mlyublena force-pushed the hbo-partial-agg branch 4 times, most recently from ad7e1f0 to e031a92 Compare November 1, 2023 20:59
@mlyublena mlyublena requested a review from feilong-liu November 1, 2023 21:02
@mlyublena mlyublena force-pushed the hbo-partial-agg branch 2 times, most recently from b7c15c8 to 5f477c9 Compare November 1, 2023 23:46
@mlyublena mlyublena force-pushed the hbo-partial-agg branch 4 times, most recently from fa85601 to 923a13d Compare November 8, 2023 22:03
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's usually more convenient when config names and session property names are consistent. Also I remember there used to be a rule of thumb to call boolean properties as ...-enabled.

What do you think about history_based_partial_aggregation_optimization_enabled (optimizer.history-based-partial-aggregation-optimization-enabled) (or something along the line to keep it close to how HISTORY_BASED_SCALED_WRITER is called)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense.
The only caveat here is that this optimization was already history-based, but now we use the statistics from the partial aggregation instead of the final aggregation. I added the flag to avoid possible regressions and be able to gradually deploy.
If you have more name suggestions let me know :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Let's keep the name. However It may still be worth to have a consistent session property name (use_partial_aggregation_history) and config property name (optimizer.use-partial-aggregation-history)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: @Test?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: reformat

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe add a helper method isUnknown to avoid comparing the references (it can be potentially fragile)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's named partialAggregationStatsInfo, my understanding is that aggregationNodeStats stores the information of final aggregation node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cache historical partial stats execution at the level of the final aggregation because the partial agg is not part of the canonical plan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aggregationNodeStats is a helper structure where we accumulate information about results of aggregation execution: the statistics comes from the partial agg node but ends up being tracked in the final agg node because the partial agg is not part of the canonical plan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, I misread your comment @feilong-liu
you are right, the variable should be named finalAggregationStatsInfo, will fix that in the code

Comment on lines 231 to 233
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

planStatisticsFinalAgg comes from partialAggregationStatsInfo, this is confusing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will fix the naming

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why !outputVariables.isEmpty() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there was an assumption that if there is at least one output row, there must be at least one output byte.
This is not the case with partial aggs that don't project any new columns (for example "select count(*)"): in that case the output bytes were tracked as 0 and we ended up replacing it with NaN which raises an exception later on
I'll add some comments

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlyublena I think this may be the culprit, we are directly using the output from this function to populate the aggregation stats, which can be a NaN.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be equal check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, introduced this in the last refactoring
will fix!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why removing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this check to the function partialAggregationNotUseful
The original PR which introduced cost-based reasoning for partial aggs was conservative and only allowed skipping partial aggs for single-column GROUP BY-s because of possible estimation errors with multi-key aggs:
https://github.com/prestodb/presto/pull/16175/files#r657520172

If we are tracking partial history in HBO, we don't need to be that conservative and can allow more cases

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarification. I guess we will still keep this behaviour for CBO and only extend for HBO right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use partial aggregation stats when it's unknown?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was a mistake on my part: the function was actually checking for isNotUnknown: I fixed the naming and the semantics

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why single-key aggregation is special?

Copy link
Contributor Author

@mlyublena mlyublena Nov 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above: the original implementation of this optimizer had this to avoid mis-estimation with multi-key aggregates in CBO. I moved the check inside the function and relaxed it if we know we're using partial HBO stats

Copy link
Contributor

@feilong-liu feilong-liu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look good. Remember to fill the PR description field and release note etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Let's keep the name. However It may still be worth to have a consistent session property name (use_partial_aggregation_history) and config property name (optimizer.use-partial-aggregation-history)

Until now HBO tracked only the final input and output sizes of evaluating the aggregate, but not the result of partial agg evaluation.
This may lead to incorrect estimation of data size reduction when the final aggregate reduces data size, but the partial aggregate does not.
This changelist adds the ability to track input and output sizes of the partial aggregate to help make a better decision of when to split an aggregate
into partial and final.

This change adds a new field aggregationId to AggregationNode to track split partial/final aggregate pairs, and uses that during history tracking to record partial agg execution details.

The new statistic estimates now contains the following for Final aggregation nodes:

      "partialAggregationStatsEstimate" : {
        "inputBytes" : 0.0,
        "outputBytes" : 23976.0,
        "inputRowCount" : 5.6717574E7,
        "outputRowCount" : 2664.0
      }

This CL also changes the PushPartialAggregationThroughExchange optimizer to use the partial agg statistics when available.

In addition, the following modifications to the PushPartialAggregationThroughExchange optimization were made:

- when using partial aggregation statistics, apply optimization to multi-key aggregates. The original optimization only triggers for single-key aggregate nodes.
- Use rows instead of bytes when use_partial_aggregation_history flag is on
@arhimondr arhimondr merged commit 27c6d9e into prestodb:master Nov 15, 2023
Comment on lines +250 to +253
return new PartialAggregationStatistics(Estimate.of(partialAggregationInputBytes),
Estimate.of(outputBytes),
Estimate.of(childNodeStats.getPlanNodeOutputPositions()),
Estimate.of(outputPositions));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlyublena In the following line Estimate.of(nan) throws IllegalArgumentException in Presto-on-Spark. Can you please add the necessary handling, please?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stack trace for reference

java.lang.IllegalArgumentException: value is NaN
	at com.facebook.presto.spi.statistics.Estimate.of(Estimate.java:54)
	at com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker.constructAggregationNodeStatistics(HistoryBasedPlanStatisticsTracker.java:250)
	at com.facebook.presto.cost.HistoryBasedPlanStatisticsTracker.getQueryStats(HistoryBasedPlanStatisticsTracker.java:164)
	at com.facebook.presto.event.QueryMonitor.queryCompletedEvent(QueryMonitor.java:267)
	at com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution.queryCompletedEvent(AbstractPrestoSparkQueryExecution.java:607)
	at com.facebook.presto.spark.execution.AbstractPrestoSparkQueryExecution.execute(AbstractPrestoSparkQueryExecution.java:430)
	at com.facebook.presto.spark.launcher.PrestoSparkRunner.execute(PrestoSparkRunner.java:181)
	at com.facebook.presto.spark.launcher.PrestoSparkRunner.run(PrestoSparkRunner.java:125)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlyublena I think this may be the culprit, we are directly using the output from this function to populate the aggregation stats, which can be a NaN.

PlanNode childNode = planNode.getSources().get(0);
PlanNodeStats childNodeStats = planNodeStatsMap.get(childNode.getId());
if (childNodeStats != null) {
double partialAggregationInputBytes = adjustedOutputBytes(childNode, childNodeStats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mlyublena where the adjustedOutputBytes are directly used to populate the PartialAggregationStatistics

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @feilong-liu , that is indeed the fix, I wanted to investigate the case where this happens so I can test that it is fixed. The problem was when doing a GROUP BY on the partition key, the partition key is not materialized in the page sent through partial aggregation, so the byte count is 0 (and then becomes negative because we exclude the hash variables). So the problematic queries were of this pattern:

select ds from lineitem group by ds;

new PR here:
#21502

PlanNodeStats childNodeStats = planNodeStatsMap.get(childNode.getId());
if (childNodeStats != null) {
double partialAggregationInputBytes = adjustedOutputBytes(childNode, childNodeStats);
return new PartialAggregationStatistics(Estimate.of(partialAggregationInputBytes),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isNaN(partialAggregationInputBytes) ? Estimate.unknown() : Estimate.of(partialAggregationInputBytes)

@wanglinsong wanglinsong mentioned this pull request Dec 8, 2023
26 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants