diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8c46a8432339..e09300df7bea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1178,19 +1178,27 @@ class DAGScheduler(
       event.taskInfo.attemptNumber, // this is a task attempt number
       event.reason)
 
-    if (!stageIdToStage.contains(task.stageId)) {
-      // The stage may have already finished when we get this event -- eg. maybe it was a
+    val stageOpt = stageIdToStage.get(task.stageId)
+
+    if (stageOpt.isEmpty || (failedStages.contains(stageOpt.get) && event.reason == Success)) {
+      // The stage may have already finished or failed when we get this event -- eg. maybe it was a
       // speculative task. It is important that we send the TaskEnd event in any case, so listeners
       // are properly notified and can chose to handle it. For instance, some listeners are
       // doing their own accounting and if they don't get the task end event they think
       // tasks are still running when they really aren't.
+      val msg = if (stageOpt.isEmpty) {
+        "the stage has already finished"
+      } else {
+        s"its stage ${stageOpt.get} has been marked as failed"
+      }
+      logWarning(s"Ignoring task completion event for $task because $msg")
       postTaskEnd(event)
 
-      // Skip all the actions if the stage has been cancelled.
+      // Skip all the actions if the stage has been cancelled or failed.
       return
     }
 
-    val stage = stageIdToStage(task.stageId)
+    val stage = stageOpt.get
 
     // Make sure the task's accumulators are updated before any other processing happens, so that
     // we can post a task end event before any jobs or stages are updated. The accumulators are
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d958658527f6..1ba9273887b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -82,7 +82,7 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
+  // Set the corresponding index of Boolean var when the task killed by other attempt tasks,
   // this happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
   private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d812b5bd92c1..c66c03bc9e78 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -2399,6 +2399,91 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
   }
 
+  /**
+   * This tests the case where the original task succeeds after its speculative attempt
+   * has already failed with a FetchFailed.
+   */
+  test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " +
+    "successful tasks") {
+    // Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
+
+    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
+
+    submit(rddC, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first with a FetchFailed, while the
+    // original task is still running, e.g. because the speculative attempt's executor was lost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Check the currently missing partitions.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+    // Then the original second task succeeds.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // The number of missing partitions should not change; otherwise the child stage
+    // could never succeed.
+    assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1)
+  }
+
+  test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " +
+    "successful tasks") {
+    val rddA = new MyRDD(sc, 2, Nil)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
+    val shuffleIdA = shuffleDepA.shuffleId
+    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
+    submit(rddB, Array(0, 1))
+
+    // Complete both tasks in rddA.
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
+
+    // The first task of rddB succeeds.
+    assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]])
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2)))
+
+    // The second task's speculative attempt fails first with a FetchFailed, while the
+    // original task is still running, e.g. because the speculative attempt's executor was lost.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1),
+      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"),
+      null))
+    // Make sure failedStages is not empty now.
+    assert(scheduler.failedStages.nonEmpty)
+    // Then the original second result task succeeds.
+    assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]])
+    // This success event will be ignored by the DAGScheduler.
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    // Resubmit the failed stages; the job should eventually succeed.
+    scheduler.resubmitFailedStages()
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, makeMapStatus("hostB", 2)))
+    runEvent(makeCompletionEvent(
+      taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2)))
+    assertDataStructuresEmpty()
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ca6a7e5db3b1..6013e1802695 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -745,7 +745,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(resubmittedTasks === 0)
   }
 
-
   test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
     val conf = new SparkConf().set("spark.speculation", "true")
     sc = new SparkContext("local", "test", conf)
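For context, and not part of the patch itself: below is a minimal, standalone Scala sketch of the guard condition the DAGScheduler change introduces, reduced to plain collections. All names here (Stage, TaskEnd, shouldIgnore) are hypothetical stand-ins for the real scheduler types; the real code also posts the listener task-end event before returning.

// Standalone illustration of the new check: drop the completion event when the stage is gone,
// or when a Success arrives for a stage that has already been marked as failed, so the retry's
// pending partitions are not corrupted by a late success from the old attempt.
object IgnoreLateSuccessSketch {
  sealed trait TaskEndReason
  case object Success extends TaskEndReason
  case object FetchFailed extends TaskEndReason

  case class Stage(id: Int)
  case class TaskEnd(stageId: Int, reason: TaskEndReason)

  // Stage 1 has hit a fetch failure and is awaiting resubmission.
  val stageIdToStage = Map(1 -> Stage(1))
  val failedStages = Set(Stage(1))

  def shouldIgnore(event: TaskEnd): Boolean = {
    val stageOpt = stageIdToStage.get(event.stageId)
    stageOpt.isEmpty || (failedStages.contains(stageOpt.get) && event.reason == Success)
  }

  def main(args: Array[String]): Unit = {
    println(shouldIgnore(TaskEnd(1, Success)))      // true: late success is ignored
    println(shouldIgnore(TaskEnd(1, FetchFailed)))  // false: failures are still processed
    println(shouldIgnore(TaskEnd(2, Success)))      // true: unknown stage, already cleaned up
  }
}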