Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
.setAppName("test-cluster")
.set(TEST_NO_STAGE_RETRY, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}

// TODO (SPARK-31730): re-enable it
ignore("global sync by barrier() call") {
test("global sync by barrier() call") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
Expand All @@ -57,10 +57,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("share messages with allGather() call") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
Expand All @@ -78,10 +75,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("throw exception if we attempt to synchronize with different blocking calls") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
Expand All @@ -100,10 +94,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("successively sync with allGather and barrier") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
Expand All @@ -129,8 +120,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
assert(times2.max - times2.min <= 1000)
}

// TODO (SPARK-31730): re-enable it
ignore("support multiple barrier() call within a single task") {
test("support multiple barrier() call within a single task") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
Expand Down