diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 55fb76b3572a..07258f270b45 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -294,10 +294,15 @@ private[spark] class CoarseGrainedExecutorBackend(
       override def run(): Unit = {
         var lastTaskRunningTime = System.nanoTime()
         val sleep_time = 1000 // 1s
-
+        // This config is internal and only used by unit tests to force an executor
+        // to hang around for longer when decommissioned.
+        val initialSleepMillis = env.conf.getInt(
+          "spark.test.executor.decommission.initial.sleep.millis", sleep_time)
+        if (initialSleepMillis > 0) {
+          Thread.sleep(initialSleepMillis)
+        }
         while (true) {
           logInfo("Checking to see if we can shutdown.")
-          Thread.sleep(sleep_time)
           if (executor == null || executor.numRunningTasks == 0) {
             if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
               logInfo("No running tasks, checking migrations")
@@ -323,6 +328,7 @@ private[spark] class CoarseGrainedExecutorBackend(
             // move forward.
             lastTaskRunningTime = System.nanoTime()
           }
+          Thread.sleep(sleep_time)
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 200cde0a2d3e..34acf9f9b30c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1877,6 +1877,16 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  private[spark] val DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL =
+    ConfigBuilder("spark.executor.decommission.removed.infoCacheTTL")
+      .doc("Duration for which a decommissioned executor's information will be kept after its " +
+        "removal. Keeping the decommissioned info after removal helps pinpoint fetch failures to " +
+        "decommissioning even after the mapper executor has been decommissioned. This allows " +
+        "eager recovery from fetch failures caused by decommissioning, increasing job robustness.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("5m")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7641948ed4b3..ae0387e09cc6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1846,7 +1846,14 @@ private[spark] class DAGScheduler(
             execId = bmAddress.executorId,
             fileLost = true,
             hostToUnregisterOutputs = hostToUnregisterOutputs,
-            maybeEpoch = Some(task.epoch))
+            maybeEpoch = Some(task.epoch),
+            // shuffleFileLostEpoch is ignored when a host is decommissioned because some
+            // decommissioned executors on that host might have been removed before this fetch
+            // failure and might have bumped up the shuffleFileLostEpoch. We ignore that, and
+            // proceed with unconditional removal of shuffle outputs from all executors on that
+            // host, including from those that we still haven't confirmed as lost due to heartbeat
+            // delays.
+            ignoreShuffleFileLostEpoch = isHostDecommissioned)
         }
       }
 
@@ -2012,7 +2019,8 @@ private[spark] class DAGScheduler(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
-      maybeEpoch: Option[Long] = None): Unit = {
+      maybeEpoch: Option[Long] = None,
+      ignoreShuffleFileLostEpoch: Boolean = false): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
@@ -2022,16 +2030,25 @@
       blockManagerMaster.removeExecutor(execId)
       clearCacheLocs()
     }
