@@ -792,9 +792,14 @@ class DAGScheduler(
792792 val stageId = task.stageId
793793 val taskType = Utils .getFormattedClassName(task)
794794
795- if (! stageIdToStage.contains(task.stageId)) {
795+ // The success case is dealt with separately below, since we need to compute accumulator
796+ // updates before posting.
797+ if (event.reason != Success ) {
796798 listenerBus.post(SparkListenerTaskEnd (stageId, taskType, event.reason, event.taskInfo,
797799 event.taskMetrics))
800+ }
801+
802+ if (! stageIdToStage.contains(task.stageId)) {
798803 // Skip all the actions if the stage has been cancelled.
799804 return
800805 }
@@ -829,6 +834,8 @@ class DAGScheduler(
829834 AccumulableInfo (id, name, Some (stringPartialValue), stringValue)
830835 }
831836 }
837+ listenerBus.post(SparkListenerTaskEnd (stageId, taskType, event.reason, event.taskInfo,
838+ event.taskMetrics))
832839 }
833840 pendingTasks(stage) -= task
834841 task match {
@@ -959,8 +966,6 @@ class DAGScheduler(
959966 // Unrecognized failure - also do nothing. If the task fails repeatedly, the TaskScheduler
960967 // will abort the job.
961968 }
962- listenerBus.post(SparkListenerTaskEnd (stageId, taskType, event.reason, event.taskInfo,
963- event.taskMetrics))
964969 submitWaitingStages()
965970 }
966971
0 commit comments