@@ -1045,31 +1045,37 @@ class DAGScheduler(
10451045 stage.pendingTasks += task
10461046
10471047 case FetchFailed (bmAddress, shuffleId, mapId, reduceId) =>
1048- // Mark the stage that the reducer was in as unrunnable
10491048 val failedStage = stageIdToStage(task.stageId)
1050- markStageAsFinished(failedStage, Some (" Fetch failure" ))
1051- runningStages -= failedStage
1052- // TODO: Cancel running tasks in the stage
1053- logInfo(" Marking " + failedStage + " (" + failedStage.name +
1054- " ) for resubmision due to a fetch failure" )
1055- // Mark the map whose fetch failed as broken in the map stage
1056- val mapStage = shuffleToMapStage(shuffleId)
1057- if (mapId != - 1 ) {
1058- mapStage.removeOutputLoc(mapId, bmAddress)
1059- mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1060- }
1061- logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1062- " ); marking it for resubmission" )
1063- if (failedStages.isEmpty && eventProcessActor != null ) {
1064- // Don't schedule an event to resubmit failed stages if failed isn't empty, because
1065- // in that case the event will already have been scheduled. eventProcessActor may be
1066- // null during unit tests.
1067- import env .actorSystem .dispatcher
1068- env.actorSystem.scheduler.scheduleOnce(
1069- RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1049+ // It is likely that we receive multiple FetchFailed for a single stage (because we have
1050+ // multiple tasks running concurrently on different executors). In that case, it is possible
1051+ // the fetch failure has already been handled by the scheduler.
1052+ if (runningStages.contains(failedStage)) {
1053+ markStageAsFinished(failedStage, Some (" Fetch failure" ))
1054+ runningStages -= failedStage
1055+ // TODO: Cancel running tasks in the stage
1056+ logInfo(" Marking " + failedStage + " (" + failedStage.name +
1057+ " ) for resubmision due to a fetch failure" )
1058+
1059+ // Mark the map whose fetch failed as broken in the map stage
1060+ val mapStage = shuffleToMapStage(shuffleId)
1061+ if (mapId != - 1 ) {
1062+ mapStage.removeOutputLoc(mapId, bmAddress)
1063+ mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
1064+ }
1065+
1066+ logInfo(" The failed fetch was from " + mapStage + " (" + mapStage.name +
1067+ " ); marking it for resubmission" )
1068+ if (failedStages.isEmpty && eventProcessActor != null ) {
1069+ // Don't schedule an event to resubmit failed stages if failedStages is non-empty,
1070+ // because in that case the event will already have been scheduled. eventProcessActor
1071+ // may be null during unit tests.
1072+ import env .actorSystem .dispatcher
1073+ env.actorSystem.scheduler.scheduleOnce(
1074+ RESUBMIT_TIMEOUT , eventProcessActor, ResubmitFailedStages )
1075+ }
1076+ failedStages += failedStage
1077+ failedStages += mapStage
10701078 }
1071- failedStages += failedStage
1072- failedStages += mapStage
10731079 // TODO: mark the executor as failed only if there were lots of fetch failures on it
10741080 if (bmAddress != null ) {
10751081 handleExecutorLost(bmAddress.executorId, Some (task.epoch))
0 commit comments