diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index 6191e41b4118f..764b4b7587718 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -37,10 +37,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with .setAppName("test-cluster") .set(TEST_NO_STAGE_RETRY, true) sc = new SparkContext(conf) + TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000) } - // TODO (SPARK-31730): re-enable it - ignore("global sync by barrier() call") { + test("global sync by barrier() call") { initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => @@ -57,10 +57,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("share messages with allGather() call") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -78,10 +75,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("throw exception if we attempt to synchronize with different blocking calls") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -100,10 +94,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with } test("successively sync with allGather and barrier") { - val conf = new SparkConf() - .setMaster("local-cluster[4, 1, 1024]") - .setAppName("test-cluster") - sc = new SparkContext(conf) + initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() @@ -129,8 +120,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with assert(times2.max - times2.min <= 1000) } - // TODO (SPARK-31730): re-enable it - ignore("support multiple barrier() call within a single task") { + test("support multiple barrier() call within a single task") { initLocalClusterSparkContext() val rdd = sc.makeRDD(1 to 10, 4) val rdd2 = rdd.barrier().mapPartitions { it =>