@@ -349,9 +349,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     it.next.asInstanceOf[Tuple2[_, _]]._1

   /** Send the given CompletionEvent messages for the tasks in the TaskSet. */
-  private def complete(taskSet: TaskSet, results: Seq[(TaskEndReason, Any)]): Unit = {
-    assert(taskSet.tasks.size >= results.size)
-    for ((result, i) <- results.zipWithIndex) {
+  private def complete(taskSet: TaskSet, taskEndInfos: Seq[(TaskEndReason, Any)]): Unit = {
+    assert(taskSet.tasks.size >= taskEndInfos.size)
+    for ((result, i) <- taskEndInfos.zipWithIndex) {
       if (i < taskSet.tasks.size) {
         runEvent(makeCompletionEvent(taskSet.tasks(i), result._1, result._2))
       }
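
The parameter rename matters because the suite already has a `results` field: a listener registered with each submitted job records every partition's output there, and that field is what the assertions (and the new helper's `this.results`) compare against. A minimal sketch of that surrounding state, assuming field and listener names from the call sites in this diff (the real declarations are outside the hunks shown):

    // Sketch only; the actual declarations are not part of this diff.
    val results = new HashMap[Int, Any]()   // partition index -> result value
    var failure: Exception = null

    val jobListener = new JobListener() {
      // Called by the DAGScheduler as each result task finishes.
      override def taskSucceeded(index: Int, result: Any): Unit = { results.put(index, result) }
      override def jobFailed(exception: Exception): Unit = { failure = exception }
    }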
@@ -405,6 +405,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(JobCancelled(jobId, None))
   }

+  /** Complete some tasks in the given TaskSet successfully, then check the job results. */
+  private def completeAndCheckAnswer(
+      taskSet: TaskSet,
+      taskEndInfos: Seq[(TaskEndReason, Any)],
+      expected: Map[Int, Any]): Unit = {
+    complete(taskSet, taskEndInfos)
+    assert(this.results === expected)
+  }
+
   test("[SPARK-3353] parent stage should have lower stage id") {
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
     val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
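
The new helper collapses the complete-then-assert pairing that repeats throughout the suite, so each call site shrinks from two lines to one. For example, taken from the first hunk below:

    // Before: complete the result stage's tasks, then assert on the collected results.
    complete(taskSets(3), Seq((Success, 42)))
    assert(results === Map(0 -> 42))

    // After: one call completes the tasks and checks the answer.
    completeAndCheckAnswer(taskSets(3), Seq((Success, 42)), Map(0 -> 42))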
@@ -461,8 +470,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     completeShuffleMapStageSuccessfully(0, 0, 1)
     completeShuffleMapStageSuccessfully(1, 0, 1)
     completeShuffleMapStageSuccessfully(2, 0, 1)
-    complete(taskSets(3), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(3), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }

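`completeShuffleMapStageSuccessfully` is an existing helper that this diff relies on but never shows. A plausible sketch, inferred from its call sites here; the real definition lives elsewhere in DAGSchedulerSuite and may differ:

    // Hypothetical sketch: finish every task of the given shuffle-map stage attempt
    // with Success and a map output located on the given (or a default) host.
    private def completeShuffleMapStageSuccessfully(
        stageId: Int,
        attemptIdx: Int,
        numShufflePartitions: Int,
        hostNames: Seq[String] = Seq.empty): Unit = {
      val stageAttempt = taskSets.find { ts =>
        ts.stageId == stageId && ts.stageAttemptId == attemptIdx
      }.get
      complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (_, idx) =>
        val host = if (idx < hostNames.size) hostNames(idx) else s"host${('A' + idx).toChar}"
        (Success, makeMapStatus(host, numShufflePartitions))
      }.toSeq)
    }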
@@ -558,17 +566,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

   test("run trivial job") {
     submit(new MyRDD(sc, 1, Nil), Array(0))
-    complete(taskSets(0), List((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }

   test("run trivial job w/ dependency") {
     val baseRdd = new MyRDD(sc, 1, Nil)
     val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
-    complete(taskSets(0), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }

@@ -592,8 +598,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(finalRdd, Array(0))
     val taskSet = taskSets(0)
     assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
-    complete(taskSet, Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSet, Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }

@@ -729,8 +734,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(failure === null)

     // When the task set completes normally, state should be correctly updated.
-    complete(taskSets(0), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(0), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()

     assert(sparkListener.failedStages.isEmpty)
@@ -746,8 +750,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     completeShuffleMapStageSuccessfully(0, 0, 1)
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
-    complete(taskSets(1), Seq((Success, 42)))
-    assert(results === Map(0 -> 42))
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42)), Map(0 -> 42))
     assertDataStructuresEmpty()
   }

@@ -771,8 +774,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // we can see both result blocks now
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet ===
       HashSet("hostA", "hostB"))
-    complete(taskSets(3), Seq((Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
+    completeAndCheckAnswer(taskSets(3), Seq((Success, 43)), Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }

@@ -1454,8 +1456,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))

     // finish the next stage normally, which completes the job
-    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
+    completeAndCheckAnswer(taskSets(1), Seq((Success, 42), (Success, 43)), Map(0 -> 42, 1 -> 43))
     assertDataStructuresEmpty()
   }

@@ -1796,9 +1797,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits

     // lets say there is a fetch failure in this task set, which makes us go back and
     // run stage 0, attempt 1
-    complete(taskSets(1), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDep1.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(1, 0, shuffleDep1)
     scheduler.resubmitFailedStages()

     // stage 0, attempt 1 should have the properties of job2
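
`completeNextStageWithFetchFailure` is likewise defined outside these hunks. Judging from the hand-built FetchFailed tuples it replaces, its shape is roughly the following (an assumption, not shown in this diff):

    // Hypothetical sketch: fail every task of the stage attempt with a FetchFailed
    // pointing at hostA's output for the given shuffle dependency.
    private def completeNextStageWithFetchFailure(
        stageId: Int,
        attemptIdx: Int,
        shuffleDep: ShuffleDependency[_, _, _]): Unit = {
      val stageAttempt = taskSets.find { ts =>
        ts.stageId == stageId && ts.stageAttemptId == attemptIdx
      }.get
      complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map { case (_, idx) =>
        (FetchFailed(makeBlockManagerId("hostA"), shuffleDep.shuffleId, 0L, 0, idx, "ignored"), null)
      }.toSeq)
    }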
@@ -1872,9 +1871,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // have the second stage complete normally
     completeShuffleMapStageSuccessfully(1, 0, 1, Seq("hostA", "hostC"))
     // fail the third stage because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // have DAGScheduler try again
@@ -1900,9 +1897,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // complete stage 1
     completeShuffleMapStageSuccessfully(1, 0, 1)
     // pretend stage 2 failed because hostA went down
-    complete(taskSets(2), Seq(
-      (FetchFailed(makeBlockManagerId("hostA"),
-        shuffleDepTwo.shuffleId, 0L, 0, 0, "ignored"), null)))
+    completeNextStageWithFetchFailure(2, 0, shuffleDepTwo)
     // TODO assert this:
     // blockManagerMaster.removeExecutor("hostA-exec")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.