diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c2989314262c5..b6e14e8210c86 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -580,7 +580,7 @@ private[spark] class ExecutorAllocationManager(
       // when the task backlog decreased.
       if (decommissionEnabled) {
         val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
-          id => (id, ExecutorDecommissionInfo("spark scale down", false))).toArray
+          id => (id, ExecutorDecommissionInfo("spark scale down"))).toArray
         client.decommissionExecutors(executorIdsWithoutHostLoss, adjustTargetNumExecutors = false)
       } else {
         client.killExecutors(executorIdsToBeRemoved.toSeq, adjustTargetNumExecutors = false,
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index b7a64d75a8d47..83f373d526e90 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -187,8 +187,10 @@ private[deploy] object DeployMessages {
     Utils.checkHostPort(hostPort)
   }

+  // When the host of a Worker is lost or decommissioned, `workerHost` is the host address
+  // of that Worker. Otherwise, it's None.
   case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
-    exitStatus: Option[Int], workerLost: Boolean)
+    exitStatus: Option[Int], workerHost: Option[String])

   case class ApplicationRemoved(message: String)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
index a6da8393bf405..e5efb15f6bc51 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClient.scala
@@ -175,15 +175,15 @@ private[spark] class StandaloneAppClient(
           cores))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)

-      case ExecutorUpdated(id, state, message, exitStatus, workerLost) =>
+      case ExecutorUpdated(id, state, message, exitStatus, workerHost) =>
         val fullId = appId + "/" + id
         val messageText = message.map(s => " (" + s + ")").getOrElse("")
         logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
         if (ExecutorState.isFinished(state)) {
-          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerLost)
+          listener.executorRemoved(fullId, message.getOrElse(""), exitStatus, workerHost)
         } else if (state == ExecutorState.DECOMMISSIONED) {
           listener.executorDecommissioned(fullId,
-            ExecutorDecommissionInfo(message.getOrElse(""), isHostDecommissioned = workerLost))
+            ExecutorDecommissionInfo(message.getOrElse(""), workerHost))
         }

       case WorkerRemoved(id, host, message) =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
index e72f7e976bb0a..76970ac9829c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/StandaloneAppClientListener.scala
@@ -39,7 +39,7 @@ private[spark] trait StandaloneAppClientListener {
     fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit

   def executorRemoved(
-    fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit
+    fullId: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit

   def executorDecommissioned(fullId: String, decommissionInfo: ExecutorDecommissionInfo): Unit
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 220e1c963d5ea..48516cdf83291 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -308,7 +308,7 @@ private[deploy] class Master(
             appInfo.resetRetryCount()
           }

-          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
+          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, None))

           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
@@ -909,9 +909,10 @@ private[deploy] class Master(
       exec.application.driver.send(ExecutorUpdated(
         exec.id, ExecutorState.DECOMMISSIONED, Some("worker decommissioned"), None,
-        // workerLost is being set to true here to let the driver know that the host (aka. worker)
-        // is also being decommissioned.
-        workerLost = true))
+        // The worker host is set here to let the driver know that the host (aka worker)
+        // is also being decommissioned, so the driver can unregister all the shuffle map
+        // statuses located at this host when it receives the executor lost event.
+        Some(worker.host)))
       exec.state = ExecutorState.DECOMMISSIONED
       exec.application.removeExecutor(exec)
     }
@@ -932,7 +933,7 @@ private[deploy] class Master(
     for (exec <- worker.executors.values) {
       logInfo("Telling app of lost executor: " + exec.id)
       exec.application.driver.send(ExecutorUpdated(
-        exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
+        exec.id, ExecutorState.LOST, Some("worker lost"), None, Some(worker.host)))
       exec.state = ExecutorState.LOST
       exec.application.removeExecutor(exec)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 07258f270b458..48045bafe6e3f 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -172,10 +172,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       driver match {
         case Some(endpoint) =>
           logInfo("Sending DecommissionExecutor to driver.")
-          endpoint.send(
-            DecommissionExecutor(
-              executorId,
-              ExecutorDecommissionInfo(msg, isHostDecommissioned = false)))
+          endpoint.send(DecommissionExecutor(executorId, ExecutorDecommissionInfo(msg)))
         case _ =>
           logError("No registered driver to send Decommission to.")
       }
@@ -275,7 +272,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       // Tell master we are decommissioned so it stops trying to schedule us
       if (driver.nonEmpty) {
         driver.get.askSync[Boolean](DecommissionExecutor(
-          executorId, ExecutorDecommissionInfo(msg, false)))
+          executorId, ExecutorDecommissionInfo(msg)))
       } else {
         logError("No driver to message decommissioning.")
       }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b308115935d64..c3482c94761f6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1890,16 +1890,6 @@ package object config {
     .timeConf(TimeUnit.SECONDS)
     .createOptional

-  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
-    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
-      .doc("Duration for which a decommissioned executor's information will be kept after its" +
-        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
-        "decommissioning even after the mapper executor has been decommissioned. This allows " +
-        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
-      .version("3.1.0")
-      .timeConf(TimeUnit.SECONDS)
-      .createWithDefaultString("5m")
-
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
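Before the scheduler-side hunks below, it may help to see the message shape this patch settles on: an optional host replaces the old boolean flag. A minimal sketch of constructing the new ExecutorDecommissionInfo (the host name here is hypothetical, and the type is private[spark], so real callers live under an org.apache.spark package):

  import org.apache.spark.scheduler.ExecutorDecommissionInfo

  // Executor-only decommission: workerHost defaults to None.
  val execOnly = ExecutorDecommissionInfo("spark scale down")

  // Host-level decommission: only the cluster manager supplies workerHost,
  // signalling that shuffle data on the whole host may be lost.
  val hostLoss = ExecutorDecommissionInfo("worker decommissioned", Some("host-1.example.com"))

  // Consumers no longer test a boolean flag; they inspect the Option.
  val hostMayBeLost: Boolean = hostLoss.workerHost.isDefined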
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 18cd2410c1e4c..080e0e7f1552f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1826,7 +1826,7 @@ private[spark] class DAGScheduler(
             val externalShuffleServiceEnabled = env.blockManager.externalShuffleServiceEnabled
             val isHostDecommissioned = taskScheduler
               .getExecutorDecommissionState(bmAddress.executorId)
-              .exists(_.isHostDecommissioned)
+              .exists(_.workerHost.isDefined)

             // Shuffle output of all executors on host `bmAddress.host` may be lost if:
             // - External shuffle service is enabled, so we assume that all shuffle data on node is
@@ -1989,15 +1989,15 @@ private[spark] class DAGScheduler(
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
-      workerLost: Boolean): Unit = {
+      workerHost: Option[String]): Unit = {
     // if the cluster manager explicitly tells us that the entire worker was lost, then
     // we know to unregister shuffle output. (Note that "worker" specifically refers to the process
     // from a Standalone cluster, where the shuffle service lives in the Worker.)
-    val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
+    val fileLost = workerHost.isDefined || !env.blockManager.externalShuffleServiceEnabled
     removeExecutorAndUnregisterOutputs(
       execId = execId,
       fileLost = fileLost,
-      hostToUnregisterOutputs = None,
+      hostToUnregisterOutputs = workerHost,
       maybeEpoch = None)
   }

@@ -2366,11 +2366,12 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
       dagScheduler.handleExecutorAdded(execId, host)

     case ExecutorLost(execId, reason) =>
-      val workerLost = reason match {
-        case ExecutorProcessLost(_, true, _) => true
-        case _ => false
+      val workerHost = reason match {
+        case ExecutorProcessLost(_, workerHost, _) => workerHost
+        case ExecutorDecommission(workerHost) => workerHost
+        case _ => None
       }
-      dagScheduler.handleExecutorLost(execId, workerLost)
+      dagScheduler.handleExecutorLost(execId, workerHost)

     case WorkerRemoved(workerId, host, message) =>
       dagScheduler.handleWorkerRemoved(workerId, host, message)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
index 48ae879a518ce..7eec070232c3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorDecommissionInfo.scala
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler
 /**
  * Message providing more detail when an executor is being decommissioned.
  * @param message Human readable reason for why the decommissioning is happening.
- * @param isHostDecommissioned Whether the host (aka the `node` or `worker` in other places) is
- *                             being decommissioned too. Used to infer if the shuffle data might
- *                             be lost even if the external shuffle service is enabled.
+ * @param workerHost When workerHost is defined, it means the host (aka the `node` or `worker`
+ *                   in other places) has been decommissioned too. Used to infer if the
+ *                   shuffle data might be lost even if the external shuffle service is enabled.
  */
 private[spark]
-case class ExecutorDecommissionInfo(message: String, isHostDecommissioned: Boolean)
+case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

 /**
  * State related to decommissioning that is kept by the TaskSchedulerImpl. This state is derived
@@ -37,4 +37,4 @@ case class ExecutorDecommissionState(
   // to estimate when the executor might eventually be lost if EXECUTOR_DECOMMISSION_KILL_INTERVAL
   // is configured.
   startTime: Long,
-  isHostDecommissioned: Boolean)
+  workerHost: Option[String] = None)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 671dedaa5a6e8..f2eb4a7047b56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -53,14 +53,15 @@ private [spark] object LossReasonPending extends ExecutorLossReason("Pending los
 /**
  * @param _message human readable loss reason
- * @param workerLost whether the worker is confirmed lost too (i.e. including shuffle service)
+ * @param workerHost it's defined when the host is confirmed lost too (i.e. including
+ *                   shuffle service)
  * @param causedByApp whether the loss of the executor is the fault of the running app.
  *                    (assumed true by default unless known explicitly otherwise)
  */
 private[spark]
 case class ExecutorProcessLost(
     _message: String = "Executor Process Lost",
-    workerLost: Boolean = false,
+    workerHost: Option[String] = None,
     causedByApp: Boolean = true)
   extends ExecutorLossReason(_message)

@@ -69,5 +70,8 @@ case class ExecutorProcessLost(
  *
  * This is used by the task scheduler to remove state associated with the executor, but
  * not yet fail any tasks that were running in the executor before the executor is "fully" lost.
+ *
+ * @param workerHost it is defined when the worker is decommissioned too
  */
-private [spark] object ExecutorDecommission extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
+  extends ExecutorLossReason("Executor decommission.")
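Taken together, the two loss reasons above now carry the same optional host, so consumers can fold them with a single match. A minimal sketch mirroring the DAGSchedulerEventProcessLoop change earlier in this patch (the function name is ours; the types are private[spark], so this would only compile inside an org.apache.spark package):

  import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorLossReason, ExecutorProcessLost}

  // Both reasons collapse to an Option[String]; any other reason means
  // no host was confirmed lost.
  def lostWorkerHost(reason: ExecutorLossReason): Option[String] = reason match {
    case ExecutorProcessLost(_, workerHost, _) => workerHost
    case ExecutorDecommission(workerHost) => workerHost
    case _ => None
  }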
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d446638107690..107c517ca06bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -143,18 +143,6 @@ private[spark] class TaskSchedulerImpl(
   // continue to run even after being asked to decommission, but they will eventually exit.
   val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionState]

-  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
-  // This cache is consulted to know if a fetch failure is because a source executor was
-  // decommissioned.
-  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
-    .expireAfterWrite(
-      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
-    .ticker(new Ticker{
-      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
-    })
-    .build[String, ExecutorDecommissionState]()
-    .asMap()
-
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
   }
@@ -922,28 +910,8 @@ private[spark] class TaskSchedulerImpl(
     synchronized {
       // Don't bother noting decommissioning for executors that we don't know about
       if (executorIdToHost.contains(executorId)) {
-        val oldDecomStateOpt = executorsPendingDecommission.get(executorId)
-        val newDecomState = if (oldDecomStateOpt.isEmpty) {
-          // This is the first time we are hearing of decommissioning this executor,
-          // so create a brand new state.
-          ExecutorDecommissionState(
-            clock.getTimeMillis(),
-            decommissionInfo.isHostDecommissioned)
-        } else {
-          val oldDecomState = oldDecomStateOpt.get
-          if (!oldDecomState.isHostDecommissioned && decommissionInfo.isHostDecommissioned) {
-            // Only the cluster manager is allowed to send decommission messages with
-            // isHostDecommissioned set. So the new decommissionInfo is from the cluster
-            // manager and is thus authoritative. Flip isHostDecommissioned to true but keep the old
-            // decommission start time.
-            ExecutorDecommissionState(
-              oldDecomState.startTime,
-              isHostDecommissioned = true)
-          } else {
-            oldDecomState
-          }
-        }
-        executorsPendingDecommission(executorId) = newDecomState
+        executorsPendingDecommission(executorId) =
+          ExecutorDecommissionState(clock.getTimeMillis(), decommissionInfo.workerHost)
       }
     }
     rootPool.executorDecommission(executorId)
@@ -952,26 +920,11 @@ private[spark] class TaskSchedulerImpl(
   override def getExecutorDecommissionState(executorId: String)
     : Option[ExecutorDecommissionState] = synchronized {
-    executorsPendingDecommission
-      .get(executorId)
-      .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
+    executorsPendingDecommission.get(executorId)
   }

-  override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
+  override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
     var failedExecutor: Option[String] = None
-    val reason = givenReason match {
-      // Handle executor process loss due to decommissioning
-      case ExecutorProcessLost(message, origWorkerLost, origCausedByApp) =>
-        val executorDecommissionState = getExecutorDecommissionState(executorId)
-        ExecutorProcessLost(
-          message,
-          // Also mark the worker lost if we know that the host was decommissioned
-          origWorkerLost || executorDecommissionState.exists(_.isHostDecommissioned),
-          // Executor loss is certainly not caused by app if we knew that this executor is being
-          // decommissioned
-          causedByApp = executorDecommissionState.isEmpty && origCausedByApp)
-      case e => e
-    }

     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {
@@ -1060,9 +1013,7 @@ private[spark] class TaskSchedulerImpl(
       }
     }

-
-    val decomState = executorsPendingDecommission.remove(executorId)
-    decomState.foreach(decommissionedExecutorsRemoved.put(executorId, _))
+    executorsPendingDecommission.remove(executorId)

     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
@@ -1104,7 +1055,7 @@ private[spark] class TaskSchedulerImpl(
   // exposed for test
   protected final def isHostDecommissioned(host: String): Boolean = {
     hostToExecutors.get(host).exists { executors =>
-      executors.exists(e => getExecutorDecommissionState(e).exists(_.isHostDecommissioned))
+      executors.exists(e => getExecutorDecommissionState(e).exists(_.workerHost.isDefined))
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ff0387602273d..673fe4fe27519 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -988,7 +988,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case exited: ExecutorExited => exited.exitCausedByApp
-        case ExecutorKilled => false
+        case ExecutorKilled | ExecutorDecommission(_) => false
         case ExecutorProcessLost(_, _, false) => false
         case _ => true
       }
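The backend change that follows replaces a HashSet with a HashMap so the workerHost survives until the executor actually disconnects, at which point it is replayed as the loss reason. A condensed, self-contained restatement of that decision (the function and parameter names are ours, not part of the patch):

  import org.apache.spark.scheduler.{ExecutorDecommission, ExecutorKilled, ExecutorLossReason}

  // pendingDecommission is Some(workerHost) when the executor was decommissioning.
  // A driver-initiated kill wins, then decommission (carrying the host), and
  // otherwise the reason reported by the RPC layer stands.
  def resolveLossReason(
      killedByDriver: Boolean,
      pendingDecommission: Option[Option[String]],
      reported: ExecutorLossReason): ExecutorLossReason = {
    if (killedByDriver) {
      ExecutorKilled
    } else {
      pendingDecommission.map(host => ExecutorDecommission(host)).getOrElse(reported)
    }
  }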
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index ca657313c14f6..0f144125af7bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -92,8 +92,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   private val executorsPendingLossReason = new HashSet[String]

-  // Executors which are being decommissioned
-  protected val executorsPendingDecommission = new HashSet[String]
+  // Executors which are being decommissioned. Maps from executorId to workerHost.
+  protected val executorsPendingDecommission = new HashMap[String, Option[String]]

   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -390,16 +390,23 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         case Some(executorInfo) =>
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
-          val killed = CoarseGrainedSchedulerBackend.this.synchronized {
+          val lossReason = CoarseGrainedSchedulerBackend.this.synchronized {
             addressToExecutorId -= executorInfo.executorAddress
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
-            executorsPendingDecommission -= executorId
-            executorsPendingToRemove.remove(executorId).getOrElse(false)
+            val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
+            val workerHostOpt = executorsPendingDecommission.remove(executorId)
+            if (killedByDriver) {
+              ExecutorKilled
+            } else if (workerHostOpt.isDefined) {
+              ExecutorDecommission(workerHostOpt.get)
+            } else {
+              reason
+            }
           }
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
-          scheduler.executorLost(executorId, if (killed) ExecutorKilled else reason)
+          scheduler.executorLost(executorId, lossReason)
           listenerBus.post(
             SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason.toString))
         case None =>
@@ -462,11 +469,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
       adjustTargetNumExecutors: Boolean): Seq[String] = {

-    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, _) =>
+    val executorsToDecommission = executorsAndDecomInfo.filter { case (executorId, decomInfo) =>
       CoarseGrainedSchedulerBackend.this.synchronized {
         // Only bother decommissioning executors which are alive.
         if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
+          executorsPendingDecommission(executorId) = decomInfo.workerHost
           true
         } else {
           false
@@ -489,19 +496,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       decomInfo: ExecutorDecommissionInfo): Boolean = {
     logInfo(s"Asking executor $executorId to decommission.")
-    try {
-      scheduler.executorDecommission(executorId, decomInfo)
-      if (driverEndpoint != null) {
-        logInfo("Propagating executor decommission to driver.")
-        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
-      }
-    } catch {
-      case e: Exception =>
-        logError(s"Unexpected error during decommissioning ${e.toString}", e)
-        return false
-    }
+    scheduler.executorDecommission(executorId, decomInfo)
     // Send decommission message to the executor (it could have originated on the executor
-    // but not necessarily.
+    // but not necessarily).
     CoarseGrainedSchedulerBackend.this.synchronized {
       executorDataMap.get(executorId) match {
         case Some(executorInfo) =>
@@ -656,7 +653,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       !executorsPendingToRemove.contains(id) &&
         !executorsPendingLossReason.contains(id) &&
         !executorsPendingDecommission.contains(id)
-
     }

   /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 3acb6f1088e13..34b03dfec9e80 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -165,10 +165,13 @@ private[spark] class StandaloneSchedulerBackend(
   }

   override def executorRemoved(
-      fullId: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
+      fullId: String,
+      message: String,
+      exitStatus: Option[Int],
+      workerHost: Option[String]): Unit = {
     val reason: ExecutorLossReason = exitStatus match {
       case Some(code) => ExecutorExited(code, exitCausedByApp = true, message)
-      case None => ExecutorProcessLost(message, workerLost = workerLost)
+      case None => ExecutorProcessLost(message, workerHost)
     }
     logInfo("Executor %s removed: %s".format(fullId, message))
     removeExecutor(fullId.split("/")(1), reason)
diff --git a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
index 85ad4bdb3ec10..fe88822bb46b5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/client/AppClientSuite.scala
@@ -129,7 +129,7 @@ class AppClientSuite
       // We only record decommissioning for the executor we've requested
       assert(ci.listener.execDecommissionedMap.size === 1)
       val decommissionInfo = ci.listener.execDecommissionedMap.get(executorId)
-      assert(decommissionInfo != null && decommissionInfo.isHostDecommissioned,
+      assert(decommissionInfo != null && decommissionInfo.workerHost.isDefined,
        s"$executorId should have been decommissioned along with its worker")
     }

@@ -245,7 +245,7 @@ class AppClientSuite
     }

     def executorRemoved(
-        id: String, message: String, exitStatus: Option[Int], workerLost: Boolean): Unit = {
+        id: String, message: String, exitStatus: Option[Int], workerHost: Option[String]): Unit = {
       execRemovedList.add(id)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index a7f8affee918c..436765808e22b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -848,9 +848,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }

   private val shuffleFileLossTests = Seq(
-    ("executor process lost with shuffle service", ExecutorProcessLost("", false), true, false),
-    ("worker lost with shuffle service", ExecutorProcessLost("", true), true, true),
-    ("worker lost without shuffle service", ExecutorProcessLost("", true), false, true),
+    ("executor process lost with shuffle service", ExecutorProcessLost("", None), true, false),
+    ("worker lost with shuffle service", ExecutorProcessLost("", Some("hostA")), true, true),
+    ("worker lost without shuffle service", ExecutorProcessLost("", Some("hostA")), false, true),
     ("executor failure with shuffle service", ExecutorKilled, true, false),
     ("executor failure without shuffle service", ExecutorKilled, false, true))

@@ -874,10 +874,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0))
     completeShuffleMapStageSuccessfully(0, 0, 1)
+    val expectHostFileLoss = event match {
+      case ExecutorProcessLost(_, workerHost, _) => workerHost.isDefined
+      case _ => false
+    }
     runEvent(ExecutorLost("hostA-exec", event))
     verify(blockManagerMaster, times(1)).removeExecutor("hostA-exec")
     if (expectFileLoss) {
-      verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+      if (expectHostFileLoss) {
+        verify(mapOutputTracker, times(1)).removeOutputsOnHost("hostA")
+      } else {
+        verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("hostA-exec")
+      }
       intercept[MetadataFetchFailedException] {
         mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
       }
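The suite above now distinguishes removeOutputsOnHost from removeOutputsOnExecutor. For orientation, this is the driver-side rule it exercises, restated compactly as plain Scala (the function name is ours; the logic comes from the handleExecutorLost hunk earlier in this patch):

  // Files count as lost when the worker is gone or no external shuffle service
  // is running; a defined workerHost widens cleanup to the whole host.
  def fileLoss(
      workerHost: Option[String],
      externalShuffleServiceEnabled: Boolean): (Boolean, Option[String]) = {
    val fileLost = workerHost.isDefined || !externalShuffleServiceEnabled
    (fileLost, workerHost) // (fileLost, hostToUnregisterOutputs)
  }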
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 26c9d9130e56a..f29eb70eb3628 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -158,8 +158,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       .exists(s => s.contains(exec0) && s.contains(exec1)))
     assert(scheduler.getExecutorsAliveOnHost(host1).exists(_.contains(exec2)))

-    scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", false))
-    scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", true))
+    scheduler.executorDecommission(exec1, ExecutorDecommissionInfo("test", None))
+    scheduler.executorDecommission(exec2, ExecutorDecommissionInfo("test", Some(host1)))

     assert(scheduler.isExecutorAlive(exec0))
     assert(!Seq(exec1, exec2).exists(scheduler.isExecutorAlive))
@@ -1864,18 +1864,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   test("scheduler should keep the decommission state where host was decommissioned") {
     val clock = new ManualClock(10000L)
     val scheduler = setupSchedulerForDecommissionTests(clock, 2)
-    val oldTime = clock.getTimeMillis()
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))
-
-    clock.advance(3000L)
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0 new", false))
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1 new", false))
+    val decomTime = clock.getTimeMillis()
+    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", None))
+    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", Some("host1")))

     assert(scheduler.getExecutorDecommissionState("executor0")
-      === Some(ExecutorDecommissionState(oldTime, false)))
+      === Some(ExecutorDecommissionState(decomTime, None)))
     assert(scheduler.getExecutorDecommissionState("executor1")
-      === Some(ExecutorDecommissionState(oldTime, true)))
+      === Some(ExecutorDecommissionState(decomTime, Some("host1"))))
     assert(scheduler.getExecutorDecommissionState("executor2").isEmpty)
   }

@@ -1890,7 +1886,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
     scheduler.executorLost("executor0", ExecutorExited(0, false, "normal"))
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)
-    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
+    scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", None))
     assert(scheduler.getExecutorDecommissionState("executor0").isEmpty)

     // 0th task just died above
@@ -1903,31 +1899,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)

     // executor 1 is decommissioned before losing
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
+    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", None))
     assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
     clock.advance(2000)

     // executor1 is eventually lost
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
     assert(scheduler.executorsPendingDecommission.isEmpty)
     // So now both the tasks are no longer running
     assert(manager.copiesRunning.take(2) === Array(0, 0))
     clock.advance(2000)

-    // Decommission state should hang around a bit after removal ...
-    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
-    scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
-    clock.advance(2000)
-    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
-    assert(scheduler.getExecutorDecommissionState("executor1").isDefined)
-
-    // The default timeout for expiry is 300k milliseconds (5 minutes) which completes now,
-    // and the executor1's decommission state should finally be purged.
-    clock.advance(300000)
-    assert(scheduler.getExecutorDecommissionState("executor1").isEmpty)
-    assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
-
     // Now give it some resources and both tasks should be rerun
     val taskDescriptions = taskScheduler.resourceOffers(IndexedSeq(
       WorkerOffer("executor2", "host2", 1), WorkerOffer("executor3", "host3", 1))).flatten
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 86d4e92df723b..c389fd2ffa8b1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -681,8 +681,8 @@ class TaskSetManagerSuite
     assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))

     // Decommission all executors on host0, to mimic CoarseGrainedSchedulerBackend.
-    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", true))
-    sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", true))
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", Some(host0)))
+    sched.executorDecommission(exec1, ExecutorDecommissionInfo("test", Some(host0)))

     assert(manager.myLocalityLevels === Array(ANY))
   }
@@ -707,7 +707,7 @@ class TaskSetManagerSuite
     assert(manager.myLocalityLevels === Array(PROCESS_LOCAL, NODE_LOCAL, ANY))

     // Decommission the only executor (without the host) that the task is interested in running on.
-    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", false))
+    sched.executorDecommission(exec0, ExecutorDecommissionInfo("test", None))

     assert(manager.myLocalityLevels === Array(NODE_LOCAL, ANY))
   }
@@ -2029,8 +2029,7 @@ class TaskSetManagerSuite
     // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3) will be now
     // checked if they should be speculated.
     // (TASK 2 -> 15, TASK 3 -> 15)
-    sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom",
-      isHostDecommissioned = false))
+    sched.executorDecommission("exec2", ExecutorDecommissionInfo("decom", None))

     assert(sched.getExecutorDecommissionState("exec2").map(_.startTime) ===
       Some(clock.getTimeMillis()))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index 4264d45b36f2a..129eb8bf91051 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -64,7 +64,7 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkConte
       val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
       sc.getExecutorIds().tail.foreach { id =>
-        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false),
+        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", None),
           adjustTargetNumExecutors = false)
         assert(rdd3.sortByKey().collect().length === 100)
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index 1ccb53f32dc2e..83bb66efdac9e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -77,7 +77,7 @@ class WorkerDecommissionSuite extends SparkFunSuite with LocalSparkContext {
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
     // Make the executors decommission, finish, exit, and not be replaced.
-    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false))).toArray
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", None))).toArray
     sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = true)
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 37836a9b49042..094b893cdda2e 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -192,7 +192,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      ExecutorDecommissionInfo("", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 8f74d2d9959d1..1037950a4424f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -137,7 +137,7 @@ private[streaming] class ExecutorAllocationManager(
     val execIdToRemove = removableExecIds(Random.nextInt(removableExecIds.size))
     if (conf.get(DECOMMISSION_ENABLED)) {
       client.decommissionExecutor(execIdToRemove,
-        ExecutorDecommissionInfo("spark scale down", false),
+        ExecutorDecommissionInfo("spark scale down", None),
         adjustTargetNumExecutors = true)
     } else {
       client.killExecutor(execIdToRemove)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index ec3ff456b8eab..f1870718c6730 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -98,7 +98,7 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
     /** Verify that a particular executor was scaled down. */
     def verifyScaledDownExec(expectedExec: Option[String]): Unit = {
       if (expectedExec.nonEmpty) {
-        val decomInfo = ExecutorDecommissionInfo("spark scale down", false)
+        val decomInfo = ExecutorDecommissionInfo("spark scale down", None)
         if (decommissioning) {
           verify(allocationClient, times(1)).decommissionExecutor(
             meq(expectedExec.get), meq(decomInfo), meq(true))
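For reference, an end-to-end sketch of a scale-down request using the new message shape, as the allocation managers patched above issue it. This assumes a caller that already holds an ExecutorAllocationClient and a live executor id; it is an illustration under those assumptions, not part of the patch:

  import org.apache.spark.ExecutorAllocationClient
  import org.apache.spark.scheduler.ExecutorDecommissionInfo

  def scaleDown(client: ExecutorAllocationClient, execId: String): Boolean = {
    // workerHost is left as None: the host itself stays up, so shuffle data
    // served by an external shuffle service on that host remains usable.
    client.decommissionExecutor(
      execId,
      ExecutorDecommissionInfo("spark scale down"),
      adjustTargetNumExecutors = true)
  }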