feat: support input reordering for NestedLoopJoinExec
#9676
Conversation
Regarding benchmarks -- this PR affects q11 and q22 in tpch, but the results differ significantly between tpch and tpch_mem (tpch_mem statistics estimations differ from those for tpch over parquet files).

let's run /benchmark

Benchmark results: Benchmarks comparing 35ff7a6 (main) and 2da33f4 (PR)
--SortExec: TopK(fetch=10), expr=[value@1 DESC]
----ProjectionExec: expr=[ps_partkey@0 as ps_partkey, SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@1 as value]
------NestedLoopJoinExec: join_type=Inner, filter=CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty)@0 AS Decimal128(38, 15)) > SUM(partsupp.ps_supplycost * partsupp.ps_availqty) * Float64(0.0001)@1
--------CoalescePartitionsExec
This does seem to run with less parallelism?
Yes, the inner (always left) input must be collected into a single partition, while the outer (always right) input is executed in parallel. If reordering does not happen due to missing / misestimated statistics, that will also cause a lack of parallelism.
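The execution shape described here can be illustrated with a minimal, self-contained sketch (plain std threads, not DataFusion code): the build (left) input is first coalesced into a single shared partition, and then each probe (right) partition is processed by its own worker against that shared build side.

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    // "CoalescePartitionsExec": merge all build-side partitions into one.
    let build_partitions = vec![vec![1, 2], vec![3]];
    let build: Arc<Vec<i32>> = Arc::new(build_partitions.into_iter().flatten().collect());

    // The probe side stays partitioned; one worker per partition.
    let probe_partitions = vec![vec![10, 20], vec![30], vec![40, 50]];
    let handles: Vec<_> = probe_partitions
        .into_iter()
        .map(|part| {
            let build = Arc::clone(&build);
            thread::spawn(move || {
                // Nested loop: every probe row is paired with every build row.
                part.iter()
                    .flat_map(|p| build.iter().map(move |b| (*b, *p)))
                    .collect::<Vec<_>>()
            })
        })
        .collect();

    let total: usize = handles.into_iter().map(|h| h.join().unwrap().len()).sum();
    assert_eq!(total, 15); // 3 build rows x 5 probe rows
}
```

The point of the sketch is only the shape: the build side is a single shared value, so parallelism comes entirely from the probe side's partition count.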
In case reordering is not happening due to lacking / misestimated statistics -- it'll also cause lack of parallelism.
should_swap_join_order is based on total_byte_size. It would become Precision::Absent after some operator like AggregateExec and ProjectionExec. I don't know whether it is a good idea to add a check by num_rows.
It already falls back to num_rows if either of the join inputs doesn't provide byte statistics.
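The fallback being discussed can be sketched as follows. This is a simplified, hypothetical model (the `Stats` struct and the function body are stand-ins, not DataFusion's actual `should_swap_join_order` or `Precision` types): compare inputs by total byte size when both sides report it, otherwise fall back to row counts, and keep the current order when neither statistic is available.

```rust
#[derive(Clone, Copy)]
struct Stats {
    total_byte_size: Option<usize>, // None models Precision::Absent
    num_rows: Option<usize>,
}

fn should_swap_join_order(left: Stats, right: Stats) -> bool {
    match (left.total_byte_size, right.total_byte_size) {
        // Both sides report bytes: swap if the left (build) side is larger.
        (Some(l), Some(r)) => l > r,
        // Otherwise fall back to row counts, if both are known.
        _ => match (left.num_rows, right.num_rows) {
            (Some(l), Some(r)) => l > r,
            // No usable statistics: keep the current order.
            _ => false,
        },
    }
}

fn main() {
    let small = Stats { total_byte_size: None, num_rows: Some(10) };
    let big = Stats { total_byte_size: None, num_rows: Some(1_000) };
    // Bytes are absent, so the row-count fallback drives the decision.
    assert!(should_swap_join_order(big, small));
    assert!(!should_swap_join_order(small, big));
}
```

This also shows why an operator that erases byte statistics (e.g. a projection changing row width) pushes the decision onto row counts, and why fully absent statistics leave the plan unswapped.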
--------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti, on=[(c_custkey@0, o_custkey@0)], projection=[c_phone@1, c_acctbal@2]
----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------NestedLoopJoinExec: join_type=Inner, filter=CAST(c_acctbal@0 AS Decimal128(19, 6)) > AVG(customer.c_acctbal)@1
--------------------CoalescePartitionsExec
Here as well -- before, we had no CoalescePartitionsExec here, so from the plan it looks like this join was using more parallelism before?
Same as above -- the reason is that currently the result of the final aggregation (estimated as a single row) cannot be reordered with the result of the partitioned aggregation (absent statistics).
Here, Query 22 seems to run slower (1.36x slower) for tpch (non-memory) as well.

🤔 I'll recheck -- I obtained my results from an older version, and maybe statistics output has been affected by some commit since then.

Well, it seems that I messed up while benchmarking locally, as there is no significant runtime diff, and the results are consistent with the ones produced by the GH action.

/benchmark

Update: running tpch (parquet) on merged master with Semi/Anti join stats doesn't produce performance regressions for tpch anymore.

/benchmark

Benchmark results: Benchmarks comparing 2dad904 (main) and bdd5905 (PR)
What is the current status of this PR? Is it ready to go?
Join behavior is now consistent with HJ, and it doesn't introduce any performance regressions for tpch. The only issue is incorrect left/right placement in tpch_mem q11, caused by the difference in parquet & memory table statistics content -- I don't think it is relevant to this PR, so I consider this PR "ready for review". (Some clarification regarding "I don't think it is relevant to this PR": even with input misplacement due to incorrect/absent statistics, this PR gives an option to disable the optimizer rule and specify join inputs as required -- this option is not available in the current NLJ implementation, as the build side is picked based on the logical join type.)
Ok, thanks @korowa -- I will try and find time to review it over the next day or so
Thank you very much @korowa -- this looks like a very nice improvement to the NestedLoopJoinExec. I am sorry it took so long to find time to review it (I should have known reviewing this would be straightforward given your past history of writing well documented and reviewed code 🙏). I think the unit test (big_col > small_col vs big_col > big_col) is worth double checking, but otherwise I think this PR is good to go. Thanks again
@@ -73,7 +79,7 @@ async fn test_full_join_1k() {
 }

 #[tokio::test]
-async fn test_semi_join_1k() {
+async fn test_semi_join_10k() {
this is a nice drive-by cleanup
/// This step is also executed in parallel (once per probe input partition), and to avoid
/// duplicate output of unmatched data (due to the shared nature of build-side data), each thread
/// "reports" probe phase completion (which means that the "visited" bitmap won't be
/// updated anymore), and only the last thread reporting completion will return output.
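The "last thread reporting completion emits the output" pattern in this doc comment can be sketched with a minimal std-only example (illustrative names and structure, not DataFusion's implementation): probe workers share a "visited" bitmap over build-side rows plus a countdown of unfinished partitions, and whichever worker decrements the counter to zero collects and returns the unmatched rows.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let build_rows = 4;
    // Shared "visited" bitmap over build-side rows.
    let visited = Arc::new(Mutex::new(vec![false; build_rows]));
    // Number of probe partitions that have not yet reported completion.
    let remaining = Arc::new(AtomicUsize::new(3));

    let handles: Vec<_> = (0..3)
        .map(|part| {
            let visited = Arc::clone(&visited);
            let remaining = Arc::clone(&remaining);
            thread::spawn(move || {
                // Each partition marks the build rows it matched (here: one row each).
                visited.lock().unwrap()[part] = true;
                // Report completion; only the last reporter emits unmatched rows.
                if remaining.fetch_sub(1, Ordering::SeqCst) == 1 {
                    let v = visited.lock().unwrap();
                    let unmatched: Vec<usize> =
                        (0..build_rows).filter(|&i| !v[i]).collect();
                    return Some(unmatched);
                }
                None
            })
        })
        .collect();

    // Exactly one worker produces output: build row 3 was never matched.
    let outputs: Vec<_> = handles.into_iter().filter_map(|h| h.join().unwrap()).collect();
    assert_eq!(outputs, vec![vec![3]]);
}
```

Because every worker marks the bitmap before decrementing the counter, the final decrementer is guaranteed to observe all marks, so the unmatched set is emitted exactly once.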
Thank you for the documentation updates 👍
Thanks again @korowa

Thank you @korowa 🙏
* support input reordering for NestedLoopJoinExec
* renamed variables and struct fields
* fixed nl join filter expression in tests
* Update datafusion/physical-plan/src/joins/nested_loop_join.rs

Co-authored-by: Andrew Lamb <[email protected]>

* typo fixed

---------

Co-authored-by: Andrew Lamb <[email protected]>
Which issue does this PR close?

Closes #8393.

Rationale for this change

Making plans containing NestedLoopJoinExec optimizable by the join_selection rule of the physical optimizer.

What changes are included in this PR?

NestedLoopJoinExec is covered by the join_selection rule of the physical optimizer. NestedLoopJoinExec always picks the left input as the build side (which makes it consistent with e.g. the HashJoinExec operator), and reuses utility functions shared by other join implementations.

Are these changes tested?

Added tests for the physical optimizer + NLJoinExec added to join_fuzz tests.

Are there any user-facing changes?

In case both inputs have proper statistics, the physical optimizer now picks the build side properly. In addition, there is now an option to disable the join_selection rule and manually specify the required join order.