[SPARK-23365][CORE] Do not adjust num executors when killing idle executors. #20604
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   def killExecutors(
     executorIds: Seq[String],
-    replace: Boolean = false,
+    adjustTargetNumExecutors: Boolean,
+    countFailures: Boolean,
     force: Boolean = false): Seq[String]

@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
    * @return whether the request is acknowledged by the cluster manager.
    */
   def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
     killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
   }
 }
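For readers comparing against the old trait, here is a hedged sketch of how the old `replace` flag maps onto the two new flags. The wrapper itself is hypothetical and only illustrates the semantics described in the removed scaladoc; the `package` line is needed because the trait is `private[spark]`.

```scala
package org.apache.spark

// Hypothetical helper, not part of this PR: expresses the old `replace` semantics
// in terms of the new (adjustTargetNumExecutors, countFailures) parameters.
object KillSemanticsSketch {
  def killLikeOldApi(
      client: ExecutorAllocationClient,
      executorIds: Seq[String],
      replace: Boolean,
      force: Boolean = false): Seq[String] = {
    if (replace) {
      // Old `replace = true`: keep the executor target unchanged so the cluster manager
      // brings up substitutes, and count tasks killed on the executor as failures.
      client.killExecutors(executorIds, adjustTargetNumExecutors = false,
        countFailures = true, force = force)
    } else {
      // Old `replace = false`: shrink the target and do not count killed tasks as failures.
      client.killExecutors(executorIds, adjustTargetNumExecutors = true,
        countFailures = false, force = force)
    }
  }
}
```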
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
+import org.apache.spark.storage.BlockManagerMaster
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}

 /**

@@ -81,7 +82,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
-    conf: SparkConf)
+    conf: SparkConf,
+    blockManagerMaster: BlockManagerMaster)
   extends Logging {

   allocationManager =>

@@ -151,7 +153,7 @@ private[spark] class ExecutorAllocationManager(
   private var clock: Clock = new SystemClock()

   // Listener for Spark events that impact the allocation policy
-  private val listener = new ExecutorAllocationListener
+  val listener = new ExecutorAllocationListener

   // Executor that handles the scheduling task.
   private val executor =

@@ -334,6 +336,10 @@ private[spark] class ExecutorAllocationManager(
     // If the new target has not changed, avoid sending a message to the cluster manager
     if (numExecutorsTarget < oldNumExecutorsTarget) {
+      // We lower the target number of executors but don't actively kill any yet.  We do this
+      // in case an executor just happens to get lost (eg., bad hardware, or the cluster manager
+      // preempts it) -- in that case, there is no point in trying to immediately get a new
+      // executor, since we couldn't even use it yet.
       client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
       logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
         s"$oldNumExecutorsTarget) because not all requested executors are actually needed")

@@ -455,7 +461,12 @@ private[spark] class ExecutorAllocationManager(
     val executorsRemoved = if (testing) {
       executorIdsToBeRemoved
     } else {
-      client.killExecutors(executorIdsToBeRemoved)
+      // We don't want to change our target number of executors, because we already did that
+      // when the task backlog decreased.  Normally there wouldn't be any tasks running on these
+      // executors, but maybe the scheduler *just* decided to run a task there -- in that case,
+      // we don't want to count those failures.
+      client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = false,
+        countFailures = false, force = false)
     }
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.

@@ -575,7 +586,7 @@ private[spark] class ExecutorAllocationManager(
       // Note that it is not necessary to query the executors since all the cached
       // blocks we are concerned with are reported to the driver. Note that this
       // does not include broadcast blocks.
-      val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+      val hasCachedBlocks = blockManagerMaster.hasCachedBlocks(executorId)
       val now = clock.getTimeMillis()
       val timeout = {
         if (hasCachedBlocks) {

@@ -610,7 +621,7 @@ private[spark] class ExecutorAllocationManager(
    * This class is intentionally conservative in its assumptions about the relative ordering
    * and consistency of events returned by the listener.
    */
-  private class ExecutorAllocationListener extends SparkListener {
+  private[spark] class ExecutorAllocationListener extends SparkListener {

     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
     // Number of running tasks per stage including speculative tasks.
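One motivation for threading `BlockManagerMaster` through the constructor is testability. A rough sketch of what that enables is below; the Mockito wiring is illustrative (not from this PR) and assumes test code living in the `org.apache.spark` package, since the classes involved are `private[spark]`.

```scala
package org.apache.spark

import org.mockito.Mockito.{mock, when}

import org.apache.spark.storage.BlockManagerMaster

// Illustrative only: a test can now stub cached-block lookups instead of relying on
// the global SparkEnv.get.blockManager.master, which is hard to fake in a unit test.
object BlockManagerMasterStubSketch {
  def stubbedMaster(): BlockManagerMaster = {
    val bmm = mock(classOf[BlockManagerMaster])
    // Executor "1" pretends to hold cached blocks, so its idle timeout should fall back
    // to spark.dynamicAllocation.cachedExecutorIdleTimeout; executor "2" does not.
    when(bmm.hasCachedBlocks("1")).thenReturn(true)
    when(bmm.hasCachedBlocks("2")).thenReturn(false)
    // The stub would then be passed as the new fourth constructor argument:
    // new ExecutorAllocationManager(client, listenerBus, conf, bmm)
    bmm
  }
}
```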
@@ -533,7 +533,8 @@ class SparkContext(config: SparkConf) extends Logging {
       schedulerBackend match {
         case b: ExecutorAllocationClient =>
           Some(new ExecutorAllocationManager(
-            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+            _env.blockManager.master))
         case _ =>
           None
       }

@@ -1632,6 +1633,8 @@ class SparkContext(config: SparkConf) extends Logging {
    * :: DeveloperApi ::
    * Request that the cluster manager kill the specified executors.
    *
+   * This is not supported when dynamic allocation is turned on.
+   *
    * @note This is an indication to the cluster manager that the application wishes to adjust
    * its resource usage downwards. If the application wishes to replace the executors it kills
    * through this method with new ones, it should follow up explicitly with a call to

@@ -1643,7 +1646,10 @@ class SparkContext(config: SparkConf) extends Logging {
   def killExecutors(executorIds: Seq[String]): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(executorIds, replace = false, force = true).nonEmpty
+        require(executorAllocationManager.isEmpty,
Contributor
This is a developer API, so probably OK, but this is a change in behavior. Is it just not possible to support this with dynamic allocation?

Contributor (Author)
What would calling this mean with dynamic allocation on? Note this API explicitly says it's meant to adjust resource usage downwards. If you've got just one executor and then you kill it, should your app sit with 0 executors? Or even if you've got 10 executors and you kill one -- when is dynamic allocation allowed to bump the total back up? I can't think of useful, clear semantics for this (though this is not necessary to fix the bug; I could pull this out and move it to a discussion in a new JIRA).

Contributor
I'm not sure why you'd use this with dynamic allocation, but it's been possible in the past. It's probably OK to change this though.

Member
Hi @squito, I have some questions about those cases. If the app sits with 0 executors, then pending tasks increase, which leads to dynamic allocation requesting executors again. For the other case, to be honest, I really don't get your point, but that may be my poor English. And what happens if we use this method without seeing these several lines in ...? Actually, I think this series of methods, including ... WDYT?

Contributor (Author)
My point in general is that the semantics of combining these calls with dynamic allocation are unclear.
That's true -- but only when pending tasks increase. But if you've got 0 executors, how do you expect pending tasks to increase? That would only happen when another taskset gets submitted, but with no executors your Spark program will probably just be blocked. In the other case, I'm just trying to point out strange interactions between user control and dynamic allocation control. Imagine this sequence: dynamic allocation sees 1000 tasks, so it asks for 1000 executors ...
Hmm, from a quick look, I think you're right. It doesn't seem that using ...

Member
Hi @squito, thanks for your reply. And for ..., I checked ...

Member
@squito any thoughts?
+          "killExecutors() unsupported with Dynamic Allocation turned on")
+        b.killExecutors(executorIds, adjustTargetNumExecutors = true, countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false

@@ -1681,7 +1687,8 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] def killAndReplaceExecutor(executorId: String): Boolean = {
     schedulerBackend match {
       case b: ExecutorAllocationClient =>
-        b.killExecutors(Seq(executorId), replace = true, force = true).nonEmpty
+        b.killExecutors(Seq(executorId), adjustTargetNumExecutors = false, countFailures = false,
+          force = true).nonEmpty
       case _ =>
         logWarning("Killing executors is not supported by current scheduler.")
         false
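A sketch of the user-visible effect of the new guard (it assumes a running `SparkContext` named `sc` on a cluster manager with `spark.dynamicAllocation.enabled=true`; the executor id is illustrative):

```scala
// With dynamic allocation on, the developer API now fails fast via the new require(...)
// instead of silently fighting the ExecutorAllocationManager over the target count.
try {
  sc.killExecutors(Seq("1"))
} catch {
  case e: IllegalArgumentException =>
    // "requirement failed: killExecutors() unsupported with Dynamic Allocation turned on"
    println(s"rejected: ${e.getMessage}")
}
```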
@@ -147,7 +147,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

     case KillExecutorsOnHost(host) =>
       scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
-        killExecutors(exec.toSeq, replace = true, force = true)
+        killExecutors(exec.toSeq, adjustTargetNumExecutors = false, countFailures = false,
+          force = true)
       }

     case UpdateDelegationTokens(newDelegationTokens) =>

@@ -584,18 +585,18 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      those failures be counted to task failure limits?
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
   final override def killExecutors(
       executorIds: Seq[String],
-      replace: Boolean,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")

@@ -610,20 +611,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       val executorsToKill = knownExecutors
         .filter { id => !executorsPendingToRemove.contains(id) }
         .filter { id => force || !scheduler.isExecutorBusy(id) }
-      executorsToKill.foreach { id => executorsPendingToRemove(id) = !replace }
+      executorsToKill.foreach { id => executorsPendingToRemove(id) = !countFailures }

       logInfo(s"Actual list of executor(s) to be killed is ${executorsToKill.mkString(", ")}")

       // If we do not wish to replace the executors we kill, sync the target number of executors
       // with the cluster manager to avoid allocating new ones. When computing the new target,
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
-        if (!replace) {
+        if (adjustTargetNumExecutors) {
           requestedTotalExecutors = math.max(requestedTotalExecutors - executorsToKill.size, 0)
           if (requestedTotalExecutors !=
             (numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)) {
             logDebug(
-              s"""killExecutors($executorIds, $replace, $force): Executor counts do not match:
+              s"""killExecutors($executorIds, $adjustTargetNumExecutors, $countFailures, $force):
+                 |Executor counts do not match:
                  |requestedTotalExecutors  = $requestedTotalExecutors
                  |numExistingExecutors     = $numExistingExecutors
                  |numPendingExecutors      = $numPendingExecutors
I'm still a little confused about this parameter (countFailures). If force = false, it's a no-op, and all the call sites I've seen seem to set this parameter to false. So is there something I'm missing?

Author
Whoops, I was supposed to set countFailures = true in sc.killAndReplaceExecutors, thanks for catching that.
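A self-contained sketch of the bookkeeping being discussed; the names mirror CoarseGrainedSchedulerBackend, but this is a standalone illustration rather than code from the PR.

```scala
import scala.collection.mutable

// Standalone illustration (not PR code): how the new countFailures flag feeds the
// executorsPendingToRemove map in CoarseGrainedSchedulerBackend. A stored value of `true`
// means tasks killed on that executor should NOT count toward task failure limits.
object CountFailuresSketch {
  def main(args: Array[String]): Unit = {
    val executorsPendingToRemove = mutable.HashMap.empty[String, Boolean]

    def recordKill(executorId: String, countFailures: Boolean): Unit = {
      executorsPendingToRemove(executorId) = !countFailures
    }

    // Idle removal by dynamic allocation: don't penalize a task that just landed there.
    recordKill("exec-1", countFailures = false)
    // The follow-up the author mentions: a killed-and-replaced executor should count failures.
    recordKill("exec-2", countFailures = true)

    println(executorsPendingToRemove) // e.g. Map(exec-1 -> true, exec-2 -> false)
  }
}
```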