diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index b93536e6536e2..6419218f47c85 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -509,6 +509,7 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a task")
     removeTimes.remove(executorId)
+    executorsPendingToRemove.remove(executorId)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d3ea30f3b0f50..576b3b487abd5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -87,7 +87,7 @@ private[spark] class TaskSchedulerImpl(
   // Incrementing task IDs
   val nextTaskId = new AtomicLong(0)
 
-  // Number of tasks runing on each executor
+  // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
   // The set of executors we have on each host; this is used to compute hostsAlive, which
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index bd1826b47e953..9dc1d59f5f453 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -437,9 +437,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     val executorsToKill = knownExecutors
       .filter { id => !executorsPendingToRemove.contains(id) }
       .filter { id => force || !scheduler.isExecutorBusy(id) }
-      // for test only
-      .filter { id => force ||
-        !scheduler.sc.getConf.getBoolean("spark.dynamicAllocation.testing", false)}
     executorsPendingToRemove ++= executorsToKill
 
     // If we do not wish to replace the executors we kill, sync the target number of executors
@@ -452,7 +449,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       numPendingExecutors += knownExecutors.size
     }
 
-    (force || !executorsToKill.isEmpty) && doKillExecutors(executorsToKill)
+    doKillExecutors(executorsToKill)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index b9ccc36461165..9eafb654a477f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.deploy
 
+import scala.collection.mutable
 import scala.concurrent.duration._
 
 import org.mockito.Mockito.{mock, when}
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
@@ -29,6 +30,7 @@ import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
+import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 
@@ -38,7 +40,8 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterE
 class StandaloneDynamicAllocationSuite
   extends SparkFunSuite
   with LocalSparkContext
-  with BeforeAndAfterAll {
+  with BeforeAndAfterAll
+  with PrivateMethodTester {
 
   private val numWorkers = 2
   private val conf = new SparkConf()
@@ -405,7 +408,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("disable force kill for busy executors (SPARK-9552)") {
-    sc = new SparkContext(appConf.set("spark.dynamicAllocation.testing", "true"))
+    sc = new SparkContext(appConf)
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -417,17 +420,21 @@ class StandaloneDynamicAllocationSuite
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
-    var executors = getExecutorIds(sc)
+    val executors = getExecutorIds(sc)
     assert(executors.size === 2)
     // force kill busy executor
-    assert(killExecutorWithForce(sc, executors.head))
+    assert(killExecutor(sc, executors.head, force = true))
     var apps = getApplications()
     // kill executor successfully
    assert(apps.head.executors.size === 1)
-    // try to kill busy executor but it should be failed
-    assert(killExecutorWithForce(sc, executors.head, false) === false)
+    // simulate running a task on the remaining executor
+    val getMap = PrivateMethod[mutable.HashMap[String, Int]]('executorIdToTaskCount)
+    val taskScheduler = sc.taskScheduler.asInstanceOf[TaskSchedulerImpl]
+    val executorIdToTaskCount = taskScheduler invokePrivate getMap()
+    executorIdToTaskCount(executors.last) = 1
+    // killing the busy executor without force should leave it running
+    assert(killExecutor(sc, executors.last, force = false))
     apps = getApplications()
-    // won't kill busy executor
     assert(apps.head.executors.size === 1)
   }
 
@@ -482,16 +489,13 @@ class StandaloneDynamicAllocationSuite
     sc.killExecutors(getExecutorIds(sc).take(n))
   }
 
-  private def killExecutorWithForce(
-      sc: SparkContext,
-      executorId: String,
-      force: Boolean = true): Boolean = {
+  /** Kill the given executor, specifying whether to force kill it. */
+  private def killExecutor(sc: SparkContext, executorId: String, force: Boolean): Boolean = {
+    syncExecutors(sc)
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
        b.killExecutors(Seq(executorId), replace = false, force)
-      case _ =>
-        logWarning("Killing executors is only supported in coarse-grained mode")
-        false
+      case _ => fail("expected coarse grained scheduler")
     }
   }
 
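
Note for context (not part of the patch): the non-force path above relies on two pieces of state, the per-executor running-task count in TaskSchedulerImpl and the backend's executorsPendingToRemove set. The self-contained Scala sketch below mirrors the filter chain kept in killExecutors so the intended behavior can be exercised in isolation; BusyFilterSketch, executorsToKill, and the executor ids are illustrative names only, not Spark APIs.

import scala.collection.mutable

object BusyFilterSketch {
  // Mirrors TaskSchedulerImpl.executorIdToTaskCount: executor id -> number of running tasks.
  private val executorIdToTaskCount = mutable.HashMap[String, Int]()
  // Mirrors CoarseGrainedSchedulerBackend.executorsPendingToRemove.
  private val executorsPendingToRemove = mutable.HashSet[String]()

  // An executor counts as busy while it still has at least one running task.
  def isExecutorBusy(execId: String): Boolean =
    executorIdToTaskCount.getOrElse(execId, 0) > 0

  // Same shape as the filter chain in killExecutors: drop executors already pending
  // removal, and drop busy executors unless force is set.
  def executorsToKill(requested: Seq[String], force: Boolean): Seq[String] =
    requested
      .filter { id => !executorsPendingToRemove.contains(id) }
      .filter { id => force || !isExecutorBusy(id) }

  def main(args: Array[String]): Unit = {
    executorIdToTaskCount("exec-1") = 1                                // exec-1 is busy
    println(executorsToKill(Seq("exec-1", "exec-2"), force = false))   // List(exec-2)
    println(executorsToKill(Seq("exec-1", "exec-2"), force = true))    // List(exec-1, exec-2)
  }
}

Running the sketch prints List(exec-2) for the non-force case and List(exec-1, exec-2) with force set, which is the behavior the updated test checks through the Master's executor count.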