Skip to content

Commit 973d87e

Browse files
committed
add another skew test case
1 parent 42a52a1 commit 973d87e

File tree

1 file changed

+38
-1
lines changed

1 file changed

+38
-1
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -733,12 +733,13 @@ class AdaptiveQueryExecSuite
733733
test("SPARK-32201: handle general skew join pattern") {
734734
withSQLConf(
735735
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
736-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
736+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1199",
737737
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
738738
SQLConf.SHUFFLE_PARTITIONS.key -> "100",
739739
SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "800",
740740
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "800") {
741741

742+
// CASE 1:
742743
// SMJ
743744
// Sort
744745
// CustomShuffleReader(coalesced)
@@ -765,6 +766,7 @@ class AdaptiveQueryExecSuite
765766
.otherwise('id).as("key1"),
766767
'id as "value1")
767768
.createOrReplaceTempView("skewData1")
769+
768770
spark
769771
.range(0, 1000, 1, 10)
770772
.select(
@@ -788,6 +790,41 @@ class AdaptiveQueryExecSuite
788790
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(sqlText)
789791
val innerSmj = findTopLevelSortMergeJoin(adaptivePlan)
790792
checkSkewJoin(innerSmj, 2, 0)
793+
794+
// CASE 2:
795+
// SMJ
796+
// Sort
797+
// SMJ
798+
// CustomShuffleReader(coalesced)
799+
// Shuffle
800+
// Sort
801+
// CustomShuffleReader(coalesced)
802+
// Shuffle
803+
// -->
804+
// SMJ
805+
// Sort
806+
// BroadcastHashJoin <-- SMJ is changed to BHJ
807+
// CustomShuffleReader(coalesced and skew)
808+
// Shuffle
809+
// Sort
810+
// CustomShuffleReader(coalesced)
811+
// Shuffle
812+
val sqlText2 =
813+
"""
814+
|SELECT * FROM
815+
| (
816+
| SELECT t1.*
817+
| FROM skewData1 t1 LEFT JOIN testData t2
818+
| ON t1.value1 = t2.key
819+
| AND t2.value = '2' || t2.value = '1'
820+
| ) AS data1
821+
| LEFT JOIN
822+
| skewData2 AS data2
823+
|ON data1.key1 = data2.key2 LIMIT 10
824+
|""".stripMargin
825+
val (_, adaptivePlan2) = runAdaptiveAndVerifyResult(sqlText2)
826+
val innerSmj2 = findTopLevelSortMergeJoin(adaptivePlan2)
827+
checkSkewJoin(innerSmj2, 2, 0)
791828
}
792829
}
793830
}

0 commit comments

Comments
 (0)