123 changes: 47 additions & 76 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1106,20 +1106,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
   }
 
   test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df1 = spark.range(10).select($"id".as("k1"))
-      val df2 = spark.range(30).select($"id".as("k2"))
-      Seq("inner", "cross").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
-          .queryExecution.executedPlan
-        assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
-        // No extra shuffle before aggregate
-        assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
-      })
-    }
+    val df1 = spark.range(10).select($"id".as("k1"))
+    val df2 = spark.range(30).select($"id".as("k2"))
+    Seq("inner", "cross").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
+        .groupBy($"k1").count()
+        .queryExecution.executedPlan
+      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+      // No extra shuffle before aggregate
+      assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
+    })
   }
 
   test("SPARK-32383: Preserve hash join (BHJ and SHJ) stream side ordering") {
@@ -1129,40 +1125,30 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     val df4 = spark.range(100).select($"id".as("k4"))
 
     // Test broadcast hash join
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
-      Seq("inner", "left_outer").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType)
-          .join(df3, $"k1" === $"k3", joinType)
-          .join(df4, $"k1" === $"k4", joinType)
-          .queryExecution
-          .executedPlan
-        assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
-        assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
-        // No extra sort before last sort merge join
-        assert(collect(plan) { case _: SortExec => true }.size === 3)
-      })
-    }
+    Seq("inner", "left_outer").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", joinType)
+        .join(df3.hint("BROADCAST"), $"k1" === $"k3", joinType)
+        .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+        .queryExecution
+        .executedPlan
+      assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+      assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
+      // No extra sort before last sort merge join
+      assert(collect(plan) { case _: SortExec => true }.size === 3)
+    })
 
     // Test shuffled hash join
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df3 = spark.range(10).select($"id".as("k3"))
-
-      Seq("inner", "left_outer").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType)
-          .join(df3, $"k1" === $"k3", joinType)
-          .join(df4, $"k1" === $"k4", joinType)
-          .queryExecution
-          .executedPlan
-        assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
-        assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
-        // No extra sort before last sort merge join
-        assert(collect(plan) { case _: SortExec => true }.size === 3)
-      })
-    }
+    Seq("inner", "left_outer").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", joinType)
+        .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3", joinType)
+        .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+        .queryExecution
+        .executedPlan
+      assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+      // No extra sort before last sort merge join
+      assert(collect(plan) { case _: SortExec => true }.size === 3)
+    })
   }
 
   test("SPARK-32290: SingleColumn Null Aware Anti Join Optimize") {
@@ -1250,24 +1236,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
     )
     inputDFs.foreach { case (df1, df2, joinExprs) =>
-      withSQLConf(
-        // Set broadcast join threshold and number of shuffle partitions,
-        // as shuffled hash join depends on these two configs.
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
-        val smjDF = df1.join(df2, joinExprs, "full")
-        assert(collect(smjDF.queryExecution.executedPlan) {
-          case _: SortMergeJoinExec => true }.size === 1)
-        val smjResult = smjDF.collect()
-
-        withSQLConf(SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-          val shjDF = df1.join(df2, joinExprs, "full")
-          assert(collect(shjDF.queryExecution.executedPlan) {
-            case _: ShuffledHashJoinExec => true }.size === 1)
-          // Same result between shuffled hash join and sort merge join
-          checkAnswer(shjDF, smjResult)
-        }
-      }
+      val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df1.join(df2.hint("SHUFFLE_HASH"), joinExprs, "full")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
     }
   }
 
@@ -1284,10 +1262,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     )
     inputDFs.foreach { case (df1, df2, joinType) =>
       // Test broadcast hash join
-      withSQLConf(
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-        val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        val bhjCodegenDF = df1.join(df2.hint("BROADCAST"), $"k1" === $"k2", joinType)
         assert(bhjCodegenDF.queryExecution.executedPlan.collect {
           case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true
           case WholeStageCodegenExec(ProjectExec(_, _ : BroadcastHashJoinExec)) => true
@@ -1303,21 +1279,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
       }
 
       // Test shuffled hash join
-      withSQLConf(SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
-        // Set broadcast join threshold and number of shuffle partitions,
-        // as shuffled hash join depends on these two configs.
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
-        val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        val shjCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
         assert(shjCodegenDF.queryExecution.executedPlan.collect {
           case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
           case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true
         }.size === 1)
         checkAnswer(shjCodegenDF, Seq.empty)
 
         withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-          val shjNonCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+          val shjNonCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
           assert(shjNonCodegenDF.queryExecution.executedPlan.collect {
             case _: ShuffledHashJoinExec => true }.size === 1)
           checkAnswer(shjNonCodegenDF, Seq.empty)
sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -71,28 +71,25 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
   }
 
   test("ShuffledHashJoin should be included in WholeStageCodegen") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df1 = spark.range(5).select($"id".as("k1"))
-      val df2 = spark.range(15).select($"id".as("k2"))
-      val df3 = spark.range(6).select($"id".as("k3"))
-
-      // test one shuffled hash join
-      val oneJoinDF = df1.join(df2, $"k1" === $"k2")
-      assert(oneJoinDF.queryExecution.executedPlan.collect {
-        case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-      }.size === 1)
-      checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4)))
-
-      // test two shuffled hash joins
-      val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
-      assert(twoJoinsDF.queryExecution.executedPlan.collect {
-        case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-      }.size === 2)
-      checkAnswer(twoJoinsDF,
-        Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4)))
-    }
+    val df1 = spark.range(5).select($"id".as("k1"))
+    val df2 = spark.range(15).select($"id".as("k2"))
+    val df3 = spark.range(6).select($"id".as("k3"))
+
+    // test one shuffled hash join
+    val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
+    assert(oneJoinDF.queryExecution.executedPlan.collect {
+      case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+    }.size === 1)
+    checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4)))
+
+    // test two shuffled hash joins
+    val twoJoinsDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
+      .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3")
+    assert(twoJoinsDF.queryExecution.executedPlan.collect {
+      case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+    }.size === 2)
+    checkAnswer(twoJoinsDF,
+      Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4)))
   }
 
   test("Sort should be included in WholeStageCodegen") {
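For context, the pattern this diff converges on — pinning a join strategy with a per-DataFrame hint instead of tuning session-wide SQLConf values — can be exercised on its own. A minimal standalone sketch follows; the object name, master setting, and table sizes are illustrative, not taken from the patch, while Dataset.hint and the SHUFFLE_HASH / SHUFFLE_MERGE / BROADCAST hint names are the real Spark 3.x APIs used throughout the diff above.

import org.apache.spark.sql.SparkSession

object JoinHintSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative local session; any SparkSession works the same way.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("join-hint-sketch")
      .getOrCreate()
    import spark.implicits._

    val left = spark.range(10).select($"id".as("k1"))
    val right = spark.range(30).select($"id".as("k2"))

    // Each hint pins the strategy for this one plan only; session configs
    // such as spark.sql.autoBroadcastJoinThreshold are left untouched.
    val shj = left.join(right.hint("SHUFFLE_HASH"), $"k1" === $"k2")   // shuffled hash join
    val smj = left.join(right.hint("SHUFFLE_MERGE"), $"k1" === $"k2")  // sort merge join
    val bhj = left.join(right.hint("BROADCAST"), $"k1" === $"k2")      // broadcast hash join

    // Inspect each physical plan to confirm the chosen join operator,
    // just as the tests do by collecting over executedPlan.
    shj.explain()
    smj.explain()
    bhj.explain()

    spark.stop()
  }
}

The design benefit, visible in every hunk of the diff, is isolation: the tests no longer depend on threshold values (e.g. autoBroadcastJoinThreshold = "50") chosen to trick the planner, so they keep asserting the intended join operator even if planner heuristics or defaults change.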