-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join #41398
[SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join #41398
Conversation
…join build right in shuffled hash join
9d44062
to
ea32dff
Compare
Thank you for making a PR, @szehon-ho . |
937f1ee
to
532964b
Compare
Also, cc @viirya , @huaxingao , @sunchao , too. |
dbd8960
to
6089beb
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
@szehon-ho Thanks for the PR! The change looks reasonable to me. I have left a few minor comments. |
cc @maryannxue |
@@ -57,6 +57,8 @@ case class ShuffledHashJoinExec( | |||
|
|||
override def outputOrdering: Seq[SortOrder] = joinType match { | |||
case FullOuter => Nil | |||
case LeftOuter if buildSide == BuildLeft => Nil | |||
case RightOuter if buildSide == BuildRight => Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add some comments to explain why the ordering can't be preserved?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, added a comment as per my understanding (please let me know if I misunderstand something)
My thought was, because the second iteration on the build-side (for outer-join semantic) is on a hashedRelation, the result cannot be in order.
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
86c86d0
to
505e234
Compare
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
LGTM |
@szehon-ho Could you re-trigger the failed CI pipeline? |
Yea I couldn't reproduce errors, trying again.
|
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
@@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkP | |||
assertShuffleHashJoin( | |||
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft) | |||
assertShuffleHashJoin( | |||
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight) | |||
sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this situation, t1 is smaller than t2, so it now picks t1. Before it was not possible to pick t1 and so t2 was picked.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I meant that the original test coverage (BuildRight
) is removed and lost.
val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter") | ||
assert(collect(shjDF.queryExecution.executedPlan) { | ||
case _: ShuffledHashJoinExec => true | ||
}.size === 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to verify build side of ShuffledHashJoinExec
is BuildLeft
here? Or hint
is always working?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only two minor comments left, otherwise looks good to me.
@viirya Thanks a lot for taking a look! Since these are minor comments for tests, I will merge this PR first, we will follow up after @szehon-ho comes back from vacation. |
Merged to master. Thanks @szehon-ho and et al. |
cc @c21 too |
Thanks everyone for the warm welcome to Spark, and really fast reviews! As I'm out of town, I will look at any follow up improvements when I'm back. |
…join build right in shuffled hash join ### What changes were proposed in this pull request? Add support for shuffle-hash join for following scenarios: * left outer join with left-side build * right outer join with right-side build The algorithm is similar to SPARK-32399, which supports shuffle-hash join for full outer join. The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are improved to support the new case. These methods are called after the HashedRelation is already constructed of the build side, and do these two iterations: 1. Iterate Stream side. a. If find match on build side, mark. b. If no match on build side, join with null build-side row and add to result 2. Iterate build side. a. If find marked for match, add joined row to result b. If no match marked, join with null stream-side row The left outer join with left-side build, and right outer join with right-side build, need only a subset of these logics, namely replacing 1b above with a no-op. Codegen is left for a follow-up PR. ### Why are the changes needed? For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially if the big table is large, as it skips an expensive sort of the big table. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test in JoinSuite.scala Closes apache#41398 from szehon-ho/same_side_outer_build_join_master. Authored-by: Szehon Ho <[email protected]> Signed-off-by: huaxingao <[email protected]>
### What changes were proposed in this pull request? Codegen of shuffled hash join of build side outer join (ie, left outer join build left or right outer join build right) The implementation of apache#41398 was only for non-codegen version, and codegen was disabled in this scenario. No New unit test in WholeStageCodegenSuite
…join build right in shuffled hash join ### What changes were proposed in this pull request? Add support for shuffle-hash join for following scenarios: * left outer join with left-side build * right outer join with right-side build The algorithm is similar to SPARK-32399, which supports shuffle-hash join for full outer join. The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are improved to support the new case. These methods are called after the HashedRelation is already constructed of the build side, and do these two iterations: 1. Iterate Stream side. a. If find match on build side, mark. b. If no match on build side, join with null build-side row and add to result 2. Iterate build side. a. If find marked for match, add joined row to result b. If no match marked, join with null stream-side row The left outer join with left-side build, and right outer join with right-side build, need only a subset of these logics, namely replacing 1b above with a no-op. Codegen is left for a follow-up PR. ### Why are the changes needed? For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially if the big table is large, as it skips an expensive sort of the big table. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Unit test in JoinSuite.scala Closes apache#41398 from szehon-ho/same_side_outer_build_join_master. Authored-by: Szehon Ho <[email protected]> Signed-off-by: huaxingao <[email protected]> (cherry picked from commit 0effbec)
### What changes were proposed in this pull request? Codegen of shuffled hash join of build side outer join (ie, left outer join build left or right outer join build right) ### Why are the changes needed? The implementation of apache#41398 was only for non-codegen version, and codegen was disabled in this scenario. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test in WholeStageCodegenSuite
### What changes were proposed in this pull request? Codegen of shuffled hash join of build side outer join (ie, left outer join build left or right outer join build right) ### Why are the changes needed? The implementation of #41398 was only for non-codegen version, and codegen was disabled in this scenario. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New unit test in WholeStageCodegenSuite Closes #41614 from szehon-ho/same_side_outer_join_codegen_master. Authored-by: Szehon Ho <[email protected]> Signed-off-by: huaxingao <[email protected]>
…without codegen ### What changes were proposed in this pull request? This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…without codegen This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang correctness fix Yes, the query result will be corrected. new test no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit af5e0a2) Signed-off-by: Wenchen Fan <[email protected]>
…without codegen ### What changes were proposed in this pull request? This is a re-submitting of apache#43938 to fix a join correctness bug caused by apache#41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…without codegen ### What changes were proposed in this pull request? This is a re-submitting of apache#43938 to fix a join correctness bug caused by apache#41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
…without codegen ### What changes were proposed in this pull request? This is a re-submitting of apache#43938 to fix a join correctness bug caused by apache#41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Add support for shuffle-hash join for following scenarios:
The algorithm is similar to SPARK-32399, which supports shuffle-hash join for full outer join.
The same methods fullOuterJoinWithUniqueKey and fullOuterJoinWithNonUniqueKey are improved to support the new case. These methods are called after the HashedRelation is already constructed of the build side, and do these two iterations:
a. If find match on build side, mark.
b. If no match on build side, join with null build-side row and add to result
a. If find marked for match, add joined row to result
b. If no match marked, join with null stream-side row
The left outer join with left-side build, and right outer join with right-side build, need only a subset of these logics, namely replacing 1b above with a no-op.
Codegen is left for a follow-up PR.
Why are the changes needed?
For joins of these types, shuffle-hash join can be more performant than sort-merge join, especially if the big table is large, as it skips an expensive sort of the big table.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test in JoinSuite.scala