@@ -201,18 +201,34 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * If the add time has expired, request new executors and refresh the add time.
-   * If the remove time for an existing executor has expired, kill the executor.
+   * The number of executors we would have if the cluster manager were to fulfill all our existing
+   * requests.
+   */
+  private def targetNumExecutors(): Int =
+    numExecutorsPending + executorIds.size - executorsPendingToRemove.size
+
+  /**
+   * The maximum number of executors we would need under the current load to satisfy all running
+   * and pending tasks, rounded up.
+   */
+  private def maxNumExecutorsNeeded(): Int = {
+    val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
+    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+  }
+
+  /**
+   * This is called at a fixed interval to regulate the number of pending executor requests
+   * and number of executors running.
+   *
+   * First, adjust our requested executors based on the add time and our current needs.
+   * Then, if the remove time for an existing executor has expired, kill the executor.
+   *
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
-    if (addTime != NOT_SET && now >= addTime) {
-      addExecutors()
-      logDebug(s"Starting timer to add more executors (to " +
-        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
-      addTime += sustainedSchedulerBacklogTimeout * 1000
-    }
+
+    addOrCancelExecutorRequests(now)
 
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
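
A minimal, standalone sketch of the arithmetic the two new helpers above rely on. The object and parameter names are invented for illustration and are not part of the patch; only the rounded-up division and the target formula mirror maxNumExecutorsNeeded and targetNumExecutors.

object ExecutorMathSketch {
  // Rounded-up integer division: how many executors all running and pending tasks could fill.
  def maxNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
    (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

  // Executors we would have if the cluster manager fulfilled every outstanding request.
  def target(pending: Int, existing: Int, pendingToRemove: Int): Int =
    pending + existing - pendingToRemove

  def main(args: Array[String]): Unit = {
    // 10 pending + 3 running tasks with 4 task slots per executor => ceil(13 / 4) = 4
    println(maxNeeded(pendingTasks = 10, runningTasks = 3, tasksPerExecutor = 4))
    // 2 requested but not yet granted, 5 registered, 1 marked for removal => target of 6
    println(target(pending = 2, existing = 5, pendingToRemove = 1))
  }
}
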
@@ -223,59 +239,89 @@ private[spark] class ExecutorAllocationManager(
     }
   }
 
+  /**
+   * Check to see whether our existing allocation and the requests we've made previously exceed our
+   * current needs. If so, let the cluster manager know so that it can cancel pending requests that
+   * are unneeded.
+   *
+   * If not, and the add time has expired, see if we can request new executors and refresh the add
+   * time.
+   *
+   * @return the delta in the target number of executors.
+   */
+  private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
+    val currentTarget = targetNumExecutors
+    val maxNeeded = maxNumExecutorsNeeded
+
+    if (maxNeeded < currentTarget) {
+      // The target number exceeds the number we actually need, so stop adding new
+      // executors and inform the cluster manager to cancel the extra pending requests.
+      val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
+      client.requestTotalExecutors(newTotalExecutors)
+      numExecutorsToAdd = 1
+      updateNumExecutorsPending(newTotalExecutors)
+    } else if (addTime != NOT_SET && now >= addTime) {
+      val delta = addExecutors(maxNeeded)
+      logDebug(s"Starting timer to add more executors (to " +
+        s"expire in $sustainedSchedulerBacklogTimeout seconds)")
+      addTime += sustainedSchedulerBacklogTimeout * 1000
+      delta
+    } else {
+      0
+    }
+  }
+
   /**
    * Request a number of executors from the cluster manager.
    * If the cap on the number of executors is reached, give up and reset the
    * number of executors to add next round instead of continuing to double it.
-   * Return the number actually requested.
+   *
+   * @param maxNumExecutorsNeeded the maximum number of executors all currently running or pending
+   *                              tasks could fill
+   * @return the number of additional executors actually requested.
    */
-  private def addExecutors(): Int = synchronized {
-    // Do not request more executors if we have already reached the upper bound
-    val numExistingExecutors = executorIds.size + numExecutorsPending
-    if (numExistingExecutors >= maxNumExecutors) {
+  private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
+    // Do not request more executors if it would put our target over the upper bound
+    val currentTarget = targetNumExecutors
+    if (currentTarget >= maxNumExecutors) {
       logDebug(s"Not adding executors because there are already ${executorIds.size} " +
         s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
       numExecutorsToAdd = 1
       return 0
     }
 
-    // The number of executors needed to satisfy all pending tasks is the number of tasks pending
-    // divided by the number of tasks each executor can fit, rounded up.
-    val maxNumExecutorsPending =
-      (listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
-    if (numExecutorsPending >= maxNumExecutorsPending) {
-      logDebug(s"Not adding executors because there are already $numExecutorsPending " +
-        s"pending and pending tasks could only fill $maxNumExecutorsPending")
-      numExecutorsToAdd = 1
-      return 0
-    }
-
-    // It's never useful to request more executors than could satisfy all the pending tasks, so
-    // cap request at that amount.
-    // Also cap request with respect to the configured upper bound.
-    val maxNumExecutorsToAdd = math.min(
-      maxNumExecutorsPending - numExecutorsPending,
-      maxNumExecutors - numExistingExecutors)
-    assert(maxNumExecutorsToAdd > 0)
-
-    val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)
-
-    val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
-    val addRequestAcknowledged = testing || client.requestExecutors(actualNumExecutorsToAdd)
+    val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
+    val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
+    val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
     if (addRequestAcknowledged) {
-      logInfo(s"Requesting $actualNumExecutorsToAdd new executor(s) because " +
-        s"tasks are backlogged (new desired total will be $newTotalExecutors)")
-      numExecutorsToAdd =
-        if (actualNumExecutorsToAdd == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
-      numExecutorsPending += actualNumExecutorsToAdd
-      actualNumExecutorsToAdd
+      val delta = updateNumExecutorsPending(newTotalExecutors)
+      logInfo(s"Requesting $delta new executor(s) because tasks are backlogged " +
+        s"(new desired total will be $newTotalExecutors)")
+      numExecutorsToAdd = if (delta == numExecutorsToAdd) {
+        numExecutorsToAdd * 2
+      } else {
+        1
+      }
+      delta
     } else {
-      logWarning(s"Unable to reach the cluster manager " +
-        s"to request $actualNumExecutorsToAdd executors!")
+      logWarning(
+        s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
       0
     }
   }
 
+  /**
+   * Given the new target number of executors, update the number of pending executor requests,
+   * and return the delta from the old number of pending requests.
+   */
+  private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
+    val newNumExecutorsPending =
+      newTotalExecutors - executorIds.size + executorsPendingToRemove.size
+    val delta = newNumExecutorsPending - numExecutorsPending
+    numExecutorsPending = newNumExecutorsPending
+    delta
+  }
+
   /**
    * Request the cluster manager to remove the given executor.
    * Return whether the request is received.
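
A small standalone simulation of how the requested total evolves across successive schedule() intervals under the new code path above. All names and the starting numbers are made up for illustration, and the add timer, cluster-manager call, and pending-count bookkeeping are omitted; only the cancel-versus-grow decision, the doubling of numExecutorsToAdd, and the caps mirror addOrCancelExecutorRequests and addExecutors.

object RampUpSketch {
  def main(args: Array[String]): Unit = {
    val minNumExecutors = 0
    val maxNumExecutors = 10
    val maxNeeded = 7          // executors that all running plus pending tasks could fill
    var target = 1             // targetNumExecutors: pending + existing - pending-to-remove
    var numExecutorsToAdd = 1  // doubles while the backlog persists, resets to 1 otherwise

    for (round <- 1 to 5) {
      if (maxNeeded < target) {
        // Over-provisioned: lower the target (never below the minimum) and reset the ramp-up.
        target = math.max(maxNeeded, minNumExecutors)
        numExecutorsToAdd = 1
      } else {
        // Backlogged: grow the target, capped by both the current need and the configured max.
        val newTarget = math.min(target + numExecutorsToAdd, math.min(maxNumExecutors, maxNeeded))
        val delta = newTarget - target
        numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
        target = newTarget
      }
      println(s"round $round: target = $target, next add = $numExecutorsToAdd")
    }
    // Targets grow 2, 4, 7 and then hold at 7: the exponential ramp-up stops once the need is met.
  }
}
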
@@ -415,6 +461,8 @@ private[spark] class ExecutorAllocationManager(
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
+    // Number of tasks currently running on the cluster. Should be 0 when no stages are active.
+    private var numRunningTasks: Int = _
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
       val stageId = stageSubmitted.stageInfo.stageId
@@ -435,6 +483,10 @@ private[spark] class ExecutorAllocationManager(
         // This is needed in case the stage is aborted for any reason
         if (stageIdToNumTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
+          if (numRunningTasks != 0) {
+            logWarning("No stages are running, but numRunningTasks != 0")
+            numRunningTasks = 0
+          }
         }
       }
     }
@@ -446,6 +498,7 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
+        numRunningTasks += 1
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -475,7 +528,8 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
       allocationManager.synchronized {
-        // If the executor is no longer running scheduled any tasks, mark it as idle
+        numRunningTasks -= 1
+        // If the executor is no longer running any scheduled tasks, mark it as idle
         if (executorIdToTaskIds.contains(executorId)) {
           executorIdToTaskIds(executorId) -= taskId
           if (executorIdToTaskIds(executorId).isEmpty) {
@@ -514,6 +568,11 @@ private[spark] class ExecutorAllocationManager(
       }.sum
     }
 
+    /**
+     * The number of tasks currently running across all stages.
+     */
+    def totalRunningTasks(): Int = numRunningTasks
+
     /**
      * Return true if an executor is not currently running a task, and false otherwise.
      *
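
A standalone sketch of the bookkeeping behind totalRunningTasks, using a toy event type instead of real SparkListener events. The names are invented for illustration; only the increment on task start, the decrement on task end, and the defensive reset when the last stage completes follow the listener changes above.

object RunningTaskCounterSketch {
  private var numRunningTasks = 0

  sealed trait Event
  case object TaskStart extends Event
  case object TaskEnd extends Event
  case object AllStagesCompleted extends Event

  def handle(event: Event): Unit = event match {
    case TaskStart => numRunningTasks += 1
    case TaskEnd   => numRunningTasks -= 1
    case AllStagesCompleted =>
      // Mirrors the defensive reset in onStageCompleted: the counter should already be 0 here.
      if (numRunningTasks != 0) {
        println("No stages are running, but numRunningTasks != 0")
        numRunningTasks = 0
      }
  }

  def main(args: Array[String]): Unit = {
    Seq(TaskStart, TaskStart, TaskEnd, AllStagesCompleted).foreach(handle)
    println(numRunningTasks) // 0 after the defensive reset
  }
}
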