Skip to content

Commit a349a08

Browse files
sryza authored and nemccarthy committed
[SPARK-6954] [YARN] ExecutorAllocationManager can end up requesting a negative number of executors
Author: Sandy Ryza <[email protected]>
Closes apache#5704 from sryza/sandy-spark-6954 and squashes the following commits:
b7890fb [Sandy Ryza] Avoid ramping up to an existing number of executors
6eb516a [Sandy Ryza] SPARK-6954. ExecutorAllocationManager can end up requesting a negative number of executors
1 parent fe19278 commit a349a08

File tree

2 files changed

+148
-124
lines changed

2 files changed

+148
-124
lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 49 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,20 @@ import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
2727
/**
2828
* An agent that dynamically allocates and removes executors based on the workload.
2929
*
30-
* The add policy depends on whether there are backlogged tasks waiting to be scheduled. If
31-
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
32-
* persists for another M seconds, then more executors are added and so on. The number added
33-
* in each round increases exponentially from the previous round until an upper bound on the
34-
* number of executors has been reached. The upper bound is based both on a configured property
35-
* and on the number of tasks pending: the policy will never increase the number of executor
36-
* requests past the number needed to handle all pending tasks.
30+
* The ExecutorAllocationManager maintains a moving target number of executors which is periodically
31+
* synced to the cluster manager. The target starts at a configured initial value and changes with
32+
* the number of pending and running tasks.
33+
*
34+
* Decreasing the target number of executors happens when the current target is more than needed to
35+
* handle the current load. The target number of executors is always truncated to the number of
36+
* executors that could run all current running and pending tasks at once.
37+
*
38+
* Increasing the target number of executors happens in response to backlogged tasks waiting to be
39+
* scheduled. If the scheduler queue is not drained in N seconds, then new executors are added. If
40+
* the queue persists for another M seconds, then more executors are added and so on. The number
41+
* added in each round increases exponentially from the previous round until an upper bound has been
42+
* reached. The upper bound is based both on a configured property and on the current number of
43+
* running and pending tasks, as described above.
3744
*
3845
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
3946
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
@@ -105,8 +112,10 @@ private[spark] class ExecutorAllocationManager(
105112
// Number of executors to add in the next round
106113
private var numExecutorsToAdd = 1
107114

108-
// Number of executors that have been requested but have not registered yet
109-
private var numExecutorsPending = 0
115+
// The desired number of executors at this moment in time. If all our executors were to die, this
116+
// is the number of executors we would immediately want from the cluster manager.
117+
private var numExecutorsTarget =
118+
conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
110119

111120
// Executors that have been requested to be removed but have not been killed yet
112121
private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -199,13 +208,6 @@ private[spark] class ExecutorAllocationManager(
199208
executor.awaitTermination(10, TimeUnit.SECONDS)
200209
}
201210

202-
/**
203-
* The number of executors we would have if the cluster manager were to fulfill all our existing
204-
* requests.
205-
*/
206-
private def targetNumExecutors(): Int =
207-
numExecutorsPending + executorIds.size - executorsPendingToRemove.size
208-
209211
/**
210212
* The maximum number of executors we would need under the current load to satisfy all running
211213
* and pending tasks, rounded up.
@@ -227,7 +229,7 @@ private[spark] class ExecutorAllocationManager(
227229
private def schedule(): Unit = synchronized {
228230
val now = clock.getTimeMillis
229231

230-
addOrCancelExecutorRequests(now)
232+
updateAndSyncNumExecutorsTarget(now)
231233

232234
removeTimes.retain { case (executorId, expireTime) =>
233235
val expired = now >= expireTime
@@ -239,26 +241,28 @@ private[spark] class ExecutorAllocationManager(
239241
}
240242

241243
/**
244+
* Updates our target number of executors and syncs the result with the cluster manager.
245+
*
242246
* Check to see whether our existing allocation and the requests we've made previously exceed our
243-
* current needs. If so, let the cluster manager know so that it can cancel pending requests that
244-
* are unneeded.
247+
* current needs. If so, truncate our target and let the cluster manager know so that it can
248+
* cancel pending requests that are unneeded.
245249
*
246250
* If not, and the add time has expired, see if we can request new executors and refresh the add
247251
* time.
248252
*
249253
* @return the delta in the target number of executors.
250254
*/
251-
private def addOrCancelExecutorRequests(now: Long): Int = synchronized {
252-
val currentTarget = targetNumExecutors
255+
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
253256
val maxNeeded = maxNumExecutorsNeeded
254257

255-
if (maxNeeded < currentTarget) {
258+
if (maxNeeded < numExecutorsTarget) {
256259
// The target number exceeds the number we actually need, so stop adding new
257-
// executors and inform the cluster manager to cancel the extra pending requests.
258-
val newTotalExecutors = math.max(maxNeeded, minNumExecutors)
259-
client.requestTotalExecutors(newTotalExecutors)
260+
// executors and inform the cluster manager to cancel the extra pending requests
261+
val oldNumExecutorsTarget = numExecutorsTarget
262+
numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
263+
client.requestTotalExecutors(numExecutorsTarget)
260264
numExecutorsToAdd = 1
261-
updateNumExecutorsPending(newTotalExecutors)
265+
numExecutorsTarget - oldNumExecutorsTarget
262266
} else if (addTime != NOT_SET && now >= addTime) {
263267
val delta = addExecutors(maxNeeded)
264268
logDebug(s"Starting timer to add more executors (to " +
@@ -281,21 +285,30 @@ private[spark] class ExecutorAllocationManager(
281285
*/
282286
private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
283287
// Do not request more executors if it would put our target over the upper bound
284-
val currentTarget = targetNumExecutors
285-
if (currentTarget >= maxNumExecutors) {
286-
logDebug(s"Not adding executors because there are already ${executorIds.size} " +
287-
s"registered and $numExecutorsPending pending executor(s) (limit $maxNumExecutors)")
288+
if (numExecutorsTarget >= maxNumExecutors) {
289+
val numExecutorsPending = numExecutorsTarget - executorIds.size
290+
logDebug(s"Not adding executors because there are already ${executorIds.size} registered " +
291+
s"and ${numExecutorsPending} pending executor(s) (limit $maxNumExecutors)")
288292
numExecutorsToAdd = 1
289293
return 0
290294
}
291295

292-
val actualMaxNumExecutors = math.min(maxNumExecutors, maxNumExecutorsNeeded)
293-
val newTotalExecutors = math.min(currentTarget + numExecutorsToAdd, actualMaxNumExecutors)
294-
val addRequestAcknowledged = testing || client.requestTotalExecutors(newTotalExecutors)
296+
val oldNumExecutorsTarget = numExecutorsTarget
297+
// There's no point in wasting time ramping up to the number of executors we already have, so
298+
// make sure our target is at least as much as our current allocation:
299+
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
300+
// Boost our target with the number to add for this round:
301+
numExecutorsTarget += numExecutorsToAdd
302+
// Ensure that our target doesn't exceed what we need at the present moment:
303+
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
304+
// Ensure that our target fits within configured bounds:
305+
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
306+
307+
val addRequestAcknowledged = testing || client.requestTotalExecutors(numExecutorsTarget)
295308
if (addRequestAcknowledged) {
296-
val delta = updateNumExecutorsPending(newTotalExecutors)
309+
val delta = numExecutorsTarget - oldNumExecutorsTarget
297310
logInfo(s"Requesting $delta new executor(s) because tasks are backlogged" +
298-
s" (new desired total will be $newTotalExecutors)")
311+
s" (new desired total will be $numExecutorsTarget)")
299312
numExecutorsToAdd = if (delta == numExecutorsToAdd) {
300313
numExecutorsToAdd * 2
301314
} else {
@@ -304,23 +317,11 @@ private[spark] class ExecutorAllocationManager(
304317
delta
305318
} else {
306319
logWarning(
307-
s"Unable to reach the cluster manager to request $newTotalExecutors total executors!")
320+
s"Unable to reach the cluster manager to request $numExecutorsTarget total executors!")
308321
0
309322
}
310323
}
311324

312-
/**
313-
* Given the new target number of executors, update the number of pending executor requests,
314-
* and return the delta from the old number of pending requests.
315-
*/
316-
private def updateNumExecutorsPending(newTotalExecutors: Int): Int = {
317-
val newNumExecutorsPending =
318-
newTotalExecutors - executorIds.size + executorsPendingToRemove.size
319-
val delta = newNumExecutorsPending - numExecutorsPending
320-
numExecutorsPending = newNumExecutorsPending
321-
delta
322-
}
323-
324325
/**
325326
* Request the cluster manager to remove the given executor.
326327
* Return whether the request is received.
@@ -372,10 +373,6 @@ private[spark] class ExecutorAllocationManager(
372373
// as idle again so as not to forget that it is a candidate for removal. (see SPARK-4951)
373374
executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
374375
logInfo(s"New executor $executorId has registered (new total is ${executorIds.size})")
375-
if (numExecutorsPending > 0) {
376-
numExecutorsPending -= 1
377-
logDebug(s"Decremented number of pending executors ($numExecutorsPending left)")
378-
}
379376
} else {
380377
logWarning(s"Duplicate executor $executorId has registered")
381378
}

0 commit comments

Comments
 (0)