
Improve aggregation cardinality estimation #19595

Open

fgwang7w wants to merge 2 commits into prestodb:master from fgwang7w:aggregatefunctionstats

Conversation

@fgwang7w (Member) commented May 9, 2023

This PR intends to improve cardinality estimation for aggregations.

  1. Aggregation functions such as min, max, avg, count, and sum are simple scalar aggregates that deterministically return a single row when there are no grouping keys. So if the source returns only a single row, the estimate is guaranteed to be a single-row output.
  2. Define a statistics estimation boundary to avoid returning unknown estimates: use the output row count from the source stats as the upper bound on the estimated output row count, since an Aggregation node cannot return more rows than the estimated row count of its source node. (A sketch of both bounds follows below.)
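For illustration, here is a minimal sketch (plain Java, with hypothetical names; not the exact rule code in this PR) of how the two bounds combine into one row-count estimate:

// Illustrative only: combine the single-row guarantee with the source-row upper bound.
static double estimateAggregationOutputRows(boolean hasGroupingKeys, double ndvBasedEstimate, double sourceRowCount)
{
    if (!hasGroupingKeys) {
        // an ungrouped min/max/avg/count/sum always produces exactly one row
        return 1;
    }
    if (Double.isNaN(ndvBasedEstimate)) {
        // no usable NDV stats: fall back to the upper bound instead of returning "unknown"
        return sourceRowCount;
    }
    // a grouped aggregation can never emit more rows than its input
    return Math.min(ndvBasedEstimate, sourceRowCount);
}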
== RELEASE NOTES ==

General Changes
*  Improve cardinality estimation for aggregation functions

Resolves #19354
Co-authored-by: bmckennaah

BEFORE:

presto:tpch> explain with cte as (select min(orderkey) as min from orders) select count(*) from customer c, nation n, cte where c.custkey=cte.min and n.nationkey=c.nationkey;
                                                                                                                                                             Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 - Output[_col0] => [count:bigint]
         _col0 := count (1:70)
     - Aggregate(FINAL) => [count:bigint]
             count := "presto.default.count"((count_20)) (1:70)
         - LocalExchange[SINGLE] () => [count_20:bigint]
             - RemoteStreamingExchange[GATHER] => [count_20:bigint]
                 - Aggregate(PARTIAL) => [count_20:bigint]
                         count_20 := "presto.default.count"(*) (1:70)
                     - InnerJoin[("custkey" = "min")][$hashvalue_26, $hashvalue_27] => []
                             Distribution: REPLICATED
                         - Project[projectLocality = LOCAL] => [custkey:bigint, $hashvalue_26:bigint]
                                 Estimates: {rows: 1500 (26.37kB), cpu: 191025.00, memory: 450.00, network: 40950.00}
                                 $hashvalue_26 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(custkey), BIGINT'0')) (1:85)
                             - InnerJoin[("nationkey" = "nationkey_0")][$hashvalue, $hashvalue_23] => [custkey:bigint]
                                     Estimates: {rows: 1500 (13.18kB), cpu: 164025.00, memory: 450.00, network: 40950.00}
                                     Distribution: PARTITIONED
                                 - RemoteStreamingExchange[REPARTITION][$hashvalue] => [custkey:bigint, nationkey:bigint, $hashvalue:bigint]
                                         Estimates: {rows: 1500 (39.55kB), cpu: 108000.00, memory: 0.00, network: 40500.00}
                                     - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.customer{}]'}, projectLocality = LOCAL] => [custkey:bigint, nationkey:bigint, $hashvalue_22:bigint]
                                             Estimates: {rows: 1500 (39.55kB), cpu: 27000.00, memory: 0.00, network: 0.00}/{rows: 1500 (39.55kB), cpu: 67500.00, memory: 0.00, network: 0.00}
                                             $hashvalue_22 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(nationkey), BIGINT'0')) (1:84)
                                             LAYOUT: tpch.customer{}
                                             custkey := custkey:bigint:0:REGULAR (1:84)
                                             nationkey := nationkey:bigint:3:REGULAR (1:84)
                                 - LocalExchange[HASH][$hashvalue_23] (nationkey_0) => [nationkey_0:bigint, $hashvalue_23:bigint]
                                         Estimates: {rows: 25 (450B), cpu: 1575.00, memory: 0.00, network: 450.00}
                                     - RemoteStreamingExchange[REPARTITION][$hashvalue_24] => [nationkey_0:bigint, $hashvalue_24:bigint]
                                             Estimates: {rows: 25 (450B), cpu: 1125.00, memory: 0.00, network: 450.00}
                                         - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=nation, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.nation{}]'}, projectLocality = LOCAL] => [nationkey_0:bigint, $hashvalue_25:bigint]
                                                 Estimates: {rows: 25 (450B), cpu: 225.00, memory: 0.00, network: 0.00}/{rows: 25 (450B), cpu: 675.00, memory: 0.00, network: 0.00}
                                                 $hashvalue_25 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(nationkey_0), BIGINT'0')) (1:96)
                                                 LAYOUT: tpch.nation{}
                                                 nationkey_0 := nationkey:bigint:0:REGULAR (1:96)
                         - LocalExchange[HASH][$hashvalue_27] (min) => [min:bigint, $hashvalue_27:bigint]
                             - RemoteStreamingExchange[REPLICATE] => [min:bigint, $hashvalue_28:bigint]
                                 - Project[projectLocality = LOCAL] => [min:bigint, $hashvalue_29:bigint]
                                         $hashvalue_29 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(min), BIGINT'0')) (1:29)
                                     - Aggregate(FINAL) => [min:bigint]
                                             min := "presto.default.min"((min_21)) (1:29)
                                         - LocalExchange[SINGLE] () => [min_21:bigint]
                                             - RemoteStreamingExchange[GATHER] => [min_21:bigint]
                                                 - Aggregate(PARTIAL) => [min_21:bigint]
                                                         min_21 := "presto.default.min"((orderkey)) (1:29)
                                                     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{}]'}] => [orderkey:bigint]
                                                             Estimates: {rows: 15000 (131.84kB), cpu: 135000.00, memory: 0.00, network: 0.00}
                                                             LAYOUT: tpch.orders{}
                                                             orderkey := orderkey:bigint:0:REGULAR (1:55)

(1 row)

Query 20230509_033906_00029_xx9gk, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
115ms [0 rows, 0B] [0 rows/s, 0B/s]

AFTER:

presto:tpch> explain with cte as (select min(orderkey) as min from orders) select count(*) from customer c, nation n, cte where c.custkey=cte.min and n.nationkey=c.nationkey;
                                                                                                                                                               Query Plan
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 - Output[_col0] => [count:bigint]
         Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
         _col0 := count (1:70)
     - Aggregate(FINAL) => [count:bigint]
             Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
             count := "presto.default.count"((count_20)) (1:70)
         - LocalExchange[SINGLE] () => [count_20:bigint]
                 Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
             - RemoteStreamingExchange[GATHER] => [count_20:bigint]
                     Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                 - Aggregate(PARTIAL) => [count_20:bigint]
                         Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                         count_20 := "presto.default.count"(*) (1:70)
                     - InnerJoin[("nationkey_0" = "nationkey")][$hashvalue, $hashvalue_23] => []
                             Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                             Distribution: PARTITIONED
                         - RemoteStreamingExchange[REPARTITION][$hashvalue] => [nationkey_0:bigint, $hashvalue:bigint]
                                 Estimates: {rows: 25 (225B), cpu: 1125.00, memory: 0.00, network: 450.00}
                             - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=nation, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.nation{}]'}, projectLocality = LOCAL] => [nationkey_0:bigint, $hashvalue_22:bigint]
                                     Estimates: {rows: 25 (225B), cpu: 225.00, memory: 0.00, network: 0.00}/{rows: 25 (225B), cpu: 675.00, memory: 0.00, network: 0.00}
                                     $hashvalue_22 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(nationkey_0), BIGINT'0')) (1:96)
                                     LAYOUT: tpch.nation{}
                                     nationkey_0 := nationkey:bigint:0:REGULAR (1:96)
                         - LocalExchange[HASH][$hashvalue_23] (nationkey) => [nationkey:bigint, $hashvalue_23:bigint]
                                 Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                             - RemoteStreamingExchange[REPARTITION][$hashvalue_24] => [nationkey:bigint, $hashvalue_24:bigint]
                                     Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                 - Project[projectLocality = LOCAL] => [nationkey:bigint, $hashvalue_29:bigint]
                                         Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                         $hashvalue_29 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(nationkey), BIGINT'0')) (1:84)
                                     - InnerJoin[("custkey" = "min")][$hashvalue_25, $hashvalue_26] => [nationkey:bigint]
                                             Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                             Distribution: REPLICATED
                                         - ScanProject[table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.customer{}]'}, projectLocality = LOCAL] => [custkey:bigint, nationkey:bigint, $hashvalue_25:bigint]
                                                 Estimates: {rows: 1500 (13.18kB), cpu: 27000.00, memory: 0.00, network: 0.00}/{rows: 1500 (13.18kB), cpu: 67500.00, memory: 0.00, network: 0.00}
                                                 $hashvalue_25 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(custkey), BIGINT'0')) (1:85)
                                                 LAYOUT: tpch.customer{}
                                                 custkey := custkey:bigint:0:REGULAR (1:84)
                                                 nationkey := nationkey:bigint:3:REGULAR (1:84)
                                         - LocalExchange[HASH][$hashvalue_26] (min) => [min:bigint, $hashvalue_26:bigint]
                                                 Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                             - RemoteStreamingExchange[REPLICATE] => [min:bigint, $hashvalue_27:bigint]
                                                     Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                 - Project[projectLocality = LOCAL] => [min:bigint, $hashvalue_28:bigint]
                                                         Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                         $hashvalue_28 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(min), BIGINT'0')) (1:29)
                                                     - Aggregate(FINAL) => [min:bigint]
                                                             Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                             min := "presto.default.min"((min_21)) (1:29)
                                                         - LocalExchange[SINGLE] () => [min_21:bigint]
                                                                 Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                             - RemoteStreamingExchange[GATHER] => [min_21:bigint]
                                                                     Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                                 - Aggregate(PARTIAL) => [min_21:bigint]
                                                                         Estimates: {rows: 1 (9B), cpu: ?, memory: ?, network: ?}
                                                                         min_21 := "presto.default.min"((orderkey)) (1:29)
                                                                     - TableScan[TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpch, tableName=orders, analyzePartitionValues=Optional.empty}', layout='Optional[tpch.orders{}]'}] => [orderkey:bigint]
                                                                             Estimates: {rows: 15000 (131.84kB), cpu: 135000.00, memory: 0.00, network: 0.00}
                                                                             LAYOUT: tpch.orders{}
                                                                             orderkey := orderkey:bigint:0:REGULAR (1:55)

(1 row)

Query 20230509_032011_00015_8efw7, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
101ms [0 rows, 0B] [0 rows/s, 0B/s]

Query plan improvement on TPC-H:
q15:
image

Query plan improvement on TPC-DS:
q4, q11, q14_1, q14_2, q34, q73, q74, q78, q94

@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 534a3ed to 225d43c on May 9, 2023 15:33
@fgwang7w fgwang7w requested a review from bmckennaah May 9, 2023 15:55
@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 225d43c to 3a327cd on May 12, 2023 19:02
@fgwang7w fgwang7w changed the title from "Improve aggregation cardinality estimation for simple scalar functions" to "Improve aggregation cardinality estimation" on May 12, 2023
@fgwang7w fgwang7w added labels TPC-H (Optimizations discovered from the TPC-H benchmark) and TPC-DS (Optimizations discovered from the TPC-DS benchmark) on May 13, 2023
@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 3a327cd to 8dab780 on May 15, 2023 15:18
@fgwang7w fgwang7w marked this pull request as ready for review May 15, 2023 15:24
@fgwang7w fgwang7w requested a review from a team as a code owner May 15, 2023 15:24
@fgwang7w fgwang7w requested a review from ajaygeorge May 15, 2023 15:24
@fgwang7w (Member Author):
This PR enhances the aggregation stats model to provide better estimates when aggregate functions are present. Could you please help review? @bmckennaah @rschlussel @prestodb/committers Thank you!

@fgwang7w fgwang7w requested a review from rschlussel May 15, 2023 15:41
@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch 2 times, most recently from 2009a15 to 2c56be5 on May 18, 2023 05:23
Contributor:
this should be all covered by the groupBy logic. not sure why this extra logic is added here.

Member Author:
Thank you for reviewing. Initially I was planning to use this logic to make sure all scalar functions are bounded to a single row, but you are right, this is extra logic. I will remove it.

Contributor:
why did you remove this? e.g. partial aggregation output should probably be estimated to be the same as source stats. But in any case, the node step only shows up in the final plan. At the time when we are making cost based decisions, the only step we see is single

Member Author:
I removed this logic so that the stats calculated during the SINGLE-step phase can, in effect, propagate to the split (partial/final) aggregation phase, where the plan would otherwise show missing estimated stats. In other words, if we keep this logic, aggregation nodes with PARTIAL and FINAL steps would return NaN stats, which is confusing: it looks as if the aggregation node has unknown estimates even though stats were estimated during the SINGLE-step phase. Do you have any suggestion on how to avoid this? The current cost model derived from stats has this issue, where CostCalculatorUsingExchanges cannot derive cost from the PARTIAL step and returns unknown cost instead.

if (node.getStep() != FINAL && node.getStep() != SINGLE) {

Contributor:
I see. Instead could we have only single/final use these estimates and partial/intermediate just propagate the estimates from the source table. I think that will be a better reflection of what to expect.
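A hedged sketch of that suggestion (assuming the standard AggregationNode.Step values; computeAggregationStats is a placeholder name, not code from this PR):

// Sketch: derive real estimates only for SINGLE/FINAL; PARTIAL/INTERMEDIATE just pass the source estimate through.
switch (node.getStep()) {
    case SINGLE:
    case FINAL:
        return computeAggregationStats(node, sourceStats); // placeholder for the new estimation logic
    case PARTIAL:
    case INTERMEDIATE:
        return sourceStats; // propagate rather than return unknown stats
    default:
        throw new IllegalStateException("Unexpected aggregation step: " + node.getStep());
}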

Contributor:
  1. use NEGATIVE_INFINITY and POSITIVE_INFINITY to represent the full range (see the sketch after this list).
  2. aggregations CAN return null on null input, so if the source stats are 100% null and the function is not called on null input, then the nulls fraction here will also be 1.0 (not sure if there are any aggs that return null on mix of null and non-null input, but should be okay to skip this case).
  3. it's not at all guaranteed that the aggregation columns will be distinct (for the stats estimates i guess it's ok as a heuristic, but NaN would be more accurate. It definitely needs a comment) As an example, if you did SELECT count(*) from nation GROUP BY regionkey, every row would have a value of 5.
  4. as a further optimization for min/max/count that are only one row, we can get the min, max or count of the source stats.
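For point 1, a hedged sketch of what the output column stats could look like (assuming VariableStatsEstimate's builder; the heuristic choices are flagged in comments):

VariableStatsEstimate aggregateOutput = VariableStatsEstimate.builder()
        .setLowValue(Double.NEGATIVE_INFINITY)  // full range when the true minimum is unknown
        .setHighValue(Double.POSITIVE_INFINITY) // full range when the true maximum is unknown
        .setNullsFraction(0)                    // heuristic: most aggregate outputs are non-null
        .setDistinctValuesCount(outputRowCount) // heuristic: assume one distinct value per output row
        .build();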

Member Author:
#1. fixed
#2. if all inputs are null, isn't the aggregate of all nulls a single null?
image
#3 yes, if the agg columns are not all distinct, the estimate can only be a heuristic; in that case return the maximum possible row count so that cost-based decisions avoid the NaN case
image
#4 if there's no global grouping key, yes, we can optimize further for min/max/count, which return only a single-row output

Reply:
  1. agree
  2. Aggregations can return NULL. But for most queries this is not the case and I do not see how we could understand/leverage that in the optimizer.
  3. Returning NaN may be more "accurate" but isn't that what leads to the problem we are trying to solve? Ultimately almost all estimates used in the optimizer are "guesses", e.g. Presto uses 0.9 as the selectivity for some single table predicates. Not having some reasonable estimate for the NDV of an aggregate column leads to very bad plans even for simple cases, e.g. "explain select 1 from t1, t2 where a1=a2 and b1 = (select max(b3) from t3 group by a3)". Rewriting a query to get the desired join order is not an option in the vast majority of environments.
  4. agree

Contributor:
  1. yeah, i think the heuristic is okay, but it would be good to have a small comment noting that it's a heuristic. I feel like all these stats rules are a combo of things that are definitionally true of the operator (e.g ungrouped aggregation only returns a single row) and some heuristics that we expect to be true more often than not. I don't think we've been very consistent about doing this throughout the stats rules, but for me it's helpful to call out when/why we are using a heuristic so it's easier to understand where the stats estimates come from.

Contributor:
  1. @fgwang7w your example uses null grouping keys - I mean the case where the aggregation inputs are null:
presto:di> with x as (SELECT CAST (null as INTEGER) a, 1 as b) SELECT count(a), sum(a), count(*), b from x group by b;
 _col0 | _col1 | _col2 | b 
-------+-------+-------+---
     0 | NULL  |     1 | 1 

I was suggesting that since we have the null fraction for a column, and we know for each function whether it returns null on null input, we could use that to be more precise for all-null input if we wanted (as an additional improvement beyond what's covered in this PR).
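A purely illustrative sketch of that refinement (not part of this PR; names are hypothetical):

// Possible nulls-fraction refinement for an aggregate output column.
static double estimateOutputNullsFraction(double inputNullsFraction, boolean functionIgnoresNulls, boolean isCount)
{
    if (inputNullsFraction == 1.0 && functionIgnoresNulls) {
        // count(x) over all-NULL input is 0 (never NULL); sum/min/max/avg return NULL
        return isCount ? 0.0 : 1.0;
    }
    return 0.0; // heuristic default: otherwise assume the aggregate output is non-null
}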

Member Author:
Thank you for the hints. I refactored the code a little bit; please help review.

@feilong-liu (Contributor) left a comment:
Curious, I see in the PR description:

Improve cardinality estimation model for aggregation functions like: min, max, avg, count, sum.

But I do not see any logic specifically related to which aggregation functions are used, which confused me. Just to clarify: the change is not tied to specific functions but applies to aggregations in general, is that correct?

Contributor:
Does this assume the input and output row counts are the same when no group-by key stats are available? Just curious whether there is any reason/benefit to doing this.

Member Author:
Thanks for reviewing the code. The assumption is that if the aggregation columns cannot be guaranteed unique, the aggregation estimate takes the source output row count as an upper bound to avoid returning NaN, which breaks join reordering.

@ajaygeorge ajaygeorge removed their request for review May 19, 2023 19:03
@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 2c56be5 to 1672d09 on May 20, 2023 20:47
@fgwang7w (Member Author):
@feilong-liu I have updated the PR description to avoid any confusion. please let me know if you have any further concerns. Thanks!

@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch 2 times, most recently from d78bd37 to 7e1396d on May 25, 2023 00:38
@rschlussel (Contributor) left a comment:
not sure if you've finished updating. don't see some of the changes you said you made.


@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 7e1396d to 043c98c on May 25, 2023 20:33
@jaystarshot (Member):
I believe we should introduce a customizable constant, which can be used to scale the projected upper limit, particularly when statistics are not accessible.
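Later diff fragments reference getDefaultAggregateSelectivityCoefficient(session), which suggests this knob ended up as a session property. A hedged sketch of how such a property is typically wired up in Presto (the property name and default below are illustrative, not necessarily what the PR uses):

public static final String DEFAULT_AGGREGATE_SELECTIVITY_COEFFICIENT = "default_aggregate_selectivity_coefficient";

// registered in SystemSessionProperties next to the other optimizer knobs
PropertyMetadata<Double> aggregateSelectivityCoefficient = PropertyMetadata.doubleProperty(
        DEFAULT_AGGREGATE_SELECTIVITY_COEFFICIENT,
        "Scale factor applied to the source row count when it is used as an aggregation's upper-bound estimate",
        1.0,
        false);

// read inside the stats rule
public static double getDefaultAggregateSelectivityCoefficient(Session session)
{
    return session.getSystemProperty(DEFAULT_AGGREGATE_SELECTIVITY_COEFFICIENT, Double.class);
}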

@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 043c98c to 66e95c6 on May 30, 2023 17:02
@fgwang7w (Member Author):
Thank you @rschlussel @jaystarshot for the review. I have refactored the code a bit more. In addition, I revised the cost calculator to reflect the CPU/memory/IO cost derived from the stats estimate. Here's how it looks now:
image

Member:
I think the constant should be multiplied in everywhere the result is updated, like here.

Member Author:
sure will factor in for all cases, thanks

@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 66e95c6 to f85a26a on May 30, 2023 19:00
@bmckennaah previously approved these changes May 31, 2023
fgwang7w added 2 commits June 2, 2023 08:38
Improve cardinality estimation model for aggregation functions
such as min, max, avg, count, and sum. These are simple scalar aggregates
that deterministically return a single row.

In addition, define a statistics estimation boundary:
1. If the source returns only a single row, a single-row output is guaranteed
2. Use the output row count from the source stats as the upper bound on the
estimated output row count, since an Aggregation node cannot return more rows
than the estimated row count of its source node.
@fgwang7w fgwang7w force-pushed the aggregatefunctionstats branch from 74f9932 to 92da20d on June 2, 2023 15:40
@fgwang7w fgwang7w requested a review from rschlussel June 6, 2023 05:59

// TODO implement simple aggregations like: min, max, count, sum
return VariableStatsEstimate.unknown();
// Double.MIN_VALUE is a constant holding the smallest positive nonzero value of type double.
Contributor:
this comment is no longer relevant

// count as the upper bound limit to avoid returning an unknown output potentially breaks the join search space
double outputRowCountFromSource = getDefaultAggregateSelectivityCoefficient(session) * sourceStats.getOutputRowCount();
if (!isNaN(rowsCount)) {
result.setOutputRowCount(min(rowsCount, outputRowCountFromSource));
Contributor:
I think we want to bound by the total size of the source, not the source * the selectivity coefficient.

Member Author:
My initial thinking was to apply the selectivity factor across all stats, even for the output estimation from the source, because if the selectivity factor is 90%, it gives a discounted upper bound for this aggregate operator. Is that the right assumption? If not, I can change the code back to ignore the selectivity factor here.

}

double rowsCount = 1;
double rowsCount = getDefaultAggregateSelectivityCoefficient(session);
Contributor:
does this handle ungrouped aggregations properly now? looks like we'd get a row count < 1 in this case.
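One hedged way the fix could look (illustrative only; the actual commit may differ):

// Apply the selectivity coefficient only when there are grouping keys;
// an ungrouped aggregation stays pinned to exactly one output row.
double rowsCount;
if (node.getGroupingKeys().isEmpty()) {
    rowsCount = 1;
}
else {
    rowsCount = getDefaultAggregateSelectivityCoefficient(session);
    for (VariableReferenceExpression groupingKey : node.getGroupingKeys()) {
        VariableStatsEstimate keyStats = sourceStats.getVariableStatistics(groupingKey);
        int nullRow = (keyStats.getNullsFraction() == 0.0) ? 0 : 1;
        rowsCount *= keyStats.getDistinctValuesCount() + nullRow;
    }
}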

Member Author:
thanks for catching, no this is a bug, will fix

Contributor:
Thanks. Make sure this and the other issues here are also covered by tests.

Member Author:
Yes, will do. I am running internal tests; once they have all passed, the new commit will be pushed for you to re-review. Thank you for your help!

int nullRow = (symbolStatistics.getNullsFraction() == 0.0) ? 0 : 1;
rowsCount *= symbolStatistics.getDistinctValuesCount() + nullRow;
if (!isNaN(symbolStatistics.getAverageRowSize())) {
totalSize += symbolStatistics.getAverageRowSize() * rowsCount;
Contributor:
This seems incorrect, since rowsCount will change as we go through each column. We probably want to add up all the average row sizes per column and then, at the end, multiply that sum by whatever row count we've ended up with.

Member Author:
fixed

result.addVariableStatistics(aggregationEntry.getKey(), estimateAggregationStats(aggregationEntry.getValue(), sourceStats));
VariableStatsEstimate aggVariableStatsEstimate = estimateAggregationStats(aggregationEntry.getValue(), sourceStats, rowsCount);
result.addVariableStatistics(aggregationEntry.getKey(), aggVariableStatsEstimate);
totalSize += aggVariableStatsEstimate.getAverageRowSize() * aggVariableStatsEstimate.getDistinctValuesCount();
Contributor:
I'm confused about the totalSize calculation in general here. I think totalSize should be something like the sum of the average size per column (including both aggregation and group-by columns) multiplied by the number of rows.

Member Author:
You are right. The implementation will be changed to add up all the average per-column sizes from the group-by variables first, then add the average sizes from the aggregation columns, and multiply the overall sum by the number of rows. I added a comment to elaborate on the logic.
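A hedged sketch of that computation (illustrative; variable names do not necessarily match the final code):

// Sum the average per-column sizes first (group-by variables plus aggregate outputs),
// then multiply the total once by the estimated output row count.
double averageRowSize = 0;
for (VariableStatsEstimate columnStats : outputColumnStats) { // group-by and aggregation columns
    if (!Double.isNaN(columnStats.getAverageRowSize())) {
        averageRowSize += columnStats.getAverageRowSize();
    }
}
double totalSize = averageRowSize * rowsCount;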

@jaystarshot (Member):
@fgwang7w do you need any help in merging/testing this PR?


Labels

TPC-DS: Optimizations discovered from the TPC-DS benchmark
TPC-H: Optimizations discovered from the TPC-H benchmark

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Set statistics for column produced by aggregate expression in a subquery

5 participants