16 changes: 12 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1178,19 +1178,27 @@ class DAGScheduler(
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
 
-    if (!stageIdToStage.contains(task.stageId)) {
-      // The stage may have already finished when we get this event -- eg. maybe it was a
+    val stageOpt = stageIdToStage.get(task.stageId)
+
+    if (stageOpt.isEmpty || (failedStages.contains(stageOpt.get) && event.reason == Success)) {
+      // The stage may have already finished or failed when we get this event -- eg. maybe it was a
       // speculative task. It is important that we send the TaskEnd event in any case, so listeners
       // are properly notified and can chose to handle it. For instance, some listeners are
       // doing their own accounting and if they don't get the task end event they think
       // tasks are still running when they really aren't.
+      val msg = if (stageOpt.isEmpty) {
+        "have already finished"
+      } else {
+        s"${stageOpt.get} have been marked as failed"
+      }
+      logWarning(s"Ignoring task $task because of stage $msg")
       postTaskEnd(event)
 
-      // Skip all the actions if the stage has been cancelled.
+      // Skip all the actions if the stage has been cancelled or failed.
       return
     }
 
-    val stage = stageIdToStage(task.stageId)
+    val stage = stageOpt.get
 
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
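To make the new guard concrete, here is a minimal, self-contained Scala sketch (using hypothetical stand-in types, not Spark's real ones): a Success arriving for a stage that is no longer registered, or that has been moved to failedStages, is only reported to listeners and otherwise ignored, while failure events are still processed.

object IgnoreLateSuccessSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object FetchFailed extends TaskEndReason

  // Stand-ins for stageIdToStage.get(task.stageId) and DAGScheduler.failedStages.
  def shouldIgnore(stageOpt: Option[Int], failedStages: Set[Int], reason: TaskEndReason): Boolean =
    stageOpt.isEmpty || (failedStages.contains(stageOpt.get) && reason == Success)

  def main(args: Array[String]): Unit = {
    assert(shouldIgnore(None, Set.empty, Success))       // stage already cleaned up
    assert(shouldIgnore(Some(1), Set(1), Success))       // late success for a failed stage
    assert(!shouldIgnore(Some(1), Set(1), FetchFailed))  // failures are still processed
    assert(!shouldIgnore(Some(1), Set.empty, Success))   // normal success proceeds
  }
}

The key design point is that ignoring only skips the scheduler's bookkeeping; postTaskEnd(event) has already fired, so listener-side accounting stays correct.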
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -82,7 +82,7 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
+  // Set the corresponding index of Boolean var when the task killed by other attempt tasks,
   // this happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
   private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
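As a hedged illustration of the rule that comment describes (a simplified model with hypothetical helper names, not TaskSetManager's actual code path): on executor loss, task indices flagged in killedByOtherAttempt are skipped rather than resubmitted.

object KilledByOtherAttemptSketch {
  // Simplified model: a task lost with its executor is resubmitted only if it
  // neither succeeded nor was killed because another attempt already finished.
  def tasksToResubmit(
      lostTasks: Seq[Int],
      successful: Array[Boolean],
      killedByOtherAttempt: Array[Boolean]): Seq[Int] =
    lostTasks.filterNot(i => successful(i) || killedByOtherAttempt(i))

  def main(args: Array[String]): Unit = {
    val successful = Array(false, false, true)
    val killedByOther = Array(false, true, false)
    // Only task 0 needs resubmission: task 1 was killed by another attempt,
    // task 2 already succeeded.
    assert(tasksToResubmit(Seq(0, 1, 2), successful, killedByOther) == Seq(0))
  }
}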
85 changes: 85 additions & 0 deletions core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2399,6 +2399,91 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }

+  /**
+   * This tests the case where the original task succeeds after its speculative
+   * attempt has already hit a FetchFailed.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first, while the task itself
+    // is still running. This can be caused by an ExecutorLost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))

[Review thread on the lines above]

Contributor:
Sorry, I'm not very familiar with this test suite -- how can you tell it's a speculative task?

Member (Author):
Here we only need to mock the speculative task's failed event arriving before the success event; makeCompletionEvent with the same taskSet's task achieves that. The same approach is used in "task events always posted in speculation / when stage is killed".

Member:
Maybe you can runEvent(SpeculativeTaskSubmitted) first to simulate a speculative task being submitted before you runEvent(makeCompletionEvent(...)).
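A sketch of that suggestion, assuming the suite's runEvent helper and the DAGScheduler's SpeculativeTaskSubmitted event are available on this branch (treat the exact signature as an assumption):

    // Sketch only: post a SpeculativeTaskSubmitted event first, so the
    // FetchFailed completion below unambiguously belongs to a speculative attempt.
    runEvent(SpeculativeTaskSubmitted(taskSets(1).tasks(1)))
    runEvent(makeCompletionEvent(
      taskSets(1).tasks(1),
      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
      null))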
+    // Check the currently missing partitions.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // The second task itself then succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions must not change; otherwise the child
+    // stage could never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }

test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
"successful tasks") {
val rddA = new MyRDD(sc, 2, Nil)
val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
val shuffleIdA = shuffleDepA.shuffleId
val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
submit(rddB, Array(0, 1))

// Complete both tasks in rddA.
assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))

// The first task of rddB success
assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
runEvent(makeCompletionEvent(
taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))

// The second task's speculative attempt fails first, but task self still running.
// This may caused by ExecutorLost.
runEvent(makeCompletionEvent(
taskSets(1).tasks(1),
FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
null))
// Make sure failedStage is not empty now
assert(scheduler.failedStages.nonEmpty)
// The second result task self success soon.
assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
// This task success event will be ignored by DAGScheduler
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
// Resubmit failed stage and success finally
scheduler.resubmitFailedStages()
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, makeMapStatus("hostB", 2)))
runEvent(makeCompletionEvent(
taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
assertDataStructuresEmpty()
}

   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
1 change: 0 additions & 1 deletion core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -745,7 +745,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(resubmittedTasks === 0)
   }
 
-
   test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
     val conf = new SparkConf().set("spark.speculation", "true")
     sc = new SparkContext("local", "test", conf)