Improve performance of broadcast hash join #808

Open
Tracked by #717
andygrove opened this issue Aug 11, 2024 · 3 comments
Labels
enhancement New feature or request performance

Comments

@andygrove
Member

andygrove commented Aug 11, 2024

What is the problem the feature request solves?

Query:

select ss_sold_date_sk, ss_sold_time_sk, ss_quantity, d_year, d_moy, d_dom
from date_dim join store_sales on d_date_sk = ss_sold_date_sk
where d_year = 2000;

Benchmark results:

AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          495            509          13        582.3           1.7       1.0X
join_inner: Comet (Scan)                            736            750          14        391.4           2.6       0.7X
join_inner: Comet (Scan, Exec)                     1094           1110          22        263.3           3.8       0.5X

Native metrics (for one task):

ProjectionExec: expr=[col_0@4 as col_0, col_1@5 as col_1, col_2@6 as col_2, col_1@1 as col_3, col_2@2 as col_4, col_3@3 as col_5], metrics=[output_rows=582202, elapsed_compute=181.746µs]
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_0@0)], metrics=[output_rows=582202, build_input_rows=366, output_batches=370, build_input_batches=1, input_rows=2894083, input_batches=370, build_mem_used=15032, build_time=46.427µs, join_time=18.433483ms]
    CopyExec, metrics=[output_rows=366, elapsed_compute=9.938µs]
      ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32, col_3: Int32], metrics=[output_rows=366, elapsed_compute=560ns]
    CopyExec, metrics=[output_rows=2894083, elapsed_compute=4.801962ms]
      FilterExec: col_0@0 IS NOT NULL, metrics=[output_rows=2894083, elapsed_compute=23.927183ms]
        ScanExec: source=[CometScan parquet spark_catalog.default.store_sales (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32], metrics=[output_rows=3030375, elapsed_compute=12.800384ms]
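
To make the relative operator costs easier to compare, the timings reported in the plan above can be summed per operator. This is only arithmetic on the posted metrics; treating `build_time + join_time` as the cost of the join is an assumption made for the comparison, and the operator labels are informal names, not identifiers from Comet.

```python
# Per-operator times in microseconds, copied from the native metrics above.
# Assumption: the HashJoinExec cost is taken as build_time + join_time.
operator_time_us = {
    "ProjectionExec": 181.746,
    "HashJoinExec": 46.427 + 18433.483,
    "CopyExec (build)": 9.938,
    "ScanExec (build)": 0.560,
    "CopyExec (probe)": 4801.962,
    "FilterExec": 23927.183,
    "ScanExec (probe)": 12800.384,
}

total_us = sum(operator_time_us.values())
shares = {name: t / total_us for name, t in operator_time_us.items()}
most_expensive = max(operator_time_us, key=operator_time_us.get)

# Print operators from most to least expensive with their share of the total.
for name, t in sorted(operator_time_us.items(), key=lambda kv: -kv[1]):
    print(f"{name:<18} {t / 1000:8.3f} ms  {shares[name]:6.1%}")
```

For this one task, the filter on the probe side accounts for a larger share of the reported time than the join itself.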

Describe the potential solution

No response

Additional context

No response

@andygrove
Member Author

The FilterExec in the above example is even more expensive than the HashJoinExec. Evaluating the predicate is cheap, but copying data to the filtered batch takes 99% of the filter time. We could potentially avoid this copy by using a selection vector approach instead.

Time to compute filter mask on batch of 32768 rows is: 581ns
Time to filter batch is: 252.194µs
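
As a rough illustration of the selection vector idea, the sketch below contrasts materializing a filtered batch with passing the indices of the surviving rows to the join probe. It uses plain Python lists rather than Arrow arrays, and every name in it (`not_null_mask`, `filter_by_copy`, `selection_vector`, `probe`) is hypothetical; it is not how Comet or DataFusion implement filtering.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Column = Sequence[Optional[int]]


def not_null_mask(key: Column) -> List[bool]:
    """Evaluate the predicate `key IS NOT NULL` for every row."""
    return [v is not None for v in key]


def filter_by_copy(columns: Sequence[Column], mask: Sequence[bool]) -> List[List[Optional[int]]]:
    """Materialize a new batch: every column is copied row by row."""
    return [[v for v, keep in zip(col, mask) if keep] for col in columns]


def selection_vector(mask: Sequence[bool]) -> List[int]:
    """Record only the indices of the selected rows; no column data is copied."""
    return [i for i, keep in enumerate(mask) if keep]


def probe(build: Dict[int, Tuple[int, ...]], columns: Sequence[Column],
          selection: Sequence[int]) -> List[Tuple[int, ...]]:
    """Probe the build-side hash table, visiting only the selected rows."""
    key = columns[0]
    out = []
    for i in selection:
        match = build.get(key[i])
        if match is not None:
            out.append(tuple(col[i] for col in columns) + match)
    return out


# Probe-side batch: column 0 is the join key, column 1 is a payload column.
batch = [[1, None, 2, 3], [10, 20, 30, 40]]
# Build side: join key -> remaining build columns (hypothetical values).
build = {1: (2000,), 3: (2000,)}

mask = not_null_mask(batch[0])

# Selection vector: the join reads the original batch through the indices.
via_selection = probe(build, batch, selection_vector(mask))

# Copying filter: build a new batch first, then probe every row of it.
copied = filter_by_copy(batch, mask)
via_copy = probe(build, copied, range(len(copied[0])))
```

Both paths produce the same join output; the selection vector path skips the intermediate copy, at the price of the downstream operator having to understand the selection.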

andygrove self-assigned this Aug 12, 2024
andygrove changed the title from "Improve performance of inner hash join" to "Improve performance of broadcast hash join" Aug 12, 2024
@andygrove
Member Author

andygrove commented Aug 12, 2024

The filter on the probe input is very simple (col_0@0 IS NOT NULL), so it should be possible to push it down to the Parquet scan?

edit: we do push the filter down to the scan:

      +- CometFilter [ss_sold_date_sk#1545, ss_sold_time_sk#1546, ss_quantity#1555], isnotnull(ss_sold_date_sk#1545)
         +- CometScan parquet ...  PushedFilters: [IsNotNull(ss_sold_date_sk)], ...

The FilterExec does receive batches where ss_sold_date_sk is null though:

predicate: length=8192, true=7843, false=349
predicate: length=8192, true=7832, false=360
predicate: length=8192, true=7846, false=346
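
One possible explanation, not confirmed in this thread: if the pushed filter is only used to skip whole row groups based on their statistics, then rows with a null key inside a surviving row group would still reach the FilterExec. The sketch below simulates that behaviour. `RowGroup`, `can_skip_for_is_not_null`, and `scan_with_pushed_is_not_null` are hypothetical names for illustration, not part of the Comet, Spark, or Parquet APIs.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RowGroup:
    """Hypothetical stand-in for a Parquet row group holding one column."""
    values: List[Optional[int]]

    @property
    def null_count(self) -> int:
        # Plays the role of the null-count statistic stored per row group.
        return sum(v is None for v in self.values)


def can_skip_for_is_not_null(rg: RowGroup) -> bool:
    """Statistics can rule out a row group only if every value in it is null."""
    return rg.null_count == len(rg.values)


def scan_with_pushed_is_not_null(row_groups: List[RowGroup]) -> List[Optional[int]]:
    """Assumed behaviour: prune whole row groups, but do not filter individual rows."""
    out: List[Optional[int]] = []
    for rg in row_groups:
        if can_skip_for_is_not_null(rg):
            continue
        out.extend(rg.values)
    return out


row_groups = [
    RowGroup([1, None, 2]),   # mixed: must be read, the null row comes along
    RowGroup([None, None]),   # all null: skipped using statistics alone
    RowGroup([3, 4]),         # no nulls
]
scanned = scan_with_pushed_is_not_null(row_groups)
```

Under this assumption the scan output still contains nulls, so a row-level filter after the scan remains necessary.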

@andygrove
Member Author

Latest results after merging #835:

sf 10

AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                           98            110           8        293.4           3.4       1.0X
join_inner: Comet (Scan)                            125            137          11        231.2           4.3       0.8X
join_inner: Comet (Scan, Exec)                      151            164          12        190.9           5.2       0.7X

sf 100

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-45-generic
AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          516            535          23        558.4           1.8       1.0X
join_inner: Comet (Scan)                            747            767          10        385.8           2.6       0.7X
join_inner: Comet (Scan, Exec)                      990           1018          17        291.1           3.4       0.5X
