@@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
 class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
       .setAppName("test-cluster")
       .set(TEST_NO_STAGE_RETRY, true)
     sc = new SparkContext(conf)
+    TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("global sync by barrier() call") {
+  test("global sync by barrier() call") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("share messages with allGather() call") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("throw exception if we attempt to synchronize with different blocking calls") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -103,10 +98,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
   }
 
   test("successively sync with allGather and barrier") {
-    val conf = new SparkConf()
-      .setMaster("local-cluster[4, 1, 1024]")
-      .setAppName("test-cluster")
-    sc = new SparkContext(conf)
+    initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
       val context = BarrierTaskContext.get()
@@ -132,8 +124,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     assert(times2.max - times2.min <= 1000)
   }
 
-  // TODO (SPARK-31730): re-enable it
-  ignore("support multiple barrier() call within a single task") {
+  test("support multiple barrier() call within a single task") {
     initLocalClusterSparkContext()
     val rdd = sc.makeRDD(1 to 10, 4)
     val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -288,6 +279,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
 
   test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
     initLocalClusterSparkContext(2)
+    // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
+    // could get scheduled at ANY level.
+    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
     val dep = new OneToOneDependency[Int](rdd0)
     // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
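Note on the change: the diff folds the per-test SparkConf setup into the shared initLocalClusterSparkContext helper and has it block until all workers are registered, which is what lets the previously ignored barrier tests be re-enabled. The helper's full body is not shown in the hunks above; the sketch below is a reconstruction from the removed per-test blocks and the visible context lines, and the exact signature and the default of 4 workers are assumptions.

```scala
// Sketch only, assuming the helper keeps the conf shown in the removed per-test blocks.
private def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
  val conf = new SparkConf()
    // local-cluster[N, cores, memoryMB]: N separate worker JVMs with 1 core and 1024 MB each
    .setMaster(s"local-cluster[$numWorker, 1, 1024]")
    .setAppName("test-cluster")
    .set(TEST_NO_STAGE_RETRY, true)
  sc = new SparkContext(conf)
  // Block until every executor is up, so a barrier stage is never submitted before
  // enough slots exist (the flakiness SPARK-31730 targets).
  TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}
```

The other piece, config.LEGACY_LOCALITY_WAIT_RESET in the SPARK-31485 test, restores the legacy behavior of resetting the locality delay timer on task launch; per the inline comment, without it all tasks could be scheduled at ANY locality level and the test would no longer exercise the partial-launch failure path.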