-    if (fileLost &&
-      (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
-      shuffleFileLostEpoch(execId) = currentEpoch
-      hostToUnregisterOutputs match {
-        case Some(host) =>
-          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
-          mapOutputTracker.removeOutputsOnHost(host)
-        case None =>
-          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
-          mapOutputTracker.removeOutputsOnExecutor(execId)
+    if (fileLost) {
+      val remove = if (ignoreShuffleFileLostEpoch) {
+        true
+      } else if (!shuffleFileLostEpoch.contains(execId) ||
+          shuffleFileLostEpoch(execId) < currentEpoch) {
+        shuffleFileLostEpoch(execId) = currentEpoch
+        true
+      } else {
+        false
+      }
+      if (remove) {
+        hostToUnregisterOutputs match {
+          case Some(host) =>
+            logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+            mapOutputTracker.removeOutputsOnHost(host)
+          case None =>
+            logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+            mapOutputTracker.removeOutputsOnExecutor(execId)
+        }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index a0c507e7f893..2a382380691d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -26,6 +26,9 @@ import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
 import scala.util.Random
 
+import com.google.common.base.Ticker
+import com.google.common.cache.CacheBuilder
+
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
@@ -136,7 +139,21 @@ private[spark] class TaskSchedulerImpl(
   // IDs of the tasks running on each executor
   private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
 
-  private val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+  // We add executors here when we first get decommission notification for them. Executors can
+  // continue to run even after being asked to decommission, but they will eventually exit.
+  val executorsPendingDecommission = new HashMap[String, ExecutorDecommissionInfo]
+
+  // When they exit and we know of that via heartbeat failure, we will add them to this cache.
+  // This cache is consulted to know if a fetch failure is because a source executor was
+  // decommissioned.
+  lazy val decommissionedExecutorsRemoved = CacheBuilder.newBuilder()
+    .expireAfterWrite(
+      conf.get(DECOMMISSIONED_EXECUTORS_REMEMBER_AFTER_REMOVAL_TTL), TimeUnit.SECONDS)
+    .ticker(new Ticker {
+      override def read(): Long = TimeUnit.MILLISECONDS.toNanos(clock.getTimeMillis())
+    })
+    .build[String, ExecutorDecommissionInfo]()
+    .asMap()
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
     executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
@@ -910,7 +927,7 @@ private[spark] class TaskSchedulerImpl(
         // if we heard isHostDecommissioned ever true, then we keep that one since it is
         // most likely coming from the cluster manager and thus authoritative
         val oldDecomInfo = executorsPendingDecommission.get(executorId)
-        if (oldDecomInfo.isEmpty || !oldDecomInfo.get.isHostDecommissioned) {
+        if (!oldDecomInfo.exists(_.isHostDecommissioned)) {
          executorsPendingDecommission(executorId) = decommissionInfo
        }
      }
@@ -921,7 +938,9 @@
 
   override def getExecutorDecommissionInfo(executorId: String)
     : Option[ExecutorDecommissionInfo] = synchronized {
-      executorsPendingDecommission.get(executorId)
+      executorsPendingDecommission
+        .get(executorId)
+        .orElse(Option(decommissionedExecutorsRemoved.get(executorId)))
   }
 
   override def executorLost(executorId: String, givenReason: ExecutorLossReason): Unit = {
@@ -1027,7 +1046,9 @@
       }
     }
-    executorsPendingDecommission -= executorId
+
+    val decomInfo = executorsPendingDecommission.remove(executorId)
+    decomInfo.foreach(decommissionedExecutorsRemoved.put(executorId, _))
 
     if (reason != LossReasonPending) {
       executorIdToHost -= executorId
diff --git a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
index ee9a6be03868..90b77a21ad02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/DecommissionWorkerSuite.scala
@@ -84,6 +84,19 @@ class DecommissionWorkerSuite
     }
   }
 
+  // Unlike TestUtils.withListener, it also waits for the job to be done
+  def withListener(sc: SparkContext, listener: RootStageAwareListener)
+      (body: SparkListener => Unit): Unit = {
+    sc.addSparkListener(listener)
+    try {
+      body(listener)
+      sc.listenerBus.waitUntilEmpty()
+      listener.waitForJobDone()
+    } finally {
+      sc.listenerBus.removeListener(listener)
+    }
+  }
+
   test("decommission workers should not result in job failure") {
     val maxTaskFailures = 2
     val numTimesToKillWorkers = maxTaskFailures + 1
@@ -109,7 +122,7 @@ class DecommissionWorkerSuite
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 1, 1).map { _ =>
         Thread.sleep(5 * 1000L); 1
       }.count()
@@ -164,7 +177,7 @@ class DecommissionWorkerSuite
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((pid, _) => {
         val sleepTimeSeconds = if (pid == 0) 1 else 10
         Thread.sleep(sleepTimeSeconds * 1000L)
@@ -190,10 +203,11 @@ class DecommissionWorkerSuite
     }
   }
 
-  test("decommission workers ensure that fetch failures lead to rerun") {
+  def testFetchFailures(initialSleepMillis: Int): Unit = {
     createWorkers(2)
     sc = createSparkContext(
       config.Tests.TEST_NO_STAGE_RETRY.key -> "false",
+      "spark.test.executor.decommission.initial.sleep.millis" -> initialSleepMillis.toString,
       config.UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE.key -> "true")
     val executorIdToWorkerInfo = getExecutorToWorkerAssignments
     val executorToDecom = executorIdToWorkerInfo.keysIterator.next
@@ -212,22 +226,29 @@ class DecommissionWorkerSuite
       override def handleRootTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         val taskInfo = taskEnd.taskInfo
         if (taskInfo.executorId == executorToDecom && taskInfo.attemptNumber == 0 &&
-          taskEnd.stageAttemptId == 0) {
+          taskEnd.stageAttemptId == 0 && taskEnd.stageId == 0) {
           decommissionWorkerOnMaster(workerToDecom,
             "decommission worker after task on it is done")
         }
       }
     }
-    TestUtils.withListener(sc, listener) { _ =>
+    withListener(sc, listener) { _ =>
       val jobResult = sc.parallelize(1 to 2, 2).mapPartitionsWithIndex((_, _) => {
         val executorId = SparkEnv.get.executorId
-        val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
-        Thread.sleep(sleepTimeSeconds * 1000L)
+        val context = TaskContext.get()
+        // Only sleep in the first attempt to create the required window for decommissioning.
+        // Subsequent attempts don't need to be delayed to speed up the test.
+        if (context.attemptNumber() == 0 && context.stageAttemptNumber() == 0) {
+          val sleepTimeSeconds = if (executorId == executorToDecom) 10 else 1
+          Thread.sleep(sleepTimeSeconds * 1000L)
+        }
         List(1).iterator
       }, preservesPartitioning = true)
         .repartition(1).mapPartitions(iter => {
           val context = TaskContext.get()
           if (context.attemptNumber == 0 && context.stageAttemptNumber() == 0) {
+            // Wait a bit for the decommissioning to be triggered in the listener
+            Thread.sleep(5000)
            // MapIndex is explicitly -1 to force the entire host to be decommissioned
            // However, this will cause both the tasks in the preceding stage since the host here is
            // "localhost" (shortcoming of this single-machine unit test in that all the workers
@@ -246,6 +267,14 @@ class DecommissionWorkerSuite
     assert(tasksSeen.size === 6, s"Expected 6 tasks but got $tasksSeen")
   }
 
+  test("decommission stalled workers ensure that fetch failures lead to rerun") {
+    testFetchFailures(3600 * 1000)
+  }
+
+  test("decommission eager workers ensure that fetch failures lead to rerun") {
+    testFetchFailures(0)
+  }
+
   private abstract class RootStageAwareListener extends SparkListener {
     private var rootStageId: Option[Int] = None
     private val tasksFinished = new ConcurrentLinkedQueue[String]()
@@ -265,6 +294,7 @@ class DecommissionWorkerSuite
     override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
       jobEnd.jobResult match {
         case JobSucceeded => jobDone.set(true)
+        case JobFailed(exception) => logError(s"Job failed", exception)
       }
     }
 
@@ -272,7 +302,15 @@ class DecommissionWorkerSuite
     protected def handleRootTaskStart(start: SparkListenerTaskStart) = {}
 
+    private def getSignature(taskInfo: TaskInfo, stageId: Int, stageAttemptId: Int):
+      String = {
+      s"${stageId}:${stageAttemptId}:" +
+        s"${taskInfo.index}:${taskInfo.attemptNumber}-${taskInfo.status}"
+    }
+
     override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+      val signature = getSignature(taskStart.taskInfo, taskStart.stageId, taskStart.stageAttemptId)
+      logInfo(s"Task started: $signature")
       if (isRootStageId(taskStart.stageId)) {
         rootTasksStarted.add(taskStart.taskInfo)
         handleRootTaskStart(taskStart)
@@ -280,8 +318,7 @@ class DecommissionWorkerSuite
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val taskSignature = s"${taskEnd.stageId}:${taskEnd.stageAttemptId}:" +
-        s"${taskEnd.taskInfo.index}:${taskEnd.taskInfo.attemptNumber}"
+      val taskSignature = getSignature(taskEnd.taskInfo, taskEnd.stageId, taskEnd.stageAttemptId)
       logInfo(s"Task End $taskSignature")
       tasksFinished.add(taskSignature)
       if (isRootStageId(taskEnd.stageId)) {
@@ -291,8 +328,13 @@ class DecommissionWorkerSuite
     }
 
     def getTasksFinished(): Seq[String] = {
-      assert(jobDone.get(), "Job isn't successfully done yet")
-      tasksFinished.asScala.toSeq
+      tasksFinished.asScala.toList
+    }
+
+    def waitForJobDone(): Unit = {
+      eventually(timeout(10.seconds), interval(100.milliseconds)) {
+        assert(jobDone.get(), "Job isn't successfully done yet")
+      }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e5836458e7f9..66379d86f9be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start(): Unit = {}
@@ -88,10 +88,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   def setupSchedulerWithMaster(master: String, confs: (String, String)*): TaskSchedulerImpl = {
+    setupSchedulerWithMasterAndClock(master, new SystemClock, confs: _*)
+  }
+
+  def setupSchedulerWithMasterAndClock(master: String, clock: Clock, confs: (String, String)*):
+      TaskSchedulerImpl = {
     val conf = new SparkConf().setMaster(master).setAppName("TaskSchedulerImplSuite")
     confs.foreach { case (k, v) => conf.set(k, v) }
     sc = new SparkContext(conf)
-    taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler = new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES), clock = clock)
     setupHelper()
   }
 
@@ -1802,9 +1807,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
   }
 
-  private def setupSchedulerForDecommissionTests(): TaskSchedulerImpl = {
-    val taskScheduler = setupSchedulerWithMaster(
+  private def setupSchedulerForDecommissionTests(clock: Clock): TaskSchedulerImpl = {
+    val taskScheduler = setupSchedulerWithMasterAndClock(
       s"local[2]",
+      clock,
       config.CPUS_PER_TASK.key -> 1.toString)
     taskScheduler.submitTasks(FakeTask.createTaskSet(2))
     val multiCoreWorkerOffers = IndexedSeq(WorkerOffer("executor0", "host0", 1),
@@ -1815,7 +1821,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
   }
 
   test("scheduler should keep the decommission info where host was decommissioned") {
-    val scheduler = setupSchedulerForDecommissionTests()
+    val scheduler = setupSchedulerForDecommissionTests(new SystemClock)
 
     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("0", false))
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("1", true))
@@ -1829,8 +1835,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(scheduler.getExecutorDecommissionInfo("executor2").isEmpty)
   }
 
-  test("scheduler should ignore decommissioning of removed executors") {
-    val scheduler = setupSchedulerForDecommissionTests()
+  test("scheduler should eventually purge removed and decommissioned executors") {
+    val clock = new ManualClock(10000L)
+    val scheduler = setupSchedulerForDecommissionTests(clock)
 
     // executor 0 is decommissioned after loosing
     assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty)
@@ -1839,14 +1846,27 @@
     scheduler.executorDecommission("executor0", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionInfo("executor0").isEmpty)
+    assert(scheduler.executorsPendingDecommission.isEmpty)
+    clock.advance(5000)
+
     // executor 1 is decommissioned before loosing
     assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
     assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
+    clock.advance(2000)
     scheduler.executorLost("executor1", ExecutorExited(0, false, "normal"))
-    assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
+    assert(scheduler.executorsPendingDecommission.isEmpty)
+    clock.advance(2000)
+    // The default 5 minute TTL since removal hasn't elapsed, so the info is still cached
+    assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
     scheduler.executorDecommission("executor1", ExecutorDecommissionInfo("", false))
+    clock.advance(2000)
+    assert(scheduler.decommissionedExecutorsRemoved.size === 1)
+    assert(scheduler.getExecutorDecommissionInfo("executor1").isDefined)
+    clock.advance(301000)
     assert(scheduler.getExecutorDecommissionInfo("executor1").isEmpty)
+    assert(scheduler.decommissionedExecutorsRemoved.isEmpty)
   }
 
   /**