Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ private[spark] class TaskSchedulerImpl(
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

// keyed by taskset
// value is true if the task set's locality wait timer was reset on the last resource offer
private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
// value is true if the task set has not rejected any resources due to locality
// since the timer was last reset
private val noRejectsSinceLastReset = new mutable.HashMap[TaskSet, Boolean]()
private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)

// Protected by `this`
Expand Down Expand Up @@ -336,7 +337,7 @@ private[spark] class TaskSchedulerImpl(
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
}
}
resetOnPreviousOffer -= manager.taskSet
noRejectsSinceLastReset -= manager.taskSet
manager.parent.removeSchedulable(manager)
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
s" ${manager.parent.name}")
Expand Down Expand Up @@ -615,13 +616,14 @@ private[spark] class TaskSchedulerImpl(
}

if (!legacyLocalityWaitReset) {
if (noDelaySchedulingRejects && launchedAnyTask) {
if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
if (noDelaySchedulingRejects) {
if (launchedAnyTask &&
(isAllFreeResources || noRejectsSinceLastReset.getOrElse(taskSet.taskSet, true))) {
taskSet.resetDelayScheduleTimer(globalMinLocality)
resetOnPreviousOffer.update(taskSet.taskSet, true)
noRejectsSinceLastReset.update(taskSet.taskSet, true)
}
} else {
resetOnPreviousOffer.update(taskSet.taskSet, false)
noRejectsSinceLastReset.update(taskSet.taskSet, false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,67 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
.flatten.isEmpty)
}

test("SPARK-18886 - task set with no locality requirements should not starve one with them") {
  // Scenario: a task set without locality preferences (stage 2) keeps grabbing single-core
  // partial offers. That must not keep resetting the locality wait timer of the task set that
  // does have locality preferences (stage 1), otherwise stage 1 would never fall back to a
  // less strict locality level and would starve.
  val clock = new ManualClock()
  // Every task created by the setup helper is local to exec1 on host1, and its task set
  // starts out at the PROCESS_LOCAL level.
  val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
  // Locality levels are relaxed after 3000 ms, so a single step does not change the level
  // but two consecutive steps do.
  val stepMs = 2000

  // Makes a one-core, partial (isAllFreeResources = false) offer on exec2/host1 and returns
  // the name of the first task launched by it, or "" when nothing was launched.
  def firstTaskNameFromPartialOffer(): String =
    taskScheduler
      .resourceOffers(IndexedSeq(WorkerOffer("exec2", "host1", 1)), isAllFreeResources = false)
      .flatten
      .headOption
      .fold("")(_.name)

  val taskSet2 = FakeTask.createTaskSet(8, 2, 0)
  taskScheduler.submitTasks(taskSet2)

  // Stage 2 has no locality requirements, so it takes the offered core.
  assert(firstTaskNameFromPartialOffer().contains("stage 2.0"))

  // 2 s elapsed in total: still below the threshold, so no locality level changes yet.
  clock.advance(stepMs)

  // Stage 2 again takes the offered core for the same reason.
  assert(firstTaskNameFromPartialOffer().contains("stage 2.0"))

  // Simulate the situation where
  //   1. stage 2 has consumed every core through the single-core offers above, and
  //   2. stage 1 is then offered 0 cpus in an all-free-resources offer.
  // Nothing can be launched, and this offer must not reset stage 1's timer.
  val launchedOnEmptyFullOffer = taskScheduler
    .resourceOffers(IndexedSeq(WorkerOffer("exec2", "host1", 0)), isAllFreeResources = true)
    .flatten
  assert(launchedOnEmptyFullOffer.length === 0)

  // 4 s elapsed in total: this should move stage 1 to NODE_LOCAL.
  clock.advance(stepMs)

  // Stage 1 should now accept the NODE_LOCAL core on host1.
  assert(firstTaskNameFromPartialOffer().contains("stage 1.1"))
}

test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
"time if last full resource offer (isAllResources = true) was accepted as well as any " +
"following partial resource offers") {
Expand All @@ -306,12 +367,14 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Locality levels increase at 3000 ms.
val advanceAmount = 3000

// PROCESS_LOCAL full resource offer is accepted.
// PROCESS_LOCAL full resource offer is not rejected due to locality.
// It has 0 available cores, so no task is launched.
// Timer is reset and locality level remains at PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
IndexedSeq(WorkerOffer("exec1", "host1", 0)),
isAllFreeResources = true)
.flatten.length === 1)
.flatten.length === 0)

// Advancing clock increases locality level to NODE_LOCAL.
clock.advance(advanceAmount)
Expand Down