From 77fd5dc6b003584496ed438d758fff787fae43c3 Mon Sep 17 00:00:00 2001
From: Bo Zhang
Date: Wed, 28 Sep 2022 18:39:05 +0800
Subject: [PATCH 1/4] Populate ExecutorDecommission with messages

---
 .../org/apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../apache/spark/scheduler/ExecutorLossReason.scala | 11 +++++++++--
 .../org/apache/spark/scheduler/MapStatus.scala      |  3 ++-
 .../org/apache/spark/scheduler/TaskSetManager.scala |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala     | 13 +++++++------
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala  |  2 +-
 .../BlockManagerDecommissionIntegrationSuite.scala  |  4 +++-
 7 files changed, 24 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 475afd01d00c..b17b9cc05c4c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -2899,7 +2899,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     case ExecutorLost(execId, reason) =>
       val workerHost = reason match {
         case ExecutorProcessLost(_, workerHost, _) => workerHost
-        case ExecutorDecommission(workerHost) => workerHost
+        case ExecutorDecommission(workerHost, _) => workerHost
         case _ => None
       }
       dagScheduler.handleExecutorLost(execId, workerHost)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index f333c01bb890..18c7806e82eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -77,6 +77,13 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
+ * @param _message detailed decommission message
  */
-private [spark] case class ExecutorDecommission(workerHost: Option[String] = None)
-  extends ExecutorLossReason("Executor decommission.")
+private [spark] case class ExecutorDecommission(
+    workerHost: Option[String] = None,
+    _message: String = "")
+  extends ExecutorLossReason("Executor decommission.")
+
+private[spark] object ExecutorDecommission {
+  val msgPrefix = "Executor decommission: "
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index d10cf55ed0d1..c33a78bceca6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -123,7 +123,8 @@ private[spark] object MapStatus {
 private[spark] class CompressedMapStatus(
     private[this] var loc: BlockManagerId,
     private[this] var compressedSizes: Array[Byte],
-    private[this] var _mapTaskId: Long)
+    private[this] var _mapTaskId: Long,
+    private[this] var _checksumVal: Long = 0)
   extends MapStatus with Externalizable {
 
   // For deserialization only
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1d157f51fe67..943d1e53df44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1071,7 +1071,7 @@ private[spark] class TaskSetManager(
     for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
       val exitCausedByApp: Boolean = reason match {
         case ExecutorExited(_, false, _) => false
-        case ExecutorKilled | ExecutorDecommission(_) => false
+        case ExecutorKilled | ExecutorDecommission(_, _) => false
         case ExecutorProcessLost(_, _, false) => false
         // If the task is launching, this indicates that Driver has sent LaunchTask to Executor,
         // but Executor has not sent StatusUpdate(TaskState.RUNNING) to Driver. Hence, we assume
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f775ee874d0f..a58bc9f161f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned. Maps from executorId to workerHost.
-  protected val executorsPendingDecommission = new HashMap[String, Option[String]]
+  // Executors which are being decommissioned.
+  protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
@@ -444,11 +444,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       executorDataMap -= executorId
       executorsPendingLossReason -= executorId
       val killedByDriver = executorsPendingToRemove.remove(executorId).getOrElse(false)
-      val workerHostOpt = executorsPendingDecommission.remove(executorId)
+      val decommissionInfoOpt = executorsPendingDecommission.remove(executorId)
      if (killedByDriver) {
        ExecutorKilled
-      } else if (workerHostOpt.isDefined) {
-        ExecutorDecommission(workerHostOpt.get)
+      } else if (decommissionInfoOpt.isDefined) {
+        val decommissionInfo = decommissionInfoOpt.get
+        ExecutorDecommission(decommissionInfo.workerHost, decommissionInfo.message)
      } else {
        reason
      }
@@ -532,7 +533,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Only bother decommissioning executors which are alive.
       if (isExecutorActive(executorId)) {
         scheduler.executorDecommission(executorId, decomInfo)
-        executorsPendingDecommission(executorId) = decomInfo.workerHost
+        executorsPendingDecommission(executorId) = decomInfo
         Some(executorId)
       } else {
         None
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 129428963696..fc9248de7ee0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -356,7 +356,7 @@ private[spark] class ExecutorMonitor(
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
       if (event.reason == ExecutorLossMessage.decommissionFinished ||
-          event.reason == ExecutorDecommission().message) {
+          (event.reason != null && event.reason.startsWith(ExecutorDecommission.msgPrefix))) {
         metrics.gracefullyDecommissioned.inc()
       } else if (removed.decommissioning) {
         metrics.decommissionUnfinished.inc()
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index e004c334dee7..456538544cfe 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -186,6 +186,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     sc.addSparkListener(new SparkListener {
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
+        assert(execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0" ||
+          execRemoved.reason == "Command exited with code 0")
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -247,7 +249,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // Decommission executor and ensure it is not relaunched by setting adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", None),
+      ExecutorDecommissionInfo("test msg 0", None),
       adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
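Note on the CoarseGrainedSchedulerBackend change in PATCH 1/4: executorsPendingDecommission now keeps the whole ExecutorDecommissionInfo rather than only the worker host, so the decommission message is still available when the executor is finally removed and its loss reason is built. The following is a minimal standalone sketch of that bookkeeping, not the real backend code; ExecutorDecommissionInfo and the loss-reason types are simplified stand-ins, and BackendSketch, decommission and lossReasonOnRemoval are illustrative names rather than Spark APIs.

    import scala.collection.mutable

    // Simplified stand-ins for the Spark scheduler types touched above.
    case class ExecutorDecommissionInfo(message: String, workerHost: Option[String] = None)

    sealed trait LossReason
    case object ExecutorKilled extends LossReason
    case class ExecutorDecommission(workerHost: Option[String], message: String) extends LossReason
    case object UnknownLoss extends LossReason

    object BackendSketch {
      // Keep the full decommission info (not just the host) until removal time.
      private val executorsPendingDecommission =
        new mutable.HashMap[String, ExecutorDecommissionInfo]

      def decommission(execId: String, info: ExecutorDecommissionInfo): Unit =
        executorsPendingDecommission(execId) = info

      // Mirrors the removal path above: a pending decommission wins over the raw reason.
      def lossReasonOnRemoval(execId: String, original: LossReason): LossReason =
        executorsPendingDecommission.remove(execId) match {
          case Some(info) => ExecutorDecommission(info.workerHost, info.message)
          case None => original
        }

      def main(args: Array[String]): Unit = {
        decommission("exec-1", ExecutorDecommissionInfo("node drained", Some("host-1")))
        // Prints: ExecutorDecommission(Some(host-1),node drained)
        println(lossReasonOnRemoval("exec-1", UnknownLoss))
      }
    }
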
From 48b705df564e3b23fe6213e329f3362aa96d9f00 Mon Sep 17 00:00:00 2001
From: Bo Zhang
Date: Thu, 29 Sep 2022 08:46:33 +0800
Subject: [PATCH 2/4] Revert change in MapStatus

---
 core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index c33a78bceca6..d10cf55ed0d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -123,8 +123,7 @@ private[spark] object MapStatus {
 private[spark] class CompressedMapStatus(
     private[this] var loc: BlockManagerId,
     private[this] var compressedSizes: Array[Byte],
-    private[this] var _mapTaskId: Long,
-    private[this] var _checksumVal: Long = 0)
+    private[this] var _mapTaskId: Long)
   extends MapStatus with Externalizable {
 
   // For deserialization only

From a69519dec36c5e2783c4c3a83430d702777b7ec2 Mon Sep 17 00:00:00 2001
From: Bo Zhang
Date: Thu, 29 Sep 2022 08:49:45 +0800
Subject: [PATCH 3/4] Actually uses the msgPrefix in ExecutorDecommission

---
 .../scala/org/apache/spark/scheduler/ExecutorLossReason.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 18c7806e82eb..35af882619b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -82,7 +82,7 @@ case class ExecutorProcessLost(
 private [spark] case class ExecutorDecommission(
     workerHost: Option[String] = None,
     _message: String = "")
-  extends ExecutorLossReason("Executor decommission.")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + _message)
 
 private[spark] object ExecutorDecommission {
   val msgPrefix = "Executor decommission: "

From 250a30236c625319d9217c27476b4722fa68cb76 Mon Sep 17 00:00:00 2001
From: Bo Zhang
Date: Sat, 8 Oct 2022 15:05:24 +0800
Subject: [PATCH 4/4] Address comments

---
 .../spark/scheduler/ExecutorLossReason.scala        |  6 +++---
 .../cluster/CoarseGrainedSchedulerBackend.scala     |  2 +-
 .../BlockManagerDecommissionIntegrationSuite.scala  | 14 ++++++++++----
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
index 35af882619b8..fb6a62551fa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorLossReason.scala
@@ -77,12 +77,12 @@ case class ExecutorProcessLost(
  * If you update this code make sure to re-run the K8s integration tests.
  *
  * @param workerHost it is defined when the worker is decommissioned too
- * @param _message detailed decommission message
+ * @param reason detailed decommission message
  */
 private [spark] case class ExecutorDecommission(
     workerHost: Option[String] = None,
-    _message: String = "")
-  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + _message)
+    reason: String = "")
+  extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)
 
 private[spark] object ExecutorDecommission {
   val msgPrefix = "Executor decommission: "
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index a58bc9f161f6..08c4f944c844 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -99,7 +99,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors that have been lost, but for which we don't yet know the real exit reason.
   protected val executorsPendingLossReason = new HashSet[String]
 
-  // Executors which are being decommissioned.
+  // Executors which are being decommissioned. Maps from executorId to ExecutorDecommissionInfo.
   protected val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
 
   // A map of ResourceProfile id to map of hostname with its possible task number running on it
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 456538544cfe..d9d2e6102f12 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -183,11 +183,14 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
       taskEndEvents.asScala.filter(_.taskInfo.successful).map(_.taskInfo.executorId).headOption
     }
 
-    sc.addSparkListener(new SparkListener {
+    val listener = new SparkListener {
+      var removeReasonValidated = false
+
       override def onExecutorRemoved(execRemoved: SparkListenerExecutorRemoved): Unit = {
         executorRemovedSem.release()
-        assert(execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0" ||
-          execRemoved.reason == "Command exited with code 0")
+        if (execRemoved.reason == ExecutorDecommission.msgPrefix + "test msg 0") {
+          removeReasonValidated = true
+        }
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
@@ -213,7 +216,8 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
           }
         }
       }
-    })
+    }
+    sc.addSparkListener(listener)
 
     // Cache the RDD lazily
     if (persist) {
@@ -345,5 +349,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
     // should have same value like before
     assert(testRdd.count() === numParts)
     assert(accum.value === numParts)
+    import scala.language.reflectiveCalls
+    assert(listener.removeReasonValidated)
   }
 }
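
After PATCH 4/4, the reason string attached to a decommissioned executor is always ExecutorDecommission.msgPrefix followed by the caller-supplied message; that is what the listener in the integration suite matches against and what ExecutorMonitor's prefix check relies on. The following is a minimal self-contained sketch of that final shape, not the real Spark definitions: the package-private modifiers are dropped so it compiles outside org.apache.spark, and ReasonStringSketch is an illustrative name.

    // Stand-ins mirroring the shape of ExecutorLossReason / ExecutorDecommission
    // after this series; simplified, not the real Spark classes.
    abstract class ExecutorLossReason(val message: String)

    case class ExecutorDecommission(
        workerHost: Option[String] = None,
        reason: String = "")
      extends ExecutorLossReason(ExecutorDecommission.msgPrefix + reason)

    object ExecutorDecommission {
      val msgPrefix = "Executor decommission: "
    }

    object ReasonStringSketch {
      def main(args: Array[String]): Unit = {
        // The message the integration test decommissions with.
        val loss = ExecutorDecommission(None, "test msg 0")
        assert(loss.message == "Executor decommission: test msg 0")

        // ExecutorMonitor now matches on the prefix rather than the old fixed
        // "Executor decommission." string, so any populated message still
        // counts as a graceful decommission.
        assert(loss.message != null && loss.message.startsWith(ExecutorDecommission.msgPrefix))

        println(loss.message)
      }
    }
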