diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 0807e653b41a..39c0b99bf1a7 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -319,7 +319,15 @@ private[spark] class ExecutorAllocationManager( removeTimes.retain { case (executorId, expireTime) => val expired = now >= expireTime if (expired) { - initializing = false + if (initializing) { + initializing = false + // When initializing is firstly set to false due to expired executor, we need to + // recalculate numExecutorsTarget. Thus, all expired executors in this round can + // be properly removed in time. Otherwise, these executors will never be removed + // if there are no tasks to be submitted. (see SPARK-26588) + // Note that this will occur when we enable dynamic allocation with spark-shell. + updateAndSyncNumExecutorsTarget(now) + } executorIdsToBeRemoved += executorId } !expired diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 6b310b9cb67a..81ea6a49ba9c 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -936,11 +936,6 @@ class ExecutorAllocationManagerSuite assert(maxNumExecutorsNeeded(manager) === 0) schedule(manager) - // Verify executor is timeout but numExecutorsTarget is not recalculated - assert(numExecutorsTarget(manager) === 3) - - // Schedule again to recalculate the numExecutorsTarget after executor is timeout - schedule(manager) // Verify that current number of executors should be ramp down when executor is timeout assert(numExecutorsTarget(manager) === 2) } @@ -1148,6 +1143,46 @@ class ExecutorAllocationManagerSuite verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false) } + test("SPARK-26588 Idle executor should properly be killed when no job is submitted.") { + val initialExecutors = 4 + val conf = new SparkConf() + .set("spark.dynamicAllocation.enabled", "true") + .set(config.SHUFFLE_SERVICE_ENABLED.key, "true") + .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString) + .set("spark.dynamicAllocation.executorIdleTimeout", "1000ms") + .set("spark.dynamicAllocation.testing", "true") + + val mockAllocationClient = mock(classOf[ExecutorAllocationClient]) + val mockBMM = mock(classOf[BlockManagerMaster]) + val manager = new ExecutorAllocationManager( + mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM) + val clock = new ManualClock() + manager.setClock(clock) + + // Verify the initial number of executors + assert(numExecutorsTarget(manager) === 4) + assert(removeTimes(manager).isEmpty) + + onExecutorAdded(manager, "first") + onExecutorAdded(manager, "second") + onExecutorAdded(manager, "third") + onExecutorAdded(manager, "fourth") + assert(removeTimes(manager).size === 4) + assert(maxNumExecutorsNeeded(manager) == 0) + + // can't remove executor, still initializing + assert(!removeExecutor(manager, "first")) + assert(executorsPendingToRemove(manager) === Set()) + assert(numExecutorsTarget(manager) === 4) + + // idle executors should be removed properly, even if there are no task to be submitted. + clock.advance(3000) + schedule(manager) + assert(removeTimes(manager).isEmpty) + assert(numExecutorsTarget(manager) === 0) + assert(executorsPendingToRemove(manager) === Set("first", "second", "third", "fourth")) + } + private def createSparkContext( minExecutors: Int = 1, maxExecutors: Int = 5,