
Commit 2b2dd08

rdblue authored and Marcelo Vanzin committed
[SPARK-20540][CORE] Fix unstable executor requests.
There are two problems fixed in this commit.

First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always advanced from its previous value by the timeout interval, rather than set relative to the current time. If the call is delayed by locking for longer than the sustained scheduler backlog timeout, the deadline stays in the past and the manager will request more executors on every run. This appears to be the main cause of SPARK-20540.

The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it derives the value from the current state of three variables: the number of known executors, the number of executors pending removal, and the number of pending executor requests. But the number of pending executors is never less than 0, even when more executors are known than were requested. When executors are killed and not replaced, this can make the request sent to YARN too large, because the scheduler's state is slightly out of date. This is fixed by tracking the currently requested total explicitly.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <[email protected]>

Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation.
1 parent 6fc6cf8 commit 2b2dd08

File tree: 3 files changed, +33 -7 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 1 addition & 1 deletion
@@ -331,7 +331,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime += sustainedSchedulerBacklogTimeoutS * 1000
+      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
       delta
     } else {
       0
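
To see why this one-line change matters, here is a minimal, self-contained Scala sketch of the timer arithmetic. It is not Spark code: `timeoutMs` and the literal times below are simplified stand-ins for `sustainedSchedulerBacklogTimeoutS * 1000` and the manager's `addTime`/`now` values.

// Minimal sketch of the timer drift (simplified names; not Spark code).
object AddTimerDrift {
  def main(args: Array[String]): Unit = {
    val timeoutMs = 1000L // stand-in for sustainedSchedulerBacklogTimeoutS * 1000
    val addTime = 1000L   // deadline after which more executors may be requested

    // Suppose lock contention delays the next check until t = 5000 ms.
    val now = 5000L

    // Old behavior: advance the deadline relative to its stale value.
    // 1000 + 1000 = 2000 ms, still in the past, so every later check fires too.
    val staleDeadline = addTime + timeoutMs
    println(s"old deadline: $staleDeadline, fires immediately: ${staleDeadline <= now}")

    // New behavior: advance the deadline relative to the current time.
    // 5000 + 1000 = 6000 ms, a full interval in the future.
    val freshDeadline = now + timeoutMs
    println(s"new deadline: $freshDeadline, fires immediately: ${freshDeadline <= now}")
  }
}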

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 28 additions & 4 deletions
@@ -69,6 +69,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // `CoarseGrainedSchedulerBackend.this`.
   private val executorDataMap = new HashMap[String, ExecutorData]
 
+  // Number of executors requested by the cluster manager, [[ExecutorAllocationManager]]
+  @GuardedBy("CoarseGrainedSchedulerBackend.this")
+  private var requestedTotalExecutors = 0
+
   // Number of executors requested from the cluster manager that have not registered yet
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private var numPendingExecutors = 0
@@ -413,6 +417,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * */
   protected def reset(): Unit = {
     val executors = synchronized {
+      requestedTotalExecutors = 0
       numPendingExecutors = 0
       executorsPendingToRemove.clear()
       Set() ++ executorDataMap.keys
@@ -487,12 +492,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager")
 
     val response = synchronized {
+      requestedTotalExecutors += numAdditionalExecutors
       numPendingExecutors += numAdditionalExecutors
       logDebug(s"Number of pending executors is now $numPendingExecutors")
+      if (requestedTotalExecutors !=
+          (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+        logDebug(
+          s"""requestExecutors($numAdditionalExecutors): Executor request doesn't match:
+             |requestedTotalExecutors = $requestedTotalExecutors
+             |numExistingExecutors = $numExistingExecutors
+             |numPendingExecutors = $numPendingExecutors
+             |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+      }
 
       // Account for executors pending to be added or removed
-      doRequestTotalExecutors(
-        numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+      doRequestTotalExecutors(requestedTotalExecutors)
     }
 
     defaultAskTimeout.awaitResult(response)
@@ -524,6 +538,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     val response = synchronized {
+      this.requestedTotalExecutors = numExecutors
       this.localityAwareTasks = localityAwareTasks
       this.hostToLocalTaskCount = hostToLocalTaskCount
 
@@ -589,8 +604,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (!replace) {
-          doRequestTotalExecutors(
-            numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
+          requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
+          if (requestedTotalExecutors !=
+              (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
+            logDebug(
+              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+                 |requestedTotalExecutors = $requestedTotalExecutors
+                 |numExistingExecutors = $numExistingExecutors
+                 |numPendingExecutors = $numPendingExecutors
+                 |executorsPendingToRemove = ${executorsPendingToRemove.size}""".stripMargin)
+          }
+          doRequestTotalExecutors(requestedTotalExecutors)
         } else {
           numPendingExecutors += knownExecutors.size
           Future.successful(true)
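
The change above replaces a derived executor target with an explicitly tracked one. The following self-contained Scala sketch (simplified names, not Spark code) shows one possible stale state in which the derived formula overstates the target after executors are killed without replacement:

// Minimal sketch of the accounting problem (simplified names; not Spark code).
object RequestAccounting {
  def main(args: Array[String]): Unit = {
    // Assumed scenario: 5 executors were requested and registered, then 2 were
    // killed without replacement; their removals were acknowledged, but they
    // have not yet deregistered from the backend's executor map.
    val numExistingExecutors = 5     // executorDataMap still lists the 2 doomed executors
    val numPendingExecutors = 0      // floored at zero, so it cannot absorb the deficit
    val executorsPendingToRemove = 0 // removal set already cleared

    // Old behavior: reconstruct the target from the three counters.
    val derived = numExistingExecutors + numPendingExecutors - executorsPendingToRemove
    println(s"derived target: $derived") // 5: asks the cluster manager for too many

    // New behavior: decrement an explicit total when the kill is issued.
    val requestedTotalExecutors = math.max(5 - 2, 0)
    println(s"tracked target: $requestedTotalExecutors") // 3: what was actually requested
  }
}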

core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala

Lines changed: 4 additions & 2 deletions
@@ -356,12 +356,13 @@ class StandaloneDynamicAllocationSuite
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
@@ -380,12 +381,13 @@ class StandaloneDynamicAllocationSuite
   test("the pending replacement executors should not be lost (SPARK-10515)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
+    sc.requestExecutors(2)
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.size === 1)
       assert(apps.head.id === appId)
       assert(apps.head.executors.size === 2)
-      assert(apps.head.getExecutorLimit === Int.MaxValue)
+      assert(apps.head.getExecutorLimit === 2)
     }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
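
The new assertions reflect that an application's executor limit now follows explicitly tracked requests rather than defaulting to Int.MaxValue. As a hedged usage sketch of the driver-side developer API (the master URL and executor id below are placeholders, and a standalone cluster with registered executors is assumed):

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorLimitDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("executor-limit-demo")
      .setMaster("spark://master:7077") // placeholder standalone master URL

    val sc = new SparkContext(conf)
    sc.requestExecutors(2)     // backend records an explicit total of 2
    // ... after the executors register, kill one without replacement:
    sc.killExecutors(Seq("1")) // "1" is a placeholder id; tracked total drops to 1
    sc.stop()
  }
}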
