Skip to content

Commit 49d25ec

Browse files
cloud-fanIvanK-db
authored andcommitted
[SPARK-46037][SQL] Correctness fix for Shuffled Hash Join build left without codegen
### What changes were proposed in this pull request? This is a re-submitting of apache#43938 to fix a join correctness bug caused by apache#41398 . Credits go to mcdull-zhang ### Why are the changes needed? correctness fix ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes apache#47905 from cloud-fan/join. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 853f493 commit 49d25ec

File tree

2 files changed

+22
-5
lines changed

2 files changed

+22
-5
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
138138
UnsafeProjection.create(streamedBoundKeys)
139139

140140
@transient protected[this] lazy val boundCondition = if (condition.isDefined) {
141-
if (joinType == FullOuter && buildSide == BuildLeft) {
142-
// Put join left side before right side. This is to be consistent with
143-
// `ShuffledHashJoinExec.fullOuterJoin`.
141+
if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
142+
// Put join left side before right side.
144143
Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _
145144
} else {
146145
Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala

+20-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
2626
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
2727
import org.apache.spark.sql.execution.exchange.EnsureRequirements
2828
import org.apache.spark.sql.internal.SQLConf
29-
import org.apache.spark.sql.test.SharedSparkSession
29+
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData}
3030
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
3131

32-
class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
32+
class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with SQLTestData {
3333
import testImplicits.toRichColumn
34+
setupTestData()
3435

3536
private val EnsureRequirements = new EnsureRequirements()
3637

@@ -326,4 +327,21 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
326327
(null, null, 7, 7.0)
327328
)
328329
)
330+
331+
testWithWholeStageCodegenOnAndOff(
332+
"SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { _ =>
333+
def join(hint: String): DataFrame = {
334+
sql(
335+
s"""
336+
|SELECT /*+ $hint */ *
337+
|FROM testData t1
338+
|LEFT OUTER JOIN
339+
|testData2 t2
340+
|ON key = a AND concat(value, b) = '12'
341+
|""".stripMargin)
342+
}
343+
val df1 = join("SHUFFLE_HASH(t1)")
344+
val df2 = join("SHUFFLE_MERGE(t1)")
345+
checkAnswer(df1, identity, df2.collect().toSeq)
346+
}
329347
}

0 commit comments

Comments
 (0)