diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 2e3e1cc9877d7..d327099197dc9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -111,8 +111,9 @@ private[spark] class TaskSchedulerImpl(
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]
 
   // keyed by taskset
-  // value is true if the task set's locality wait timer was reset on the last resource offer
-  private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
+  // value is true if the task set has not rejected any resources due to locality
+  // since the timer was last reset
+  private val noRejectsSinceLastReset = new mutable.HashMap[TaskSet, Boolean]()
   private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
 
   // Protected by `this`
@@ -336,7 +337,7 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
-    resetOnPreviousOffer -= manager.taskSet
+    noRejectsSinceLastReset -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
@@ -615,13 +616,14 @@ private[spark] class TaskSchedulerImpl(
       }
 
       if (!legacyLocalityWaitReset) {
-        if (noDelaySchedulingRejects && launchedAnyTask) {
-          if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
+        if (noDelaySchedulingRejects) {
+          if (launchedAnyTask &&
+            (isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
             taskSet.resetDelayScheduleTimer(globalMinLocality)
-            resetOnPreviousOffer.update(taskSet.taskSet, true)
+            noRejectsSinceLastReset.update(taskSet.taskSet, true)
           }
         } else {
-          resetOnPreviousOffer.update(taskSet.taskSet, false)
+          noRejectsSinceLastReset.update(taskSet.taskSet, false)
         }
       }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 056c34278c1ea..a8541cb863478 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -296,6 +296,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       .flatten.isEmpty)
   }
 
+  test("SPARK-18886 - task set with no locality requirements should not starve one with them") {
+    val clock = new ManualClock()
+    // All tasks created here are local to exec1, host1.
+    // Locality level starts at PROCESS_LOCAL.
+    val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
+    // Locality levels increase at 3000 ms.
+    val advanceAmount = 2000
+
+    val taskSet2 = FakeTask.createTaskSet(8, 2, 0)
+    taskScheduler.submitTasks(taskSet2)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Clock advances to 2s. No locality changes yet.
+    clock.advance(advanceAmount)
+
+    // Stage 2 takes resource since it has no locality requirements
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 2.0"))
+
+    // Simulates:
+    // 1. stage 2 has taken all resource offers through single resource offers
+    // 2. stage 1 is offered 0 cpus on allResourceOffer.
+    // This should not reset timer.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 0)),
+        isAllFreeResources = true)
+      .flatten.length === 0)
+
+    // This should move stage 1 to NODE_LOCAL.
+    clock.advance(advanceAmount)
+
+    // Stage 1 should now accept NODE_LOCAL resource.
+    assert(taskScheduler
+      .resourceOffers(
+        IndexedSeq(WorkerOffer("exec2", "host1", 1)),
+        isAllFreeResources = false)
+      .flatten
+      .headOption
+      .map(_.name)
+      .getOrElse("")
+      .contains("stage 1.1"))
+  }
+
   test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
     "time if last full resource offer (isAllResources = true) was accepted as well as any " +
     "following partial resource offers") {
@@ -306,12 +367,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // Locality levels increase at 3000 ms.
     val advanceAmount = 3000
 
-    // PROCESS_LOCAL full resource offer is accepted.
+    // PROCESS_LOCAL full resource offer is not rejected due to locality.
+    // It has 0 available cores, so no task is launched.
+    // Timer is reset and locality level remains at PROCESS_LOCAL.
     assert(taskScheduler
       .resourceOffers(
-        IndexedSeq(WorkerOffer("exec1", "host1", 1)),
+        IndexedSeq(WorkerOffer("exec1", "host1", 0)),
         isAllFreeResources = true)
-      .flatten.length === 1)
+      .flatten.length === 0)
 
     // Advancing clock increases locality level to NODE_LOCAL.
     clock.advance(advanceAmount)
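Reviewer note (illustration only, not part of the patch): the behavioral change in the @@ -615,13 +616,14 @@ hunk is subtle, so the following is a minimal self-contained sketch of the patched reset decision. The map semantics and branch structure mirror the new TaskSchedulerImpl code above, but TaskSetKey, OfferOutcome, and shouldResetTimer are hypothetical names introduced only for this sketch; they are not Spark APIs.

import scala.collection.mutable

object DelayTimerResetSketch {

  // Hypothetical stand-ins for the real TaskSet and per-round offer bookkeeping.
  type TaskSetKey = String
  case class OfferOutcome(
      noDelaySchedulingRejects: Boolean, // no resources rejected for locality this round
      launchedAnyTask: Boolean,          // at least one task launched this round
      isAllFreeResources: Boolean)       // this round offered all free resources

  // Mirrors noRejectsSinceLastReset: true while the task set has not rejected
  // any resources due to locality since its timer was last reset.
  private val noRejectsSinceLastReset = mutable.HashMap.empty[TaskSetKey, Boolean]

  // Returns true if the locality wait timer should be reset for this task set.
  def shouldResetTimer(taskSet: TaskSetKey, o: OfferOutcome): Boolean = {
    if (o.noDelaySchedulingRejects) {
      // The fix: launchedAnyTask is now checked inside the no-rejects branch.
      // A round that rejects nothing but also launches nothing (e.g. a 0-core
      // offer) falls through without touching the map, so the "no rejects"
      // streak survives instead of being clobbered to false.
      if (o.launchedAnyTask &&
          (o.isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet, true))) {
        noRejectsSinceLastReset.update(taskSet, true)
        true
      } else {
        false
      }
    } else {
      // A genuine locality rejection breaks the streak: no timer reset until a
      // later qualifying round launches a task again.
      noRejectsSinceLastReset.update(taskSet, false)
      false
    }
  }

  def main(args: Array[String]): Unit = {
    // Walk through the key rounds: a 0-core all-free-resources offer neither
    // resets the timer nor breaks the streak...
    val ts = "stage1"
    assert(!shouldResetTimer(ts, OfferOutcome(
      noDelaySchedulingRejects = true, launchedAnyTask = false, isAllFreeResources = true)))
    // ...so a later partial offer that does launch a task can still reset it.
    assert(shouldResetTimer(ts, OfferOutcome(
      noDelaySchedulingRejects = true, launchedAnyTask = true, isAllFreeResources = false)))
  }
}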