From 83efb3d90c86bdbd61af98f6e6e307b36379d836 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 29 Aug 2024 12:27:41 +0800 Subject: [PATCH] [SPARK-46037][SQL] Correctness fix for Shuffled Hash Join build left without codegen ### What changes were proposed in this pull request? This is a resubmission of https://github.com/apache/spark/pull/43938 to fix a join correctness bug caused by https://github.com/apache/spark/pull/41398 . Credits go to mcdull-zhang. ### Why are the changes needed? Correctness fix: for a left outer shuffled hash join that builds the left side, the non-codegen join condition must be bound with the left side's output before the right side's, as is already done for full outer joins. ### Does this PR introduce _any_ user-facing change? Yes, the query result will be corrected. ### How was this patch tested? new test ### Was this patch authored or co-authored using generative AI tooling? no Closes #47905 from cloud-fan/join. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../spark/sql/execution/joins/HashJoin.scala | 5 ++--- .../sql/execution/joins/OuterJoinSuite.scala | 22 +++++++++++++++++-- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 3ae76a1db22b2..5d59a48d544a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport { UnsafeProjection.create(streamedBoundKeys) @transient protected[this] lazy val boundCondition = if (condition.isDefined) { - if (joinType == FullOuter && buildSide == BuildLeft) { - // Put join left side before right side. This is to be consistent with - // `ShuffledHashJoinExec.fullOuterJoin`. + if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) { + // Put join left side before right side. 
Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _ } else { Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index e4ea88067c7c2..7ba93ee13e182 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint} import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest} import org.apache.spark.sql.execution.exchange.EnsureRequirements import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData} import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType} -class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { +class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with SQLTestData { import testImplicits.toRichColumn + setupTestData() private val EnsureRequirements = new EnsureRequirements() @@ -326,4 +327,21 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession { (null, null, 7, 7.0) ) ) + + testWithWholeStageCodegenOnAndOff( + "SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { _ => + def join(hint: String): DataFrame = { + sql( + s""" + |SELECT /*+ $hint */ * + |FROM testData t1 + |LEFT OUTER JOIN + |testData2 t2 + |ON key = a AND concat(value, b) = '12' + |""".stripMargin) + } + val df1 = join("SHUFFLE_HASH(t1)") + val df2 = join("SHUFFLE_MERGE(t1)") + checkAnswer(df1, identity, df2.collect().toSeq) + } }