@@ -1569,24 +1569,45 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assertDataStructuresEmpty()
}

test("run trivial shuffle with out-of-band failure and retry") {
/**
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from a task that ran on that executor. We want to make sure the
* stage is resubmitted so that the task that ran on the failed executor is re-executed, and
* that the stage is only marked as finished once that task completes.
*/
test("run trivial shuffle with out-of-band executor failure and retry") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
val shuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
submit(reduceRdd, Array(0))
// blockManagerMaster.removeExecutor("exec-hostA")
// pretend we were told hostA went away
// Tell the DAGScheduler that hostA was lost.
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
// DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
// rather than marking it as failed and waiting.
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))

// At this point, no more tasks are running for the stage (and the TaskSetManager considers the
// stage complete), but the tasks that ran on hostA need to be re-run, so the DAGScheduler
// should re-submit the stage with one task (the task that originally ran on hostA).
assert(taskSets.size === 2)
assert(taskSets(1).tasks.size === 1)

// Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
// stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
// alive executors).
assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
Contributor
do you think it's worth adding

assert(taskSets(1).tasks.size === 1)

here, to make sure that only the one task is resubmitted, not both? If it weren't true, the test would fail later on anyway, but it might be helpful to get a more meaningful earlier error msg. Not necessary, up to you on whether it's worth adding.

Contributor Author
Good idea, done.


// have hostC complete the resubmitted task
complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))

// Make sure that the reduce stage was now submitted.
assert(taskSets.size === 3)
assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])

// Complete the reduce stage.
complete(taskSets(2), Seq((Success, 42)))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty()
@@ -2031,6 +2052,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
* In this test, we run a map stage where one of the executors fails but we still receive a
* "zombie" complete message from that executor. We want to make sure the stage is not reported
* as done until all tasks have completed.
*
* Most of the functionality in this test is tested in "run trivial shuffle with out-of-band
* executor failure and retry". However, that test uses ShuffleMapStages that are followed by
* a ResultStage, whereas in this test, the ShuffleMapStage is tested in isolation, without a
* ResultStage after it.
Contributor
I should have looked closer at this test earlier ... I was hoping this was testing a multi-stage map job. That would really be a better test. Ideally we'd even have three stages, with a failure happening in the second stage, and the last stage.

In any case, your changes still look good, no need to have to do those other things now.

Contributor Author
The original point of this test was to test a Map-only stage, so then it can't have a stage that follows it, right? I thought your earlier comment ("That is testing directly submitting mapstage jobs. While a lot of the logic is the same, I think its worth its own test (even earlier versions of #16620 had the behavior wrong for these jobs).") was saying that it was important / useful to have this map-only test. Let me know if that comment was based on the understanding that this tested multi-stage jobs and you think I should just remove this map-only test.

I do agree that it would be useful to add another test that tests a job with more stages, which seems like it could reveal more bugs, but I'll hold off on doing that in this PR.

Contributor
The final map-stage can't have anything that follows it, but the job overall can still have multiple stages, and the failure can occur during the processing of those earlier map-stages, or the final one.

In any case, I agree you don't need to expand that test in this PR. And even though this test doesn't do as much as I was hoping, I do still think it adds value, and is worth leaving in, even though it's very similar to the other test.

Contributor Author
I see, that makes sense.
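As a purely illustrative sketch of the multi-stage variant discussed in this thread (hypothetical, not part of this PR, and untested): a job with two ShuffleMapStages feeding a ResultStage, where the executor is lost while the second map stage is running. It reuses the suite's helpers as they appear elsewhere in this diff (MyRDD, ShuffleDependency, submit, complete, makeMapStatus, runEvent, ExecutorLost); the assertions are left as comments because the exact resubmission behavior is what such a test would need to pin down.

test("hypothetical: out-of-band executor failure during the second of two map stages") {
  val rddA = new MyRDD(sc, 2, Nil)
  val depA = new ShuffleDependency(rddA, new HashPartitioner(2))
  val rddB = new MyRDD(sc, 2, List(depA), tracker = mapOutputTracker)
  val depB = new ShuffleDependency(rddB, new HashPartitioner(2))
  val finalRdd = new MyRDD(sc, 1, List(depB), tracker = mapOutputTracker)
  submit(finalRdd, Array(0))

  // The first map stage completes normally, with one task on hostA and one on hostB.
  complete(taskSets(0), Seq(
    (Success, makeMapStatus("hostA", 2)),
    (Success, makeMapStatus("hostB", 2))))

  // hostA is lost while the second map stage is running, but a "zombie" completion from the
  // task that ran on hostA still arrives.
  runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
  complete(taskSets(1), Seq(
    (Success, makeMapStatus("hostA", 2)),
    (Success, makeMapStatus("hostB", 2))))

  // From here the test would assert that the zombie completion is ignored and that the map
  // stages whose output was lost on hostA are resubmitted before the ResultStage runs, then
  // finish the remaining tasks and check results === Map(0 -> 42) and assertDataStructuresEmpty().
}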

*/
test("map stage submission with executor failure late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 3, Nil)
@@ -2042,7 +2068,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
assert(results.size === 0) // Map stage job should not be complete yet

// Pretend host A was lost
// Pretend host A was lost. This will cause the TaskSetManager to resubmit task 0, because it
// completed on hostA.
val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
val newEpoch = mapOutputTracker.getEpoch
@@ -2054,13 +2081,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou

// A completion from another task should work because it's a non-failed host
runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet

// At this point, no more tasks are running for the stage (and the TaskSetManager considers
// the stage complete), but the tasks that ran on hostA need to be re-run, so the map stage
// shouldn't be marked as complete, and the DAGScheduler should re-submit the stage.
assert(results.size === 0)
assert(taskSets.size === 2)

// Now complete tasks in the second task set
val newTaskSet = taskSets(1)
assert(newTaskSet.tasks.size === 2) // Both tasks 0 and 1 were on hostA
// 2 tasks should have been re-submitted, for tasks 0 and 1 (which ran on hostA).
assert(newTaskSet.tasks.size === 2)
// Complete task 0 from the original task set (i.e., not the one that's currently active).
// This should still be counted towards the job being complete (but there's still one
// outstanding task).
runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
assert(results.size === 0) // Map stage job should not be complete yet
assert(results.size === 0)

// Complete the final task, from the currently active task set. There's still one
// running task, task 0 in the currently active stage attempt, but the success of task 0 means
// the DAGScheduler can mark the stage as finished.
runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
assert(results.size === 1) // Map stage job should now finally be complete
assertDataStructuresEmpty()