Commit 270d841

Address review comments

1 parent fa5d9f1 commit 270d841

2 files changed: 7 additions, 5 deletions

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 7 additions & 4 deletions
@@ -245,7 +245,7 @@ private[spark] class TaskSetManager(
     while (indexOffset > 0) {
       indexOffset -= 1
       val index = list(indexOffset)
-      if (! executorIsBlacklisted(execId, index)) {
+      if (!executorIsBlacklisted(execId, index)) {
         // This should almost always be list.trimEnd(1) to remove tail
         list.remove(indexOffset)
         if (copiesRunning(index) == 0 && !successful(index)) {
@@ -516,7 +516,7 @@ private[spark] class TaskSetManager(
       logWarning("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
     }
     var taskMetrics : TaskMetrics = null
-    var failureReason = "unknown"
+    var failureReason: String = null
     val addToFailedExecutor = () => {
       failedExecutors.getOrElseUpdate(index, new HashMap[String, Long]()).
         put(info.executorId, clock.getTime())
@@ -532,6 +532,7 @@ private[spark] class TaskSetManager(
         isZombie = true

       case TaskKilled =>
+        // Not adding to failed executors for TaskKilled.
         logWarning("Task %d was killed.".format(tid))

       case ef: ExceptionFailure =>
@@ -547,6 +548,7 @@ private[spark] class TaskSetManager(
         val key = ef.description
         failureReason = "Exception failure in TID %s on host %s: %s".format(
           tid, info.host, ef.description)
+        addToFailedExecutor()
         val now = clock.getTime()
         val (printFull, dupCount) = {
           if (recentExceptions.contains(key)) {
@@ -573,16 +575,17 @@

       case TaskResultLost =>
         failureReason = "Lost result for TID %s on host %s".format(tid, info.host)
+        addToFailedExecutor()
         logWarning(failureReason)

       case _ =>
         failureReason = "TID %s on host %s failed for unknown reason".format(tid, info.host)
+        addToFailedExecutor()
     }
-    // Add to failed for everything else.
-    addToFailedExecutor()
     sched.dagScheduler.taskEnded(tasks(index), reason, null, null, info, taskMetrics)
     addPendingTask(index)
     if (!isZombie && state != TaskState.KILLED) {
+      assert (null != failureReason)
       numFailures(index) += 1
       if (numFailures(index) >= maxTaskFailures) {
         logError("Task %s:%d failed %d times; aborting job".format(
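The substance of this change is that the failedExecutors bookkeeping now happens per failure case instead of once after the match, so TaskKilled no longer counts against the executor, and the new assert checks that every path that records a failure has also set a failureReason. A minimal self-contained sketch of that pattern, using simplified stand-in names rather than the real TaskSetManager members:

    import scala.collection.mutable.HashMap

    // Sketch only: simplified stand-ins for TaskSetManager's failure handling.
    object FailureAccountingSketch {
      sealed trait TaskEndReason
      case object TaskKilled extends TaskEndReason
      case object TaskResultLost extends TaskEndReason
      case object UnknownFailure extends TaskEndReason
      final case class ExceptionFailure(description: String) extends TaskEndReason

      // executorId -> timestamp of the last recorded failure
      private val failedExecutors = new HashMap[String, Long]()

      def handleFailedTask(execId: String, host: String, reason: TaskEndReason): Unit = {
        var failureReason: String = null
        val addToFailedExecutor = () => failedExecutors.put(execId, System.currentTimeMillis())

        reason match {
          case TaskKilled =>
            // Deliberately no addToFailedExecutor(): a kill is not evidence
            // that the executor is unhealthy.
            println("task killed")
          case ExceptionFailure(description) =>
            failureReason = "Exception failure on host %s: %s".format(host, description)
            addToFailedExecutor()
          case TaskResultLost =>
            failureReason = "Lost result on host %s".format(host)
            addToFailedExecutor()
          case _ =>
            failureReason = "Task on host %s failed for unknown reason".format(host)
            addToFailedExecutor()
        }

        if (reason != TaskKilled) {
          // Mirrors the new assert: every counted failure carries a reason.
          assert(failureReason != null)
        }
      }
    }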

core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 0 additions & 1 deletion
@@ -311,7 +311,6 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       ("exec1.1", "host1"), ("exec2", "host2"))
     // affinity to exec1 on host1 - which we will fail.
     val taskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "exec1")))
-    // we need actual time measurement
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, 4, clock)
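The deleted comment was stale: the suite drives time through a FakeClock rather than real time measurement. A hedged sketch of what such a deterministic test clock typically looks like (assumed shape; the actual FakeClock in the suite may differ):

    // Hypothetical minimal fake clock for deterministic scheduler tests.
    class FakeClockSketch {
      private var now = 0L
      def advance(millis: Long): Unit = { now += millis }
      def getTime(): Long = now
    }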
