From 60c3e9d69697edb8bea00ef75cf9c3f6fab98b79 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Thu, 21 May 2020 21:39:03 -0700 Subject: [PATCH 1/2] Attempt to fix the WorkerDecommissionSuite in core --- .../scheduler/WorkerDecommissionSuite.scala | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index 8c6f86a6c0e88..c4b86eb2a0d0c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -22,7 +22,8 @@ import java.util.concurrent.Semaphore import scala.concurrent.TimeoutException import scala.concurrent.duration._ -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite, + TestUtils} import org.apache.spark.internal.config import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend import org.apache.spark.util.{RpcUtils, SerializableBuffer, ThreadUtils} @@ -48,12 +49,6 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { test("verify a task with all workers decommissioned succeeds") { val input = sc.parallelize(1 to 10) - // Do a count to wait for the executors to be registered. - input.count() - val sleepyRdd = input.mapPartitions{ x => - Thread.sleep(50) - x - } // Listen for the job val sem = new Semaphore(0) sc.addSparkListener(new SparkListener { @@ -61,6 +56,13 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { sem.release() } }) + TestUtils.waitUntilExecutorsUp(sc = sc, + numExecutors = 2, + timeout = 10000) // 10s + val sleepyRdd = input.mapPartitions{ x => + Thread.sleep(50) + x + } // Start the task. val asyncCount = sleepyRdd.countAsync() // Wait for the job to have started @@ -70,13 +72,13 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend] val execs = sched.getExecutorIds() execs.foreach(execId => sched.decommissionExecutor(execId)) - val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 10.seconds) + val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds) assert(asyncCountResult === 10) // Try and launch task after decommissioning, this should fail val postDecommissioned = input.map(x => x) val postDecomAsyncCount = postDecommissioned.countAsync() val thrown = intercept[java.util.concurrent.TimeoutException]{ - val result = ThreadUtils.awaitResult(postDecomAsyncCount, 10.seconds) + val result = ThreadUtils.awaitResult(postDecomAsyncCount, 20.seconds) } assert(postDecomAsyncCount.isCompleted === false, "After exec decommission new task could not launch") From b5e83dea3ed263d15c7d31969cfac2f345575a71 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Fri, 22 May 2020 05:27:04 -0700 Subject: [PATCH 2/2] Waiting for the task to start is not enough to gaurantee that we've reached the point in the scheduling, make the task take slightly longer and wait an extra two seconds for scheduling to run it's corse --- .../org/apache/spark/scheduler/WorkerDecommissionSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala index c4b86eb2a0d0c..148d20ee659a2 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala @@ -60,13 +60,15 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext { numExecutors = 2, timeout = 10000) // 10s val sleepyRdd = input.mapPartitions{ x => - Thread.sleep(50) + Thread.sleep(5000) // 5s x } // Start the task. val asyncCount = sleepyRdd.countAsync() // Wait for the job to have started sem.acquire(1) + // Give it time to make it to the worker otherwise we'll block + Thread.sleep(2000) // 2s // Decommission all the executors, this should not halt the current task. // decom.sh message passing is tested manually. val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]