diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 1bdfdb5ab9c54..2e336b264cd3a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1106,20 +1106,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
   }
 
   test("SPARK-32330: Preserve shuffled hash join build side partitioning") {
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df1 = spark.range(10).select($"id".as("k1"))
-      val df2 = spark.range(30).select($"id".as("k2"))
-      Seq("inner", "cross").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType).groupBy($"k1").count()
-          .queryExecution.executedPlan
-        assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
-        // No extra shuffle before aggregate
-        assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
-      })
-    }
+    val df1 = spark.range(10).select($"id".as("k1"))
+    val df2 = spark.range(30).select($"id".as("k2"))
+    Seq("inner", "cross").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
+        .groupBy($"k1").count()
+        .queryExecution.executedPlan
+      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+      // No extra shuffle before aggregate
+      assert(collect(plan) { case _: ShuffleExchangeExec => true }.size === 2)
+    })
   }
 
   test("SPARK-32383: Preserve hash join (BHJ and SHJ) stream side ordering") {
@@ -1129,40 +1125,30 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     val df4 = spark.range(100).select($"id".as("k4"))
 
     // Test broadcast hash join
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
-      Seq("inner", "left_outer").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType)
-          .join(df3, $"k1" === $"k3", joinType)
-          .join(df4, $"k1" === $"k4", joinType)
-          .queryExecution
-          .executedPlan
-        assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
-        assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
-        // No extra sort before last sort merge join
-        assert(collect(plan) { case _: SortExec => true }.size === 3)
-      })
-    }
+    Seq("inner", "left_outer").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", joinType)
+        .join(df3.hint("BROADCAST"), $"k1" === $"k3", joinType)
+        .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+        .queryExecution
+        .executedPlan
+      assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+      assert(collect(plan) { case _: BroadcastHashJoinExec => true }.size === 1)
+      // No extra sort before last sort merge join
+      assert(collect(plan) { case _: SortExec => true }.size === 3)
+    })
 
     // Test shuffled hash join
-    withSQLConf(
-      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df3 = spark.range(10).select($"id".as("k3"))
-
-      Seq("inner", "left_outer").foreach(joinType => {
-        val plan = df1.join(df2, $"k1" === $"k2", joinType)
-          .join(df3, $"k1" === $"k3", joinType)
-          .join(df4, $"k1" === $"k4", joinType)
-          .queryExecution
-          .executedPlan
-        assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
-        assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
-        // No extra sort before last sort merge join
-        assert(collect(plan) { case _: SortExec => true }.size === 3)
-      })
-    }
+    Seq("inner", "left_outer").foreach(joinType => {
+      val plan = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2", joinType)
+        .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3", joinType)
+        .join(df4.hint("SHUFFLE_MERGE"), $"k1" === $"k4", joinType)
+        .queryExecution
+        .executedPlan
+      assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 2)
+      assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1)
+      // No extra sort before last sort merge join
+      assert(collect(plan) { case _: SortExec => true }.size === 3)
+    })
   }
 
   test("SPARK-32290: SingleColumn Null Aware Anti Join Optimize") {
@@ -1250,24 +1236,16 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
     )
     inputDFs.foreach { case (df1, df2, joinExprs) =>
-      withSQLConf(
-        // Set broadcast join threshold and number of shuffle partitions,
-        // as shuffled hash join depends on these two configs.
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
-        val smjDF = df1.join(df2, joinExprs, "full")
-        assert(collect(smjDF.queryExecution.executedPlan) {
-          case _: SortMergeJoinExec => true }.size === 1)
-        val smjResult = smjDF.collect()
-
-        withSQLConf(SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-          val shjDF = df1.join(df2, joinExprs, "full")
-          assert(collect(shjDF.queryExecution.executedPlan) {
-            case _: ShuffledHashJoinExec => true }.size === 1)
-          // Same result between shuffled hash join and sort merge join
-          checkAnswer(shjDF, smjResult)
-        }
-      }
+      val smjDF = df1.join(df2.hint("SHUFFLE_MERGE"), joinExprs, "full")
+      assert(collect(smjDF.queryExecution.executedPlan) {
+        case _: SortMergeJoinExec => true }.size === 1)
+      val smjResult = smjDF.collect()
+
+      val shjDF = df1.join(df2.hint("SHUFFLE_HASH"), joinExprs, "full")
+      assert(collect(shjDF.queryExecution.executedPlan) {
+        case _: ShuffledHashJoinExec => true }.size === 1)
+      // Same result between shuffled hash join and sort merge join
+      checkAnswer(shjDF, smjResult)
     }
   }
 
@@ -1284,10 +1262,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
     )
     inputDFs.foreach { case (df1, df2, joinType) =>
       // Test broadcast hash join
-      withSQLConf(
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-        val bhjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        val bhjCodegenDF = df1.join(df2.hint("BROADCAST"), $"k1" === $"k2", joinType)
         assert(bhjCodegenDF.queryExecution.executedPlan.collect {
           case WholeStageCodegenExec(_ : BroadcastHashJoinExec) => true
           case WholeStageCodegenExec(ProjectExec(_, _ : BroadcastHashJoinExec)) => true
@@ -1303,13 +1279,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
      }
 
      // Test shuffled hash join
-      withSQLConf(SQLConf.PREFER_SORTMERGEJOIN.key -> "false",
-        // Set broadcast join threshold and number of shuffle partitions,
-        // as shuffled hash join depends on these two configs.
-        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50",
-        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
-        SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
-        val shjCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+        val shjCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
         assert(shjCodegenDF.queryExecution.executedPlan.collect {
           case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
           case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true
@@ -1317,7 +1288,7 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
         checkAnswer(shjCodegenDF, Seq.empty)
 
         withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
-          val shjNonCodegenDF = df1.join(df2, $"k1" === $"k2", joinType)
+          val shjNonCodegenDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2", joinType)
           assert(shjNonCodegenDF.queryExecution.executedPlan.collect {
             case _: ShuffledHashJoinExec => true }.size === 1)
           checkAnswer(shjNonCodegenDF, Seq.empty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
index eb5643df4c752..71eaed269e6c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
@@ -71,28 +71,25 @@ class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
   }
 
   test("ShuffledHashJoin should be included in WholeStageCodegen") {
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "30",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
-      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
-      val df1 = spark.range(5).select($"id".as("k1"))
-      val df2 = spark.range(15).select($"id".as("k2"))
-      val df3 = spark.range(6).select($"id".as("k3"))
-
-      // test one shuffled hash join
-      val oneJoinDF = df1.join(df2, $"k1" === $"k2")
-      assert(oneJoinDF.queryExecution.executedPlan.collect {
-        case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-      }.size === 1)
-      checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4)))
-
-      // test two shuffled hash joins
-      val twoJoinsDF = df1.join(df2, $"k1" === $"k2").join(df3, $"k1" === $"k3")
-      assert(twoJoinsDF.queryExecution.executedPlan.collect {
-        case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-      }.size === 2)
-      checkAnswer(twoJoinsDF,
-        Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4)))
-    }
+    val df1 = spark.range(5).select($"id".as("k1"))
+    val df2 = spark.range(15).select($"id".as("k2"))
+    val df3 = spark.range(6).select($"id".as("k3"))
+
+    // test one shuffled hash join
+    val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
+    assert(oneJoinDF.queryExecution.executedPlan.collect {
+      case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+    }.size === 1)
+    checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3), Row(4, 4)))
+
+    // test two shuffled hash joins
+    val twoJoinsDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
+      .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3")
+    assert(twoJoinsDF.queryExecution.executedPlan.collect {
+      case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
+    }.size === 2)
+    checkAnswer(twoJoinsDF,
+      Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4, 4)))
   }
 
   test("Sort should be included in WholeStageCodegen") {
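Note on the approach (not part of the patch): each test above replaces withSQLConf tuning of spark.sql.autoBroadcastJoinThreshold, spark.sql.shuffle.partitions, and spark.sql.join.preferSortMergeJoin with an explicit strategy hint on one join side, which pins the planner's join choice directly. Below is a minimal standalone sketch of the same Dataset.hint API; the object name, master setting, and range sizes are illustrative, not taken from the patch.

    import org.apache.spark.sql.SparkSession

    // Hypothetical driver demonstrating the three strategy hints used in the
    // updated tests: SHUFFLE_HASH, SHUFFLE_MERGE, and BROADCAST.
    object JoinHintExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("join-hint-example")
          .getOrCreate()
        import spark.implicits._

        val df1 = spark.range(10).select($"id".as("k1"))
        val df2 = spark.range(30).select($"id".as("k2"))

        // The hint attaches to the side it should apply to: here df2 becomes
        // the build side of a shuffled hash join, independent of the broadcast
        // threshold, shuffle partition count, or preferSortMergeJoin settings.
        df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2").explain()

        // The same join pinned to sort merge join and broadcast hash join.
        df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2").explain()
        df1.join(df2.hint("BROADCAST"), $"k1" === $"k2").explain()

        spark.stop()
      }
    }

Relying on hints rather than threshold configs keeps the plan-shape assertions stable: the expected join operator no longer flips when default broadcast thresholds or partition counts change.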