Skip to content

Commit ab747d3

Browse files
mridulmmateiz
authored andcommitted
Bugfixes/improvements to scheduler
Move the PR#517 of apache-incubator-spark to the apache-spark Author: Mridul Muralidharan <[email protected]> Closes #159 from mridulm/master and squashes the following commits: 5ff59c2 [Mridul Muralidharan] Change property in suite also 167fad8 [Mridul Muralidharan] Address review comments 9bda70e [Mridul Muralidharan] Address review comments, akwats add to failedExecutors 270d841 [Mridul Muralidharan] Address review comments fa5d9f1 [Mridul Muralidharan] Bugfixes/improvements to scheduler : PR #517
1 parent 6112270 commit ab747d3

File tree

3 files changed

+152
-20
lines changed

3 files changed

+152
-20
lines changed

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,8 @@ private[spark] class TaskSchedulerImpl(
235235
taskIdToExecutorId(tid) = execId
236236
activeExecutorIds += execId
237237
executorsByHost(host) += execId
238-
availableCpus(i) -= 1
238+
availableCpus(i) -= taskSet.CPUS_PER_TASK
239+
assert (availableCpus(i) >= 0)
239240
launchedTask = true
240241
}
241242
}

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ private[spark] class TaskSetManager(
5959
// CPUs to request per task
6060
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
6161

62+
/*
63+
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
64+
* does not realize right away leading to repeated task failures. If enabled,
65+
* this temporarily prevents a task from re-launching on an executor where
66+
* it just failed.
67+
*/
68+
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT =
69+
conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
70+
6271
// Quantile of tasks at which to start speculation
6372
val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
6473
val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
@@ -71,7 +80,9 @@ private[spark] class TaskSetManager(
7180
val numTasks = tasks.length
7281
val copiesRunning = new Array[Int](numTasks)
7382
val successful = new Array[Boolean](numTasks)
74-
val numFailures = new Array[Int](numTasks)
83+
private val numFailures = new Array[Int](numTasks)
84+
// key is taskId, value is a Map of executor id to when it failed
85+
private val failedExecutors = new HashMap[Int, HashMap[String, Long]]()
7586
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
7687
var tasksSuccessful = 0
7788

@@ -228,12 +239,18 @@ private[spark] class TaskSetManager(
228239
* This method also cleans up any tasks in the list that have already
229240
* been launched, since we want that to happen lazily.
230241
*/
231-
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
232-
while (!list.isEmpty) {
233-
val index = list.last
234-
list.trimEnd(1)
235-
if (copiesRunning(index) == 0 && !successful(index)) {
236-
return Some(index)
242+
private def findTaskFromList(execId: String, list: ArrayBuffer[Int]): Option[Int] = {
243+
var indexOffset = list.size
244+
245+
while (indexOffset > 0) {
246+
indexOffset -= 1
247+
val index = list(indexOffset)
248+
if (!executorIsBlacklisted(execId, index)) {
249+
// This should almost always be list.trimEnd(1) to remove tail
250+
list.remove(indexOffset)
251+
if (copiesRunning(index) == 0 && !successful(index)) {
252+
return Some(index)
253+
}
237254
}
238255
}
239256
None
@@ -244,6 +261,21 @@ private[spark] class TaskSetManager(
244261
taskAttempts(taskIndex).exists(_.host == host)
245262
}
246263

264+
/**
265+
* Is this re-execution of a failed task on an executor it already failed in before
266+
* EXECUTOR_TASK_BLACKLIST_TIMEOUT has elapsed ?
267+
*/
268+
private def executorIsBlacklisted(execId: String, taskId: Int): Boolean = {
269+
if (failedExecutors.contains(taskId)) {
270+
val failed = failedExecutors.get(taskId).get
271+
272+
return failed.contains(execId) &&
273+
clock.getTime() - failed.get(execId).get < EXECUTOR_TASK_BLACKLIST_TIMEOUT
274+
}
275+
276+
false
277+
}
278+
247279
/**
248280
* Return a speculative task for a given executor if any are available. The task should not have
249281
* an attempt running on this host, in case the host is slow. In addition, the task should meet
@@ -254,10 +286,13 @@ private[spark] class TaskSetManager(
254286
{
255287
speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
256288

289+
def canRunOnHost(index: Int): Boolean =
290+
!hasAttemptOnHost(index, host) && !executorIsBlacklisted(execId, index)
291+
257292
if (!speculatableTasks.isEmpty) {
258293
// Check for process-local or preference-less tasks; note that tasks can be process-local
259294
// on multiple nodes when we replicate cached blocks, as in Spark Streaming
260-
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
295+
for (index <- speculatableTasks if canRunOnHost(index)) {
261296
val prefs = tasks(index).preferredLocations
262297
val executors = prefs.flatMap(_.executorId)
263298
if (prefs.size == 0 || executors.contains(execId)) {
@@ -268,7 +303,7 @@ private[spark] class TaskSetManager(
268303

269304
// Check for node-local tasks
270305
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
271-
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
306+
for (index <- speculatableTasks if canRunOnHost(index)) {
272307
val locations = tasks(index).preferredLocations.map(_.host)
273308
if (locations.contains(host)) {
274309
speculatableTasks -= index
@@ -280,7 +315,7 @@ private[spark] class TaskSetManager(
280315
// Check for rack-local tasks
281316
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
282317
for (rack <- sched.getRackForHost(host)) {
283-
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
318+
for (index <- speculatableTasks if canRunOnHost(index)) {
284319
val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
285320
if (racks.contains(rack)) {
286321
speculatableTasks -= index
@@ -292,7 +327,7 @@ private[spark] class TaskSetManager(
292327

293328
// Check for non-local tasks
294329
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
295-
for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
330+
for (index <- speculatableTasks if canRunOnHost(index)) {
296331
speculatableTasks -= index
297332
return Some((index, TaskLocality.ANY))
298333
}
@@ -309,32 +344,32 @@ private[spark] class TaskSetManager(
309344
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
310345
: Option[(Int, TaskLocality.Value)] =
311346
{
312-
for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
347+
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
313348
return Some((index, TaskLocality.PROCESS_LOCAL))
314349
}
315350

316351
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
317-
for (index <- findTaskFromList(getPendingTasksForHost(host))) {
352+
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
318353
return Some((index, TaskLocality.NODE_LOCAL))
319354
}
320355
}
321356

322357
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
323358
for {
324359
rack <- sched.getRackForHost(host)
325-
index <- findTaskFromList(getPendingTasksForRack(rack))
360+
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
326361
} {
327362
return Some((index, TaskLocality.RACK_LOCAL))
328363
}
329364
}
330365

331366
// Look for no-pref tasks after rack-local tasks since they can run anywhere.
332-
for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
367+
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
333368
return Some((index, TaskLocality.PROCESS_LOCAL))
334369
}
335370

336371
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
337-
for (index <- findTaskFromList(allPendingTasks)) {
372+
for (index <- findTaskFromList(execId, allPendingTasks)) {
338373
return Some((index, TaskLocality.ANY))
339374
}
340375
}
@@ -460,6 +495,7 @@ private[spark] class TaskSetManager(
460495
logInfo("Ignorning task-finished event for TID " + tid + " because task " +
461496
index + " has already completed successfully")
462497
}
498+
failedExecutors.remove(index)
463499
maybeFinishTaskSet()
464500
}
465501

@@ -480,17 +516,19 @@ private[spark] class TaskSetManager(
480516
logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
481517
}
482518
var taskMetrics : TaskMetrics = null
483-
var failureReason = "unknown"
519+
var failureReason: String = null
484520
reason match {
485521
case fetchFailed: FetchFailed =>
486522
logWarning("Loss was due to fetch failure from " + fetchFailed.bmAddress)
487523
if (!successful(index)) {
488524
successful(index) = true
489525
tasksSuccessful += 1
490526
}
527+
// Not adding to failed executors for FetchFailed.
491528
isZombie = true
492529

493530
case TaskKilled =>
531+
// Not adding to failed executors for TaskKilled.
494532
logWarning("Task %d was killed.".format(tid))
495533

496534
case ef: ExceptionFailure =>
@@ -504,7 +542,8 @@ private[spark] class TaskSetManager(
504542
return
505543
}
506544
val key = ef.description
507-
failureReason = "Exception failure: %s".format(ef.description)
545+
failureReason = "Exception failure in TID %s on host %s: %s".format(
546+
tid, info.host, ef.description)
508547
val now = clock.getTime()
509548
val (printFull, dupCount) = {
510549
if (recentExceptions.contains(key)) {
@@ -533,11 +572,16 @@ private[spark] class TaskSetManager(
533572
failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
534573
logWarning(failureReason)
535574

536-
case _ => {}
575+
case _ =>
576+
failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
537577
}
578+
// always add to failed executors
579+
failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
580+
put(info.executorId, clock.getTime())
538581
sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
539582
addPendingTask(index)
540583
if (!isZombie && state != TaskState.KILLED) {
584+
assert (null != failureReason)
541585
numFailures(index) += 1
542586
if (numFailures(index) >= maxTaskFailures) {
543587
logError("Task %s:%d failed %d times; aborting job".format(

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,93 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
298298
}
299299
}
300300

301+
test("executors should be blacklisted after task failure, in spite of locality preferences") {
302+
val rescheduleDelay = 300L
303+
val conf = new SparkConf().
304+
set("spark.scheduler.executorTaskBlacklistTime", rescheduleDelay.toString).
305+
// dont wait to jump locality levels in this test
306+
set("spark.locality.wait", "0")
307+
308+
sc = new SparkContext("local", "test", conf)
309+
// two executors on same host, one on different.
310+
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
311+
("exec1.1", "host1"), ("exec2", "host2"))
312+
// affinity to exec1 on host1 - which we will fail.
313+
val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
314+
val clock = new FakeClock
315+
val manager = new TaskSetManager(sched, taskSet, 4, clock)
316+
317+
{
318+
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
319+
assert(offerResult.isDefined, "Expect resource offer to return a task")
320+
321+
assert(offerResult.get.index === 0)
322+
assert(offerResult.get.executorId === "exec1")
323+
324+
// Cause exec1 to fail : failure 1
325+
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
326+
assert(!sched.taskSetsFailed.contains(taskSet.id))
327+
328+
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
329+
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
330+
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
331+
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
332+
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
333+
}
334+
335+
// Run the task on exec1.1 - should work, and then fail it on exec1.1
336+
{
337+
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
338+
assert(offerResult.isDefined,
339+
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
340+
341+
assert(offerResult.get.index === 0)
342+
assert(offerResult.get.executorId === "exec1.1")
343+
344+
// Cause exec1.1 to fail : failure 2
345+
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
346+
assert(!sched.taskSetsFailed.contains(taskSet.id))
347+
348+
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
349+
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
350+
}
351+
352+
// Run the task on exec2 - should work, and then fail it on exec2
353+
{
354+
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
355+
assert(offerResult.isDefined, "Expect resource offer to return a task")
356+
357+
assert(offerResult.get.index === 0)
358+
assert(offerResult.get.executorId === "exec2")
359+
360+
// Cause exec2 to fail : failure 3
361+
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
362+
assert(!sched.taskSetsFailed.contains(taskSet.id))
363+
364+
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
365+
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
366+
}
367+
368+
// After reschedule delay, scheduling on exec1 should be possible.
369+
clock.advance(rescheduleDelay)
370+
371+
{
372+
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
373+
assert(offerResult.isDefined, "Expect resource offer to return a task")
374+
375+
assert(offerResult.get.index === 0)
376+
assert(offerResult.get.executorId === "exec1")
377+
378+
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
379+
380+
// Cause exec1 to fail : failure 4
381+
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
382+
}
383+
384+
// we have failed the same task 4 times now : task id should now be in taskSetsFailed
385+
assert(sched.taskSetsFailed.contains(taskSet.id))
386+
}
387+
301388
def createTaskResult(id: Int): DirectTaskResult[Int] = {
302389
val valueSer = SparkEnv.get.serializer.newInstance()
303390
new DirectTaskResult[Int](valueSer.serialize(id), mutable.Map.empty, new TaskMetrics)

0 commit comments

Comments
 (0)