diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a0e84b94735ec..a302f680a272e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1107,10 +1107,19 @@ private[spark] class TaskSetManager(
   def recomputeLocality(): Unit = {
     // A zombie TaskSetManager may reach here while executorLost happens
     if (isZombie) return
+    val previousLocalityIndex = currentLocalityIndex
     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
+    val previousMyLocalityLevels = myLocalityLevels
     myLocalityLevels = computeValidLocalityLevels()
     localityWaits = myLocalityLevels.map(getLocalityWait)
     currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
+    if (currentLocalityIndex > previousLocalityIndex) {
+      // SPARK-31837: If the new level is more local, shift to the new most local locality
+      // level in terms of better data locality. For example, say the previous locality
+      // levels are [PROCESS, NODE, ANY] and current level is ANY. After recompute, the
+      // locality levels are [PROCESS, NODE, RACK, ANY]. Then, we'll shift to RACK level.
+      currentLocalityIndex = getLocalityIndex(myLocalityLevels.diff(previousMyLocalityLevels).head)
+    }
   }
 
   def executorAdded(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 54899bfcf34fa..01c82f894cf98 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -276,9 +276,6 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
 
   test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
     initLocalClusterSparkContext(2)
-    // It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
-    // could get scheduled at ANY level.
-    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
     val dep = new OneToOneDependency[Int](rdd0)
     // set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a8541cb863478..a75bae56229b4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1208,7 +1208,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 
   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()
-    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
     taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0,
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
     ))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 4978be3e04c1e..e4aad58d25064 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -620,7 +620,7 @@ class TaskSetManagerSuite
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
     manager.executorAdded()
-    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined)
+    assert(manager.resourceOffer("execB", "host1", ANY)._1.isDefined)
     sched.removeExecutor("execA")
     manager.executorLost(
       "execA",
@@ -634,6 +634,25 @@ class TaskSetManagerSuite
     assert(sched.taskSetsFailed.contains(taskSet.id))
   }
 
+  test("SPARK-31837: Shift to the new highest locality level if there is when recomputeLocality") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc)
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execA")))
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
+    // before any executors are added to TaskScheduler, the manager's
+    // locality level only has ANY, so tasks can be scheduled anyway.
+    assert(manager.resourceOffer("execB", "host2", ANY)._1.isDefined)
+    sched.addExecutor("execA", "host1")
+    manager.executorAdded()
+    // after adding a new executor, the manager locality has PROCESS_LOCAL, NODE_LOCAL, ANY.
+    // And we'll shift to the new highest locality level, which is PROCESS_LOCAL in this case.
+    assert(manager.resourceOffer("execC", "host3", ANY)._1.isEmpty)
+    assert(manager.resourceOffer("execA", "host1", ANY)._1.isDefined)
+  }
+
   test("test RACK_LOCAL tasks") {
     // Assign host1 to rack1
     FakeRackUtil.assignHostToRack("host1", "rack1")
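Below is a minimal, self-contained Scala sketch (not part of the patch) that replays the index-shift arithmetic from the recomputeLocality() change above. Plain strings stand in for Spark's TaskLocality values, and getLocalityIndex here is a simplified stand-in for the real TaskSetManager helper, not its actual implementation:

object LocalityShiftSketch {
  // Simplified stand-in for TaskSetManager.getLocalityIndex: return the index
  // of `level` in `levels`, falling back to the last (least local) level.
  def getLocalityIndex(levels: Array[String], level: String): Int = {
    val idx = levels.indexOf(level)
    if (idx >= 0) idx else levels.length - 1
  }

  def main(args: Array[String]): Unit = {
    // Scenario from the SPARK-31837 comment: the manager had drifted to ANY,
    // then recomputing the valid levels made a RACK level available.
    val previousMyLocalityLevels = Array("PROCESS", "NODE", "ANY")
    val previousLocalityIndex = 2 // currently at ANY
    val previousLocalityLevel = previousMyLocalityLevels(previousLocalityIndex)

    val myLocalityLevels = Array("PROCESS", "NODE", "RACK", "ANY")
    var currentLocalityIndex =
      getLocalityIndex(myLocalityLevels, previousLocalityLevel) // 3 (ANY)

    if (currentLocalityIndex > previousLocalityIndex) {
      // The newly appeared levels are myLocalityLevels.diff(previousMyLocalityLevels)
      // = [RACK]; shift to the most local of them, as the patch does.
      currentLocalityIndex =
        getLocalityIndex(myLocalityLevels, myLocalityLevels.diff(previousMyLocalityLevels).head)
    }
    println(s"shifted to ${myLocalityLevels(currentLocalityIndex)}") // shifted to RACK
  }
}

Running this prints "shifted to RACK", matching the example in the patch comment: without the new branch, the index for ANY would simply be recomputed and the manager would stay at ANY even though a more local level had just become valid.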