Skip to content

Commit b5bc199

Browse files
cafri.sundongjoon-hyun
authored andcommitted
[SPARK-39328][SQL][TESTS] Fix flaky test SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side
### What changes were proposed in this pull request? Improve test `SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side` of `AdaptiveQueryExecSuite` ### Why are the changes needed? This test appears to always succeed in the Apache GitHub Action runner environment, But some environments, test does not seem to proceed as intended. On my environment: `4.18.0-553.8.1.el8_10.x86_64` `Intel(R) Xeon(R) Silver 4210 CPU 2.20GHz` `64G Mem` And ran test in master branch following the guide of official documentation ``` ./build/sbt testOnly org.apache.spark.sql.execution.adaptive.AdaptiveQueryExecSuite ... - SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side *** FAILED *** The code passed to eventually never returned normally. Attempted 25 times over 15.040156205999999 seconds. Last failure message: ``` even increasing the test's timeout to 1500 seconds results to failure after lots of retries. ``` SPARK-37753: Inhibit broadcast in left outer join when there are many empty partitions on outer/left side *** FAILED *** The code passed to failAfter did not complete within 20 minutes. (AdaptiveQueryExecSuite.scala:743) ``` --- The test says ```scala // if the right side is completed first and the left side is still being executed, // the right side does not know whether there are many empty partitions on the left side, // so there is no demote, and then the right side is broadcast in the planning stage. // so retry several times here to avoid unit test failure. eventually(timeout(15.seconds), interval(500.milliseconds)) { ... ``` It seems test failure occurs with very high probability by loading the ‘right side’ completes first. While the reason is unclear, I believe it would be better to regulate the subquery loading speed in a predictable manner via applying simple udf rather than retrying until both sides load in the desired order. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Rerun the test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52388 from Last-remote11/SPARK-39328. Authored-by: cafri.sun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit e7b7acf) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent bcb5b99 commit b5bc199

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import java.net.URI
2222

2323
import org.apache.logging.log4j.Level
2424
import org.scalatest.PrivateMethodTester
25-
import org.scalatest.time.SpanSugar._
2625

2726
import org.apache.spark.SparkException
2827
import org.apache.spark.rdd.RDD
@@ -747,16 +746,22 @@ class AdaptiveQueryExecSuite
747746
// if the right side is completed first and the left side is still being executed,
748747
// the right side does not know whether there are many empty partitions on the left side,
749748
// so there is no demote, and then the right side is broadcast in the planning stage.
750-
// so retry several times here to avoid unit test failure.
751-
eventually(timeout(15.seconds), interval(500.milliseconds)) {
749+
// so apply `slow_udf` to delay right side to avoid unit test failure.
750+
withUserDefinedFunction("slow_udf" -> true) {
751+
spark.udf.register("slow_udf", (x: Int) => {
752+
Thread.sleep(300)
753+
x
754+
})
755+
752756
withSQLConf(
753757
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
754758
SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key -> "0.5") {
755759
// `testData` is small enough to be broadcast but has empty partition ratio over the config.
756760
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "200") {
757761
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
758762
"SELECT * FROM (select * from testData where value = '1') td" +
759-
" left outer join testData2 ON key = a")
763+
" left outer join (select slow_udf(a) as a, b from testData2) as td2" +
764+
" ON td.key = td2.a")
760765
val smj = findTopLevelSortMergeJoin(plan)
761766
assert(smj.size == 1)
762767
val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)

0 commit comments

Comments
 (0)