Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,17 @@ class DAGSchedulerSuite
* @param stageId - The current stageId
* @param attemptIdx - The current attempt count
*/
private def completeNextResultStageWithSuccess(stageId: Int, attemptIdx: Int): Unit = {
private def completeNextResultStageWithSuccess(
stageId: Int,
attemptIdx: Int,
partitionToResult: Int => Int = _ => 42): Unit = {
val stageAttempt = taskSets.last
checkStageId(stageId, attemptIdx, stageAttempt)
assert(scheduler.stageIdToStage(stageId).isInstanceOf[ResultStage])
complete(stageAttempt, stageAttempt.tasks.zipWithIndex.map(_ => (Success, 42)).toSeq)
val taskResults = stageAttempt.tasks.zipWithIndex.map { case (task, idx) =>
(Success, partitionToResult(idx))
}
complete(stageAttempt, taskResults.toSeq)
}

/**
Expand Down Expand Up @@ -1054,6 +1060,47 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

/**
* Run two jobs, with a shared dependency. We simulate a fetch failure in the second job, which
* requires regenerating some outputs of the shared dependency. One key aspect of this test is
* that the second job actually uses a different stage for the shared dependency (a "skipped"
* stage).
*/
test("shuffle fetch failure in a reused shuffle dependency") {
// Run the first job successfully, which creates one shuffle dependency

val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))

completeShuffleMapStageSuccessfully(0, 0, 2)
completeNextResultStageWithSuccess(1, 0)
assert(results === Map(0 -> 42, 1 -> 42))
assertDataStructuresEmpty()

// submit another job w/ the shared dependency, and have a fetch failure
val reduce2 = new MyRDD(sc, 2, List(shuffleDep))
submit(reduce2, Array(0, 1))
// Note that the stage numbering here is only b/c the shared dependency produces a new, skipped
// stage. If instead it reused the existing stage, then this would be stage 2
completeNextStageWithFetchFailure(3, 0, shuffleDep)
scheduler.resubmitFailedStages()

// the scheduler now creates a new task set to regenerate the missing map output, but this time
// using a different stage, the "skipped" one

// SPARK-9809 -- this stage is submitted without a task for each partition (because some of
// the shuffle map output is still available from stage 0); make sure we've still got internal
// accumulators setup
assert(scheduler.stageIdToStage(2).internalAccumulators.nonEmpty)
completeShuffleMapStageSuccessfully(2, 0, 2)
completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
assert(results === Map(0 -> 1234, 1 -> 1235))

assertDataStructuresEmpty()
}

/**
* This test runs a three stage job, with a fetch failure in stage 1. but during the retry, we
* have completions from both the first & second attempt of stage 1. So all the map output is
Expand Down