Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "Extended Clickbench" benchmark for median and approx_median for high cardinality aggregates #12438

Merged
merged 7 commits into from
Sep 16, 2024

Conversation

alamb
Copy link
Contributor

@alamb alamb commented Sep 11, 2024

Which issue does this PR close?

Part of #11819

Rationale for this change

As part of reviewing #11827 from @Rachelint I realized there is no benchmark coverage for the code being optimized. Thus we should implement such a benchmark.

What changes are included in this PR?

  1. Add a new benchmark for median and approx_median for high cardinality aggregates in the "extended clickbench" queries

Are these changes tested?

Are there any user-facing changes?

@@ -2,3 +2,4 @@ SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DIST
SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTINCT "BrowserLanguage") FROM hits;
SELECT "BrowserCountry", COUNT(DISTINCT "SocialNetwork"), COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserLanguage"), COUNT(DISTINCT "SocialAction") FROM hits GROUP BY 1 ORDER BY 2 DESC LIMIT 10;
SELECT "SocialSourceNetworkID", "RegionID", COUNT(*), AVG("Age"), AVG("ParamPrice"), STDDEV("ParamPrice") as s, VAR("ParamPrice") FROM hits GROUP BY "SocialSourceNetworkID", "RegionID" HAVING s IS NOT NULL ORDER BY s DESC LIMIT 10;
SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax, "UserID" FROM hits GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since by default partial aggregation is skipped in case unique groups / input records > 0.8, this grouping cardinality is likely not high enough. Maybe it's worth using "WatchID", "ClientIP" from q32 here, as it for sure benefited from skipping partial aggregation?

Copy link
Contributor

@Rachelint Rachelint Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think q32 may be the typical high cardinality query too, I am modifying the query and checking.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran EXPLAIN ANALYZE on this branch and I believe you are correct that the partial aggregation skipping is not happening

Specifically skipped_aggregation_rows=0 in the partial aggregate:

AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=142.856836062s, skipped_aggregation_rows=0]

> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax,  "UserID" FROM 'hits.parquet' GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;

| plan_type         | plan|

