@@ -1046,22 +1046,22 @@ class DAGScheduler(
10461046
10471047 case FetchFailed (bmAddress, shuffleId, mapId, reduceId) =>
10481048 val failedStage = stageIdToStage(task.stageId)
1049- val mapStage = shuffleToMapStage(shuffleId)
10501049 // It is likely that we receive multiple FetchFailed for a single stage (because we have
10511050 // multiple tasks running concurrently on different executors). In that case, it is possible
10521051 // the fetch failure has already been handled by the scheduler.
10531052 if (runningStages.contains(failedStage)) {
10541053 markStageAsFinished(failedStage, Some (" Fetch failure" ))
10551054 runningStages -= failedStage
1056- // TODO: Cancel running tasks in the stage
1057- logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1058- s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
10591055 }
10601056
1057+ val mapStage = shuffleToMapStage(shuffleId)
10611058 if (failedStages.isEmpty && eventProcessActor != null ) {
10621059 // Don't schedule an event to resubmit failed stages if failed isn't empty, because
10631060 // in that case the event will already have been scheduled. eventProcessActor may be
10641061 // null during unit tests.
1062+ // TODO: Cancel running tasks in the stage
1063+ logInfo(s " Marking $failedStage ( ${failedStage.name}) for resubmision " +
1064+ s " due to a fetch failure from $mapStage ( ${mapStage.name}" )
10651065 import env .actorSystem .dispatcher
10661066 env.actorSystem.scheduler.scheduleOnce(
10671067 RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
0 commit comments