feat: Add Semi/Anti join to PiecewiseMergeJoin #18392
@2010YOUY01 After coding this, I noticed that the buffered batch could be collected lazily, only once it is actually needed. I think I'm going to do this in a follow-up instead, to keep the current changes relevant only to the existence join and to reduce noise.
@2010YOUY01 @comphead The PR is ready for review. I added some review comments that you can read first to help with the review process.
Here are the benchmarks:
Queries:
r#"
SELECT t1.*
FROM range(30000) AS t1
LEFT SEMI JOIN range(30000) AS t2
ON t1.value < t2.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t1
LEFT ANTI JOIN range(30000) AS t2
ON t1.value < t2.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t2
RIGHT SEMI JOIN range(30000) AS t1
ON t2.value < t1.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t2
RIGHT ANTI JOIN range(30000) AS t1
ON t2.value < t1.value;
"#,
r#"
set datafusion.optimizer.enable_piecewise_merge_join = true;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t1
LEFT SEMI JOIN range(30000) AS t2
ON t1.value < t2.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t1
LEFT ANTI JOIN range(30000) AS t2
ON t1.value < t2.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t2
RIGHT SEMI JOIN range(30000) AS t1
ON t2.value < t1.value;
"#,
r#"
SELECT t1.*
FROM range(30000) AS t2
RIGHT ANTI JOIN range(30000) AS t1
ON t2.value < t1.value;
"#,
Benchmark results (~100-400 x faster) 🔥
Query 1 iteration 0 returned 29999 rows in 92.105833ms
Query 1 iteration 1 returned 29999 rows in 80.096958ms
Query 1 iteration 2 returned 29999 rows in 77.611ms
Query 1 iteration 3 returned 29999 rows in 80.158458ms
Query 1 iteration 4 returned 29999 rows in 84.724ms
Query 2 iteration 0 returned 1 rows in 80.679084ms
Query 2 iteration 1 returned 1 rows in 92.0405ms
Query 2 iteration 2 returned 1 rows in 115.497709ms
Query 2 iteration 3 returned 1 rows in 143.337208ms
Query 2 iteration 4 returned 1 rows in 79.985583ms
Query 3 iteration 0 returned 29999 rows in 80.447292ms
Query 3 iteration 1 returned 29999 rows in 86.385708ms
Query 3 iteration 2 returned 29999 rows in 84.197166ms
Query 3 iteration 3 returned 29999 rows in 83.120125ms
Query 3 iteration 4 returned 29999 rows in 83.668042ms
Query 4 iteration 0 returned 1 rows in 85.033042ms
Query 4 iteration 1 returned 1 rows in 80.9975ms
Query 4 iteration 2 returned 1 rows in 85.006625ms
Query 4 iteration 3 returned 1 rows in 81.335333ms
Query 4 iteration 4 returned 1 rows in 81.747542ms
using pwmj
Query 5 iteration 0 returned 0 rows in 25.792µs
Query 5 iteration 1 returned 0 rows in 21.792µs
Query 5 iteration 2 returned 0 rows in 20.083µs
Query 5 iteration 3 returned 0 rows in 19.208µs
Query 5 iteration 4 returned 0 rows in 18.708µs
using pwmj
Query 6 iteration 0 returned 29999 rows in 518.542µs
Query 6 iteration 1 returned 29999 rows in 283.834µs
Query 6 iteration 2 returned 29999 rows in 322.459µs
Query 6 iteration 3 returned 29999 rows in 299.458µs
Query 6 iteration 4 returned 29999 rows in 291.084µs
using pwmj
Query 7 iteration 0 returned 1 rows in 291.875µs
Query 7 iteration 1 returned 1 rows in 217.792µs
Query 7 iteration 2 returned 1 rows in 213.459µs
Query 7 iteration 3 returned 1 rows in 297.416µs
Query 7 iteration 4 returned 1 rows in 296.375µs
using pwmj
Query 8 iteration 0 returned 29999 rows in 266.041µs
Query 8 iteration 1 returned 29999 rows in 258.416µs
Query 8 iteration 2 returned 29999 rows in 231.209µs
Query 8 iteration 3 returned 29999 rows in 204.875µs
Query 8 iteration 4 returned 29999 rows in 318.417µs
using pwmj
Query 9 iteration 0 returned 1 rows in 224.042µs
Query 9 iteration 1 returned 1 rows in 221.208µs
Query 9 iteration 2 returned 1 rows in 271.584µs
Query 9 iteration 3 returned 1 rows in 225.375µs
Query 9 iteration 4 returned 1 rows in 270µs
collect_threshold_byte_size,
collect_threshold_num_rows,
)?
let transformed = if let Some(hash_join) =
I don't know why these lines show up in the diff; nothing changed here except for the addition of the PiecewiseMergeJoin branch for swapping.
// PiecewiseMergeJoin wants the left side to be the larger one, so only swap if
// left < right
&& (!should_swap_join_order(&**left, &**right)?
|| matches!(pwmj.join_type(), JoinType::RightSemi | JoinType::RightAnti))
LeftSemi and LeftAnti joins are never swapped; only the right existence joins (RightSemi and RightAnti) are. This is explained in the comment above swap_inputs in PiecewiseMergeJoinExec.
use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};

- pub(super) enum PiecewiseMergeJoinStreamState {
+ pub(super) enum ClassicPWMJStreamState {
Renamed to distinguish the classic join stream state from the existence join stream state.
} else {
SortOptions::new(false, true)
}
SortOptions::new(true, true)
This was changed because for whatever reason I had thought that swapping happened before join creation 😆
"PiecewiseMergeJoinExec requires valid sort expressions for its left side"
);
};
let Some(right_batch_required_orders) =
No longer needed, only the left side needs to be sorted.
// TODO
// Inner, Outer, Left, Right joins are swapped so that left side is larger than right
// Left Semi/Anti joins are not swapped while Right Semi/Anti joins are always swapped. This
This will explain the logic for why only Right semi/anti joins are swapped.
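The swap rule described in that code comment can be sketched roughly as follows. This is only an illustration under stated assumptions: `JoinKind` and `should_swap_inputs` are hypothetical names, not DataFusion's API, and plain row counts stand in for whatever statistics the optimizer actually consults.

```rust
/// Hypothetical stand-in for the join types discussed in this PR
/// (not DataFusion's own `JoinType`).
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

/// Decide whether the two inputs should be swapped.
/// Assumption: `left_rows` and `right_rows` are estimated input sizes.
fn should_swap_inputs(kind: JoinKind, left_rows: usize, right_rows: usize) -> bool {
    match kind {
        // Right existence joins are always swapped.
        JoinKind::RightSemi | JoinKind::RightAnti => true,
        // Left existence joins are never swapped.
        JoinKind::LeftSemi | JoinKind::LeftAnti => false,
        // Classic joins swap only when the left side is the smaller one,
        // so that the larger input ends up on the left.
        JoinKind::Inner | JoinKind::Left | JoinKind::Right | JoinKind::Full => {
            left_rows < right_rows
        }
    }
}
```

Under these assumptions, `should_swap_inputs(JoinKind::Inner, 10, 1_000)` is true, while `should_swap_inputs(JoinKind::LeftSemi, 10, 1_000)` is false regardless of the sizes.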
datafusion/physical-plan/src/joins/piecewise_merge_join/exec.rs
}
}

// `ExistencePWMJStreamState` is separated into `CollectBufferedSide`, `FetchAndProcessStreamBatch`,
Read the main logic here. After the min/max value is found for the streamed side, the buffered side is probed. Once it finds the first match, it emits all rows in the range matched_idx..buffered_batch_length for Semi joins (matches) and 0..matched_idx for Anti joins (non-matches).
You can refer back to the PiecewiseMergeJoinExec docs if you need a refresher.
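As a rough illustration of the probing step described above, here is a minimal sketch for the predicate `buffered < streamed`. It makes several assumptions that are not taken from the PR: plain `i64` slices replace Arrow arrays, the buffered side is already sorted in descending order, nulls are ignored, and `ExistenceKind` and `existence_range` are hypothetical names.

```rust
use std::ops::Range;

/// Hypothetical marker for the two existence join flavours.
#[derive(Clone, Copy, Debug, PartialEq)]
enum ExistenceKind {
    Semi,
    Anti,
}

/// Return the range of buffered rows to emit for `buffered < streamed`.
/// Assumption: `buffered_desc` is sorted in descending order.
fn existence_range(
    buffered_desc: &[i64],
    streamed: &[i64],
    kind: ExistenceKind,
) -> Range<usize> {
    let len = buffered_desc.len();
    // A buffered row matches if and only if it is below the streamed maximum.
    // With no streamed rows, nothing matches.
    let matched_idx = match streamed.iter().max() {
        // Index of the first buffered value strictly below the maximum.
        Some(&max) => buffered_desc.partition_point(|&b| b >= max),
        None => len,
    };
    match kind {
        // Everything from the first match onward has a match.
        ExistenceKind::Semi => matched_idx..len,
        // Everything before the first match has none.
        ExistenceKind::Anti => 0..matched_idx,
    }
}
```

With both sides holding the values 0 through 4, the streamed maximum is 4, the first match is at index 1, so the Semi range covers four rows and the Anti range covers one. That is the same N-1 versus 1 split seen in the benchmark row counts.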
Which issue does this PR close?
PiecewiseMergeJoin work in Datafusion #17427.
Rationale for this change
Adds Semi/Anti join support to PiecewiseMergeJoin.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?