Skip to content

Commit

Permalink
[SPARK-46037][SQL] Correctness fix for Shuffled Hash Join build left …
Browse files Browse the repository at this point in the history
…without codegen

### What changes were proposed in this pull request?

This is a re-submitting of #43938 to fix a join correctness bug caused by #41398 . Credits go to mcdull-zhang

### Why are the changes needed?

correctness fix

### Does this PR introduce _any_ user-facing change?

Yes, the query result will be corrected.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #47905 from cloud-fan/join.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
cloud-fan committed Aug 29, 2024
1 parent ebb1975 commit af5e0a2
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,8 @@ trait HashJoin extends JoinCodegenSupport {
UnsafeProjection.create(streamedBoundKeys)

@transient protected[this] lazy val boundCondition = if (condition.isDefined) {
if (joinType == FullOuter && buildSide == BuildLeft) {
// Put join left side before right side. This is to be consistent with
// `ShuffledHashJoinExec.fullOuterJoin`.
if ((joinType == FullOuter || joinType == LeftOuter) && buildSide == BuildLeft) {
// Put join left side before right side.
Predicate.create(condition.get, buildPlan.output ++ streamedPlan.output).eval _
} else {
Predicate.create(condition.get, streamedPlan.output ++ buildPlan.output).eval _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import org.apache.spark.sql.catalyst.plans.logical.{Join, JoinHint}
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest}
import org.apache.spark.sql.execution.exchange.EnsureRequirements
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestData}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
class OuterJoinSuite extends SparkPlanTest with SharedSparkSession with SQLTestData {
import testImplicits.toRichColumn
setupTestData()

private val EnsureRequirements = new EnsureRequirements()

Expand Down Expand Up @@ -326,4 +327,21 @@ class OuterJoinSuite extends SparkPlanTest with SharedSparkSession {
(null, null, 7, 7.0)
)
)

testWithWholeStageCodegenOnAndOff(
"SPARK-46037: ShuffledHashJoin build left with left outer join, codegen off") { _ =>
def join(hint: String): DataFrame = {
sql(
s"""
|SELECT /*+ $hint */ *
|FROM testData t1
|LEFT OUTER JOIN
|testData2 t2
|ON key = a AND concat(value, b) = '12'
|""".stripMargin)
}
val df1 = join("SHUFFLE_HASH(t1)")
val df2 = join("SHUFFLE_MERGE(t1)")
checkAnswer(df1, identity, df2.collect().toSeq)
}
}

0 comments on commit af5e0a2

Please sign in to comment.