| Plan with Metrics | SortPreservingMergeExec: [tp95@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=2.666µs|
|                   |   SortExec: TopK(fetch=10), expr=[tp95@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=54.827251ms, row_replacements|
|                   |     ProjectionExec: expr=[min(hits.parquet.ResponseStartTiming)@1 as tmin, median(hits.parquet.ResponseStartTiming)@2 as tmed, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp95, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp99, max(hits.parquet.ResponseStartTiming)@4 as tmax, UserID@0 as UserID], metrics=[output_rows=1994511, elapsed_compute=82.127µs|
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1994511, elapsed_compute=250.672µs|
|                   |         FilterExec: min(hits.parquet.ResponseStartTiming)@1 > 0 AND median(hits.parquet.ResponseStartTiming)@2 > 0, metrics=[output_rows=1994511, elapsed_compute=37.563416ms|
|                   |           AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=17630976, elapsed_compute=45.058854603s|
|                   |             CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=21164213, elapsed_compute=135.756351ms|
|                   |               RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16, metrics=[send_time=8.034269627s, repartition_time=2.160110804s, fetch_time=144.660128063s|
|                   |                 AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=142.856836062s, skipped_aggregation_rows|
|                   |                   ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[UserID, ResponseStartTiming], metrics=[output_rows=99997497, elapsed_compute=16ns, bytes_scanned=334711589, file_scan_errors=0, row_groups_pruned_bloom_filter=0, row_groups_matched_statistics=0, page_index_rows_filtered=0, row_groups_pruned_statistics=0, pushdown_rows_filtered=0, num_predicate_creation_errors=0, predicate_evaluation_errors=0, file_open_errors=0, row_groups_matched_bloom_filter=0, pushdown_eval_time=32ns, time_elapsed_opening=308.845002ms, time_elapsed_processing=1.314136113s, time_elapsed_scanning_total=82.246142751s, page_index_eval_time=32ns, time_elapsed_scanning_until_data=25.709795ms] |
|                   ||

1 row(s) fetched.
Elapsed 14.296 seconds.

main:

> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax,  "UserID" FROM 'hits.parquet' GROUP BY "UserID" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;

| plan_type         | plan|
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [tp95@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=6.583µs|
|                   |   SortExec: TopK(fetch=10), expr=[tp95@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=44.490069ms, row_replacements|
|                   |     ProjectionExec: expr=[min(hits.parquet.ResponseStartTiming)@1 as tmin, median(hits.parquet.ResponseStartTiming)@2 as tmed, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp95, approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95))@3 as tp99, max(hits.parquet.ResponseStartTiming)@4 as tmax, UserID@0 as UserID], metrics=[output_rows=1994511, elapsed_compute=80.704µs|
|                   |       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1994511, elapsed_compute=266.832µs|
|                   |         FilterExec: min(hits.parquet.ResponseStartTiming)@1 > 0 AND median(hits.parquet.ResponseStartTiming)@2 > 0, metrics=[output_rows=1994511, elapsed_compute=50.816018ms|
|                   |           AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=17630976, elapsed_compute=43.153463868s|
|                   |             CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=21164213, elapsed_compute=86.665877ms|
|                   |               RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16, metrics=[repartition_time=2.097894133s, fetch_time=141.020531259s, send_time=8.982681573s|
|                   |                 AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=21164213, elapsed_compute=139.1215463s|
|                   |                   ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Downloads/hits.parquet:0..923748528], [Users/andrewlamb/Downloads/hits.parquet:923748528..1847497056], [Users/andrewlamb/Downloads/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Downloads/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Downloads/hits.parquet:3694994112..4618742640], ...]}, projection=[UserID, ResponseStartTiming], metrics=[output_rows=99997497, elapsed_compute=16ns, predicate_evaluation_errors=0, file_open_errors=0, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bytes_scanned=334711589, page_index_rows_filtered=0, num_predicate_creation_errors=0, file_scan_errors=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=0, pushdown_rows_filtered=0, time_elapsed_opening=285.717582ms, time_elapsed_scanning_until_data=64.90646ms, time_elapsed_scanning_total=80.855628351s, time_elapsed_processing=1.21575998s, page_index_eval_time=32ns, pushdown_eval_time=32ns] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |


Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I changed it to GROUP BY "UserID","WatchID", "ClientIP" I see the skipped aggregation rows now:

skipped_aggregation_rows=98293561

AggregateExec: mode=Partial, gby=[UserID@2 as UserID, WatchID@0 as WatchID, ClientIP@1 as ClientIP], aggr=[min(hits.parquet.ResponseStartTiming), median(hits.parquet.ResponseStartTiming), approx_percentile_cont(hits.parquet.ResponseStartTiming,Float64(0.95)), max(hits.parquet.ResponseStartTiming)], metrics=[output_rows=99997497, elapsed_compute=246.594546516s, skipped_aggregation_rows=98293561]

> EXPLAIN ANALYZE SELECT MIN("ResponseStartTiming") tmin, MEDIAN("ResponseStartTiming") tmed, approx_percentile_cont("ResponseStartTiming", 0.95) tp95, approx_percentile_cont("ResponseStartTiming", 0.95) tp99, MAX("ResponseStartTiming") tmax,  "UserID", "WatchID", "ClientIP" FROM 'hits.parquet' GROUP BY "UserID","WatchID", "ClientIP" HAVING tmin > 0 AND tmed > 0 ORDER BY tp95 DESC LIMIT 10;

And with that change now the query doesn't finish on my laptop (I killed it after using 100GB of memory) but with #11827 it finishes in 45 seconds.

I will think about how to modify this query to reflect the benefit without making it impossibly to run on low resource machines

Copy link
Contributor

@Rachelint Rachelint Sep 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After creating a q32 like query, seems obvious improvement in my local
(I run it with just a subset of hits_partitioned to avoid situation that the quey can't finish).
#11827 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After creating a q32 like query, seems obvious improvement in my local

Yes, I agree that #11827 provides an improvement. I just posted a comment #11827 (comment)

Copy link
Contributor

@korowa korowa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you @alamb

@alamb alamb merged commit 416ff2d into apache:main Sep 16, 2024
24 checks passed
@alamb
Copy link
Contributor Author

alamb commented Sep 16, 2024

Thank you for the review @korowa

@alamb alamb deleted the alamb/test_mediate_perf branch September 16, 2024 17:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants