diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9d412f2dba3ce..51d20d3428915 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -349,9 +349,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     it.next.asInstanceOf[Tuple2[_, _]]._1
 
   /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
-  private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = {
-    assert(taskSet.tasks.size >= results.size)
-    for ((result, i) <- results.zipWithIndex) {
+  private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = {
+    assert(taskSet.tasks.size >= taskEndInfos.size)
+    for ((result, i) <- taskEndInfos.zipWithIndex) {
       if (i < taskSet.tasks.size) {
         runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
       }
@@ -405,6 +405,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(JobCancelled(jobId, None))
   }
 
+  /** Complete the tasks in the given taskSet, then verify the job results match expected. */
+  private def completeAndCheckAnswer(
+      taskSet: TaskSet,
+      taskEndInfos: Seq[(TaskEndReason, Any)],
+      expected: Map[Int, Any]): Unit = {
+    complete(taskSet, taskEndInfos)
+    assert(this.results === expected)
+  }
+
   test("[SPARK-3353] parent stage should have lower stage id") {
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
     val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
@@ -461,8 +470,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     completeShuffleMapStageSuccessfully(0, 0, 1)
     completeShuffleMapStageSuccessfully(1, 0, 1)
     completeShuffleMapStageSuccessfully(2, 0, 1)
-    complete(taskSets(3), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(3), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }
 
@@ -558,8 +566,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   test("run trivial job") {
     submit(new MyRDD(sc, 1, Nil), Array(0))
-    complete(taskSets(0), List((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }
 
@@ -567,8 +574,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val baseRdd = new MyRDD(sc, 1, Nil)
     val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
-    complete(taskSets(0), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }
 
@@ -592,8 +598,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(finalRdd, Array(0))
     val taskSet = taskSets(0)
     assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
-    complete(taskSet, Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSet, Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }
 
@@ -729,8 +734,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(failure === null)
 
     // When the task set completes normally, state should be correctly updated.
-    complete(taskSets(0), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
 
     assert(sparkListener.failedStages.isEmpty)
@@ -746,8 +750,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     completeShuffleMapStageSuccessfully(0, 0, 1)
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
-    complete(taskSets(1), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }
 
@@ -771,8 +774,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // we can see both result blocks now
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
-    complete(taskSets(3), Seq((Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
+    completeAndCheckAnswer(taskSets(3), Seq((Success, 43)), Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
 
@@ -1454,8 +1456,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
 
     // finish the next stage normally, which completes the job
-    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42), (Success, 43)), Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }
 
@@ -1796,9 +1797,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // lets say there is a fetch failure in this task set, which makes us go back and
     // run stage 0, attempt 1
-    complete(taskSets(1), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(1, 0, shuffleDep1)
     scheduler.resubmitFailedStages()
 
     // stage 0, attempt 1 should have the properties of job2
@@ -1872,9 +1871,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // have the second stage complete normally
     completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC"))
     // fail the third stage because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // have DAGScheduler try again
@@ -1900,9 +1897,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // complete stage 1
     completeShuffleMapStageSuccessfully(1, 0, 1)
     // pretend stage 2 failed because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
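
For illustration, a minimal sketch of how the new completeAndCheckAnswer helper would be used in a test, relying on the suite fixtures already shown in the diff (submit, MyRDD, taskSets, assertDataStructuresEmpty). The test name and values below are hypothetical and not part of this patch:

  // Hypothetical test (not in this patch): a trivial two-partition job whose
  // result stage finishes with Success for both tasks. The helper replaces the
  // previous complete(...) + assert(results === ...) pair with a single call.
  test("illustration: trivial two-partition job") {
    submit(new MyRDD(sc, 2, Nil), Array(0, 1))
    completeAndCheckAnswer(
      taskSets(0),
      Seq((Success, 42), (Success, 43)),  // one (TaskEndReason, result) pair per task
      Map(0 -> 42, 1 -> 43))              // expected partition index -> job result
    assertDataStructuresEmpty()
  }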