diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b93536e6536e..65e8c46dc611 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -327,15 +327,15 @@ private[spark] class ExecutorAllocationManager( * @return the number of additional executors actually requested. */ private def addExecutors(maxNumExecutorsNeeded: Int): Int = { + val numExistingExecutors = executorIds.size - executorsPendingToRemove.size // Do not request more executors if it would put our target over the upper bound - if (numExecutorsTarget >= maxNumExecutors) { + if (numExistingExecutors >= maxNumExecutors) { logDebug(s"Not adding executors because our current target total " + s"is already $numExecutorsTarget (limit $maxNumExecutors)") numExecutorsToAdd = 1 return 0 } - val oldNumExecutorsTarget = numExecutorsTarget // There's no point in wasting time ramping up to the number of executors we already have, so // make sure our target is at least as much as our current allocation: numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size) @@ -346,7 +346,7 @@ private[spark] class ExecutorAllocationManager( // Ensure that our target fits within configured bounds: numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors) - val delta = numExecutorsTarget - oldNumExecutorsTarget + val delta = numExecutorsTarget - numExistingExecutors // If our target has not changed, do not send a message // to the cluster manager and reset our exponential growth @@ -406,6 +406,7 @@ private[spark] class ExecutorAllocationManager( logInfo(s"Removing executor $executorId because it has been idle for " + s"$executorIdleTimeoutS seconds (new desired total will be ${numExistingExecutors - 1})") executorsPendingToRemove.add(executorId) + numExecutorsTarget = math.min(numExecutorsTarget, numExistingExecutors - 1) true } else { logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")