-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-46037][SQL] When Left Join build Left and codegen is turned off, ShuffledHashJoinExec may result in incorrect results #43938
Closed
Closed
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1755,4 +1755,27 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan | |
cached.unpersist() | ||
} | ||
} | ||
|
||
test("SPARK-46037: When Left Join build Left, ShuffledHashJoinExec may " + | ||
JoshRosen marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"result in incorrect results") { | ||
withSQLConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN.key -> "false") { | ||
val df1 = sql( | ||
""" | ||
|SELECT /*+ SHUFFLE_HASH(t1) */ * | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the golden file test |
||
|FROM testData t1 | ||
|LEFT OUTER JOIN | ||
|testData2 t2 | ||
|ON key = a AND concat(value, b) = '12' | ||
|""".stripMargin) | ||
val df2 = sql( | ||
""" | ||
|SELECT /*+ SHUFFLE_MERGE(t1) */ * | ||
|FROM testData t1 | ||
|LEFT OUTER JOIN | ||
|testData2 t2 | ||
|ON key = a AND concat(value, b) = '12' | ||
|""".stripMargin) | ||
checkAnswer(df1, df2.collect()) | ||
} | ||
} | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether this whole
boundCondition
block can be simplified tosince the left side is always on the left and the right side is always on the right.
(Edit: the proposed simplification in ⬆️ is not correct (the bound condition input schema is not necessarily the join's output schema); see later comments for discussion of an alternative simplification).
Further down in this file, we have
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
Lines 104 to 107 in aa81f42
showing that
buildPlan
andstreamPlan
are just re-mappings ofleft
andright
depending on the build side.Several years ago, it looks like we used to do something similar to my proposal:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
Lines 119 to 123 in 0340b3d
but we switched to the current use of
buildPlan
andstreamPlan
in a refactoring in #12102 (I'm not fully clear on why).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, It's fragile to keep 2 pieces of code in sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I guess it's wrong for inner join + build left?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found this
I think stream side left and build side right is the common case, but we have some special cases for outer joins.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I don't know why outer join needs to be a special case. Can't we always put streaming side left and build side right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HashJoin.scala
ShuffledJoin.scala
@cloud-fan Output determines that we cannot simply put streaming side left and build side right.
Am I understanding correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea that's why we have a
createResultProjection
to reorder the columns.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I think I understand now:
In all of the
*Join
methods in this file (which doesn't include full outer join), the streamed side is always the left side of thejoinRow
that is passed toboundCondition
: inHashJoin.scala
, the input toboundCondition
is not necessarily the same as the output of the join operator itself (for that, we haveresultProj
which comes fromcreateResultProjection
which remains in sync withboundCondition
(source).However,
boundCondition
is inherited inShuffledHashJoinExec
and there we have abuildSideOrFullOuterJoin
method (source) and there theresultProjection
operates over the same input schema as the join output schema (rather than assuming that the joined row always has the streamed side on the left). In that code,boundCondition
is evaluated over an input that matches the join output schema and there the streamed side could be on either side rather than only the left.I think this inheritance is confusing and hard to reason about.
It seems like
HashJoin.scala
has an invariant of "streamed side always on left" which gets violated inShuffleHashJoin.scala
's separate implementation of outer joins.I wonder whether we can address this bug by modifying
ShuffledHashJoinExec.buildSideOrFullOuterJoin
so that it always unconditionally places the streamed side on the left (the same asHashJoin.scala
's default). (Edit: I'm basically agreeing with @cloud-fan's suggestion above).