Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore
import scala.concurrent.TimeoutException
import scala.concurrent.duration._

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite,
TestUtils}
import org.apache.spark.internal.config
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils}
Expand All @@ -48,35 +49,38 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {

test("verify a task with all workers decommissioned succeeds") {
val input = sc.parallelize(1 to 10)
// Do a count to wait for the executors to be registered.
input.count()
val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(50)
x
}
// Listen for task start so we can tell when the job has begun running
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
sem.release()
}
})
TestUtils.waitUntilExecutorsUp(sc = sc,
numExecutors = 2,
timeout = 10000) // 10s
val sleepyRdd = input.mapPartitions{ x =>
Thread.sleep(5000) // 5s
x
}
// Start the task.
val asyncCount = sleepyRdd.countAsync()
// Wait for the job to have started
sem.acquire(1)
// Give the task time to reach the worker; otherwise we'll block
Thread.sleep(2000) // 2s
// Decommission all the executors, this should not halt the current task.
// decom.sh message passing is tested manually.
val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
val execs = sched.getExecutorIds()
execs.foreach(execId => sched.decommissionExecutor(execId))
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds)
val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
assert(asyncCountResult === 10)
// Try to launch a task after decommissioning; this should fail
val postDecommissioned = input.map(x => x)
val postDecomAsyncCount = postDecommissioned.countAsync()
val thrown = intercept[java.util.concurrent.TimeoutException]{
val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds)
val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds)
}
assert(postDecomAsyncCount.isCompleted === false,
"After exec decommission new task could not launch")
Expand Down