diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 6787250ddc3f..72c22d903737 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1221,7 +1221,6 @@ private[spark] class DAGScheduler(
               s"(available: ${stage.isAvailable}," +
               s"available outputs: ${stage.numAvailableOutputs}," +
               s"partitions: ${stage.numPartitions})")
-          markMapStageJobsAsFinished(stage)
         case stage : ResultStage =>
           logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
       }
@@ -1402,7 +1401,7 @@ private[spark] class DAGScheduler(
               shuffleStage.pendingPartitions -= task.partitionId
             }
 
-            if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
+            if (shuffleStage.pendingPartitions.isEmpty) {
               markStageAsFinished(shuffleStage)
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 56ba23c38af7..bd5609d78b15 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.annotation.meta.param
@@ -2246,58 +2247,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
-  test("Trigger mapstage's job listener in submitMissingTasks") {
-    val rdd1 = new MyRDD(sc, 2, Nil)
-    val dep1 = new ShuffleDependency(rdd1, new HashPartitioner(2))
-    val rdd2 = new MyRDD(sc, 2, List(dep1), tracker = mapOutputTracker)
-    val dep2 = new ShuffleDependency(rdd2, new HashPartitioner(2))
-
-    val listener1 = new SimpleListener
-    val listener2 = new SimpleListener
-
-    submitMapStage(dep1, listener1)
-    submitMapStage(dep2, listener2)
-
-    // Complete the stage0.
-    assert(taskSets(0).stageId === 0)
-    complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", rdd1.partitions.length)),
-      (Success, makeMapStatus("hostB", rdd1.partitions.length))))
-    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
-      HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
-    assert(listener1.results.size === 1)
-
-    // When attempting stage1, trigger a fetch failure.
-    assert(taskSets(1).stageId === 1)
-    complete(taskSets(1), Seq(
-      (Success, makeMapStatus("hostC", rdd2.partitions.length)),
-      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
-    scheduler.resubmitFailedStages()
-    // Stage1 listener should not have a result yet
-    assert(listener2.results.size === 0)
-
-    // Speculative task succeeded in stage1.
-    runEvent(makeCompletionEvent(
-      taskSets(1).tasks(1),
-      Success,
-      makeMapStatus("hostD", rdd2.partitions.length)))
-    // stage1 listener still should not have a result, though there's no missing partitions
-    // in it. Because stage1 has been failed and is not inside `runningStages` at this moment.
-    assert(listener2.results.size === 0)
-
-    // Stage0 should now be running as task set 2; make its task succeed
-    assert(taskSets(2).stageId === 0)
-    complete(taskSets(2), Seq(
-      (Success, makeMapStatus("hostC", rdd2.partitions.length))))
-    assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet ===
-      Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-
-    // After stage0 is finished, stage1 will be submitted and found there is no missing
-    // partitions in it. Then listener got triggered.
-    assert(listener2.results.size === 1)
-    assertDataStructuresEmpty()
-  }
-
   /**
    * In this test, we run a map stage where one of the executors fails but we still receive a
    * "zombie" complete message from that executor. We want to make sure the stage is not reported
@@ -2627,6 +2576,39 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  test("SPARK-25211 speculation and fetch failed result in hang of job") {
+    val shuffleMapRDD1 = new MyRDD(sc, 1, Nil)
+    val dep1 = new ShuffleDependency(shuffleMapRDD1, new HashPartitioner(2))
+    val shuffleMapRDD2 = new MyRDD(sc, 2, List(dep1))
+    val dep2 = new ShuffleDependency(shuffleMapRDD2, new HashPartitioner(2))
+
+    val jobId = scheduler.nextJobId.get()
+    val waiter = new JobWaiter(scheduler, jobId, 1, (_: Int, _: MapOutputStatistics) => {})
+    val realJobId = submitMapStage(dep2, waiter)
+    assert(waiter.jobId === realJobId)
+    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
+    complete(taskSets(0),
+      (Success, makeMapStatus("hostA", dep1.partitioner.numPartitions)) :: Nil
+    )
+    assert(taskSets(1).stageId === 1 && taskSets(1).stageAttemptId === 0)
+    complete(taskSets(1), Seq(
+      (Success, makeMapStatus("hostB", dep2.partitioner.numPartitions)),
+      (FetchFailed(makeBlockManagerId("hostA"), dep1.shuffleId, 0, 0, "ignored"), null)))
+
+    // waiting for resubmitting of failed stages
+    TimeUnit.MILLISECONDS.sleep(DAGScheduler.RESUBMIT_TIMEOUT * 2)
+    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
+
+    // A speculated task finished
+    runEvent(makeCompletionEvent(taskSets(1).tasks(1),
+      Success, makeMapStatus("hostC", dep2.partitioner.numPartitions)))
+    assert(waiter.jobFinished)
+
+    runEvent(makeCompletionEvent(taskSets(2).tasks(0),
+      Success, makeMapStatus("hostD", dep2.partitioner.numPartitions)))
+    assert(taskSets.size === 3)
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.