Skip to content

Commit 6cf335d

Browse files
committed
Added a TaskSetManager unit test.
This test ensures that when there are no alive executors that satisfy a particular locality level, the TaskSetManager never uses that as the maximum allowed locality level (this optimization ensures that a job doesn't wait extra time in an attempt to satisfy a scheduling locality level that is impossible to achieve). @mateiz and @lirui-intel, this unit test illustrates an issue with apache#892 (it fails with that patch applied). Author: Kay Ousterhout <[email protected]>. Closes apache#1024 from kayousterhout/scheduler_unit_test and squashes the following commits: de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
1 parent 0cf6002 commit 6cf335d

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
141141
assert(sched.finishedManagers.contains(manager))
142142
}
143143

144+
test("skip unsatisfiable locality levels") {
  sc = new SparkContext("local", "test")
  // Only execA (host1) and execC (host2) are alive; the task's preferred
  // executor, execB, is never registered with the scheduler.
  val scheduler = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
  val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
  val clock = new FakeClock
  val manager = new TaskSetManager(scheduler, taskSet, MAX_TASK_FAILURES, clock)

  // host2 is not NODE_LOCAL for this task, so the offer must be rejected.
  assert(manager.resourceOffer("execC", "host2", ANY) === None)

  // Since no alive executor can satisfy PROCESS_LOCAL (execB is absent), the
  // manager's effective base locality level should be NODE_LOCAL, and the task
  // should therefore be scheduled on this NODE_LOCAL offer right away — before
  // any of the locality wait timers have had a chance to expire.
  assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
}
144160
test("basic delay scheduling") {
145161
sc = new SparkContext("local", "test")
146162
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))

0 commit comments

Comments
 (0)