diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 475afd01d00c..b17b9cc05c4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2899,7 +2899,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(workerHost, _) => workerHost
         case _ => None
       }
       dagScheduler.handleExecutorLost(execId, workerHost)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index f333c01bb890..fb6a62551fa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
+ * @param reason detailed decommission message
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+    workerHost: Option[String] = None,
+    reason: String = "")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
+
+private[spark] object ExecutorDecommission {
+  val msgPrefix = "Executor decommission: "
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1d157f51fe67..943d1e53df44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case ExecutorExited(_, false, _) => false
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false
         case ExecutorProcessLost(_, _, false) => false
         // If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
         // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f775ee874d0f..08c4f944c844 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned. Maps from executorId to workerHost.
-  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+  // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
+  protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -444,11 +444,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             executorDataMap -= executorId
             executorsPendingLossReason -= executorId
             val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
-            val workerHostOpt = executorsPendingDecommission.remove(executorId)
+            val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
             if (killedByDriver) {
               ExecutorKilled
-            } else if (workerHostOpt.isDefined) {
-              ExecutorDecommission(workerHostOpt.get)
+            } else if (decommissionInfoOpt.isDefined) {
+              val decommissionInfo = decommissionInfoOpt.get
+              ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
             } else {
               reason
             }
@@ -532,7 +533,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
         None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 129428963696..fc9248de7ee0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
       if (event.reason == ExecutorLossMessage.decommissionFinished ||
-          event.reason == ExecutorDecommission().message) {
+          (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
         metrics.gracefullyDecommissioned.inc()
       } else if (removed.decommissioning) {
         metrics.decommissionUnfinished.inc()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index e004c334dee7..d9d2e6102f12 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -183,9 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
     }
 
-    sc.addSparkListener(new SparkListener {
+    val listener = new SparkListener {
+      var removeReasonValidated = false
+
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
+        if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+          removeReasonValidated = true
+        }
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -211,7 +216,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           }
         }
       }
-    })
+    }
+    sc.addSparkListener(listener)
 
     // Cache the RDD lazily
     if (persist) {
@@ -247,7 +253,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", None),
+      ExecutorDecommissionInfo("test msg 0", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
@@ -343,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
+    import scala.language.reflectiveCalls
+    assert(listener.removeReasonValidated)
   }
 }
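
Note: as a quick illustration of the pattern introduced in this patch, below is a minimal standalone sketch. DecommissionMessageSketch, LossReason and Decommission are made-up stand-ins rather than the actual Spark classes; it only models how a detailed reason is embedded after a fixed message prefix and how a consumer (as ExecutorMonitor does above) detects graceful decommission with a startsWith check instead of exact message equality.

// Standalone sketch with illustrative names, not the real Spark types.
object DecommissionMessageSketch {
  // Simplified stand-in for ExecutorLossReason: every loss reason carries a message.
  abstract class LossReason(val message: String)

  object Decommission {
    // Stable prefix, analogous to ExecutorDecommission.msgPrefix in the patch.
    val msgPrefix = "Executor decommission: "
  }

  // Stand-in for ExecutorDecommission: the detailed reason is appended to the prefix.
  case class Decommission(workerHost: Option[String] = None, reason: String = "")
    extends LossReason(Decommission.msgPrefix + reason)

  def main(args: Array[String]): Unit = {
    val loss = Decommission(Some("host-1"), "spot instance reclaimed")
    // The message now carries the detailed reason behind the stable prefix.
    println(loss.message)  // prints: Executor decommission: spot instance reclaimed
    // Consumers match on the prefix rather than on exact equality, so any
    // reason string still counts as a graceful decommission.
    val graceful = loss.message != null && loss.message.startsWith(Decommission.msgPrefix)
    assert(graceful)
  }
}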