diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index 17bc3391cf946..c87c72f4c82ca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar._ import org.apache.spark._ +import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { @@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with .setAppName("test-cluster") .set(TEST_NO_STAGE_RETRY, true) sc = new SparkContext(conf) + TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000) } - // TODO (SPARK-31730): re-enable it - ignore("global sync by barrier() call") { + test("global sync by barrier() call") { initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => @@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("share messages with allGather() call") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("throw exception if we attempt to synchronize with different blocking calls") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -103,10 +98,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("successively sync with allGather and barrier") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -132,8 +124,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with assert(times2.max - times2.min <= 1000) } - // TODO (SPARK-31730): re-enable it - ignore("support multiple barrier() call within a single task") { + test("support multiple barrier() call within a single task") { initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => @@ -288,6 +279,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with test("SPARK-31485: barrier stage should fail if only partial tasks are launched") { initLocalClusterSparkContext(2) + // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks + // could get scheduled at ANY level. + sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true) val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2) val dep = new OneToOneDependency[Int](rdd0) // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for