Skip to content

Commit c414c36

Browse files
committed
Fixed the hanging in JobCancellationSuite.
1 parent b3e2eed commit c414c36

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -677,8 +677,10 @@ class DAGScheduler(
677677
}
678678

679679
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
680-
val stageInfo = stageIdToStage(task.stageId).latestInfo
681-
listenerBus.post(SparkListenerTaskStart(task.stageId, stageInfo.attemptId, taskInfo))
680+
// Note that there is a chance that this task is launched after the stage is cancelled.
681+
// In that case, we wouldn't have the stage anymore in stageIdToStage.
682+
val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
683+
listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
682684
submitWaitingStages()
683685
}
684686

0 commit comments

Comments
 (0)