62 changes: 47 additions & 15 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -350,11 +350,21 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
stage.status = v1.StageStatus.SKIPPED
job.skippedStages += stage.info.stageId
job.skippedTasks += stage.info.numTasks
it.remove()
update(stage, now)
// Mark the stage as skipped if in Pending status
if (v1.StageStatus.PENDING.equals(stage.status)) {
stage.status = v1.StageStatus.SKIPPED
job.skippedStages += stage.info.stageId
job.skippedTasks += stage.info.numTasks
job.activeStages -= 1

pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds - stage.info.stageId
update(pool, now)
}

it.remove()

Contributor

Removal from the iterator should always happen.

Contributor Author

For already completed stages, we leave the removal of the stage to either the onTaskEnd or the onStageCompleted event. This ensures that stage metrics are updated even when the onJobEnd event is received before the onTaskEnd event.

Contributor

I am not sure I follow. If that is the case, why are we doing this for active stages here? onStageCompleted/onTaskEnd would be fired for active stages as well.

Contributor

Btw, there is an existing bug: we are not updating the pool, etc. here, which we do in onStageCompleted ...

Contributor Author

I think the assumption here is that we will always receive the onStageCompleted event before the onJobEnd event. If that does not occur for some reason, then any active stages are marked as skipped.
I don't know of a scenario in which the onStageCompleted event is not received before the onJobEnd event (or is not received at all). Let me look into it further. I will also fix the bug with updating the pool.

Contributor

This can happen when events get dropped ...
Spark makes a best-effort attempt to deliver events in order, but when events get dropped, the UI becomes inconsistent. I assume this might be an effort to recover in that case? Any thoughts @tgravescs, @vanzin?

Contributor

In your question, this == this PR?

If so, no, that's not what it's fixing. Task end events can "naturally" arrive after the stage end event in the case of a stage failure, and this code was missing that case.

When event drops occur, a lot of things get out of sync, and this change wouldn't fix that. It perhaps could make it a little worse: if a task end event does not arrive, then maybe with this change the stage will never be actually removed from the live stages map. Not sure how easy it would be to recover from that though, since dropped events could probably cause other sorts of leaks in this class too, but I also feel that's a separate issue.

(Also, hopefully, dropped events for this listener should be less common in 2.3 after the listener bus changes.)
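
The ordering described in this comment can be sketched with a small stand-alone model. This is a simplified illustration, not Spark's code: `Stage`, `Listener`, and `demo` are hypothetical stand-ins for `LiveStage` and `AppStatusListener`, and only the removal rule from the diff is modeled (a stage leaves the live map once it is finished and has no active tasks, whichever event arrives last).

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's LiveStage / AppStatusListener.
object LateTaskEndSketch {
  sealed trait Status
  case object Active extends Status
  case object Failed extends Status
  case object Complete extends Status

  final class Stage(
      var status: Status = Active,
      var activeTasks: Int = 0,
      var killedTasks: Int = 0)

  final class Listener {
    // Keyed by (stageId, attemptId), like liveStages in the diff.
    val liveStages = mutable.Map.empty[(Int, Int), Stage]

    def onStageSubmitted(key: (Int, Int)): Unit =
      liveStages(key) = new Stage()

    def onTaskStart(key: (Int, Int)): Unit =
      liveStages.get(key).foreach { stage => stage.activeTasks += 1 }

    def onStageCompleted(key: (Int, Int), failed: Boolean): Unit =
      liveStages.get(key).foreach { stage =>
        stage.status = if (failed) Failed else Complete
        // Remove only if no task is still outstanding.
        if (stage.activeTasks == 0) liveStages.remove(key)
      }

    def onTaskEnd(key: (Int, Int), killed: Boolean): Unit =
      liveStages.get(key).foreach { stage =>
        stage.activeTasks -= 1
        if (killed) stage.killedTasks += 1
        val finished = stage.status == Failed || stage.status == Complete
        // A late task end is the last chance to remove a finished stage.
        if (finished && stage.activeTasks == 0) liveStages.remove(key)
      }
  }

  // Replays the ordering in question: the stage end arrives before the last task end.
  // Returns (killed task count, live after stage end?, live after last task end?).
  def demo(): (Int, Boolean, Boolean) = {
    val listener = new Listener
    val key = (1, 0)
    listener.onStageSubmitted(key)
    listener.onTaskStart(key)
    listener.onTaskStart(key)
    listener.onTaskEnd(key, killed = false)
    listener.onStageCompleted(key, failed = true)
    val stage = listener.liveStages(key)
    val liveAfterStageEnd = listener.liveStages.contains(key)
    listener.onTaskEnd(key, killed = true)
    (stage.killedTasks, liveAfterStageEnd, listener.liveStages.contains(key))
  }
}
```

In this model the late task end still updates the killed-task count before the stage is dropped. It also shows the leak concern raised above: if the final `onTaskEnd` call never happens, nothing else removes the entry from `liveStages`.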

Contributor

To clarify, I was referring to 'this' being the job end event received before the stage end event (for a stage which is part of a job).

I was not referring to task end events (those can come in after stage or job end events).

Thanks for clarifying @vanzin. Given that the snippet is not trying to recover from dropped events, I wonder why "non"-skipped stages would even be in the list: I would expect all of them to be skipped?

Contributor

They would be in the list if the task end event arrives late, right? (Haven't really re-read the code to be sure.)

Unless it's guaranteed that the task end event will arrive before the job end event, unlike the case for the stage end one.

Contributor Author

This code now only handles the scenario in which the onStageCompleted event is dropped (not received). If we don't want to handle that scenario, then we can remove this part of the code altogether.

update(stage, now, last = true)
}
}
}

@@ -506,7 +516,16 @@ private[spark] class AppStatusListener(
if (killedDelta > 0) {
stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
}
maybeUpdate(stage, now)
// Remove stage if there are no active tasks left and stage is already finished
val removeStage =
stage.activeTasks == 0 &&
(v1.StageStatus.COMPLETE.equals(stage.status) ||
v1.StageStatus.FAILED.equals(stage.status))
if (removeStage) {
update(stage, now, last = true)
} else {
maybeUpdate(stage, now)
}

// Store both stage ID and task index in a single long variable for tracking at job level.
val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
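
A minimal worked example of the packing used on the line above, assuming non-negative task indices. `pack` repeats the expression from the diff; `stageIdOf` and `taskIndexOf` are hypothetical inverse helpers added only for illustration and are not part of this PR.

```scala
object TaskIndexPacking {
  // Same expression as in the diff: stage ID in the upper 32 bits,
  // task index in the lower 32 bits.
  def pack(stageId: Int, taskIndex: Int): Long =
    (stageId.toLong << Integer.SIZE) | taskIndex

  // Hypothetical inverse helpers (illustration only).
  def stageIdOf(packed: Long): Int = (packed >>> Integer.SIZE).toInt
  def taskIndexOf(packed: Long): Int = packed.toInt
}
```

For example, `TaskIndexPacking.pack(3, 7)` is `(3L << 32) | 7`, i.e. `12884901895L`. A negative task index would be sign-extended when widened to `Long` and would overwrite the stage ID bits, so the scheme relies on indices being non-negative.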
@@ -521,7 +540,7 @@ private[spark] class AppStatusListener(
if (killedDelta > 0) {
job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
}
maybeUpdate(job, now)
conditionalLiveUpdate(job, now, removeStage)
}

val esummary = stage.executorSummary(event.taskInfo.executorId)
@@ -532,14 +551,17 @@ private[spark] class AppStatusListener(
if (metricsDelta != null) {
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
}
maybeUpdate(esummary, now)
conditionalLiveUpdate(esummary, now, removeStage)

if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
if (removeStage) {
liveStages.remove((event.stageId, event.stageAttemptId))
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@@ -564,17 +586,13 @@ private[spark] class AppStatusListener(

// Force an update on live applications when the number of active tasks reaches 0. This is
// checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
if (exec.activeTasks == 0) {
liveUpdate(exec, now)
} else {
maybeUpdate(exec, now)
}
conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
}
}

override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
val maybeStage =
Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
maybeStage.foreach { stage =>
val now = System.nanoTime()
stage.info = event.stageInfo
@@ -608,14 +626,20 @@ private[spark] class AppStatusListener(
}

stage.executorSummaries.values.foreach(update(_, now))
update(stage, now, last = true)

val executorIdsForStage = stage.blackListedExecutors
executorIdsForStage.foreach { executorId =>
liveExecutors.get(executorId).foreach { exec =>
removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
}
}

// Remove stage only if there are no active tasks remaining
val removeStage = stage.activeTasks == 0
update(stage, now, last = removeStage)
if (removeStage) {
liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
}
}

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
@@ -882,6 +906,14 @@ private[spark] class AppStatusListener(
}
}

private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
if (condition) {
liveUpdate(entity, now)
} else {
maybeUpdate(entity, now)
}
}

private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
@@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
}

test("SPARK-24415: update metrics for tasks that finish late") {
val listener = new AppStatusListener(store, conf, true)

val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")

// Start job
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))

// Start 2 stages
listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))

// Start 2 Tasks
val tasks = createTasks(2, Array("1"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
}

// Task 1 Finished
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))

// Stage 1 Completed
stage1.failureReason = Some("Failed")
listener.onStageCompleted(SparkListenerStageCompleted(stage1))

// Stop job 1
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))

// Task 2 Killed
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
TaskKilled(reason = "Killed"), tasks(1), null))

// Ensure killed task metrics are updated
val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
assert(failedStages.size == 1)
assert(failedStages.head.numKilledTasks == 1)
assert(failedStages.head.numCompleteTasks == 1)

val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
assert(allJobs.size == 1)
assert(allJobs.head.numKilledTasks == 1)
assert(allJobs.head.numCompletedTasks == 1)
assert(allJobs.head.numActiveStages == 1)
assert(allJobs.head.numFailedStages == 1)
}

test("driver logs") {
val listener = new AppStatusListener(store, conf, true)

@@ -77,7 +77,14 @@ class UISeleniumSuite
inputStream.foreachRDD { rdd =>
rdd.foreach(_ => {})
try {
rdd.foreach(_ => throw new RuntimeException("Oops"))
rdd.foreach { _ =>
import org.apache.spark.TaskContext
val tc: TaskContext = TaskContext.get
// Fail only tasks whose attempt ID is a multiple of 15, so that a single task fails
if (tc.taskAttemptId() % 15 == 0) {
throw new RuntimeException("Oops")
}
}
} catch {
case e: SparkException if e.getMessage.contains("Oops") =>
}
@@ -166,7 +173,7 @@ class UISeleniumSuite

// Check job progress
findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
List("4/4", "4/4", "4/4", "3/4 (1 failed)"))

// Check stacktrace
val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq