Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
stage.status = v1.StageStatus.SKIPPED
job.skippedStages += stage.info.stageId
job.skippedTasks += stage.info.numTasks
it.remove()
update(stage, now)
if (v1.StageStatus.PENDING.equals(stage.status)) {
Comment thread
juliuszsompolski marked this conversation as resolved.
stage.status = v1.StageStatus.SKIPPED
job.skippedStages += stage.info.stageId
job.skippedTasks += stage.info.numTasks
job.activeStages -= 1

pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds - stage.info.stageId
update(pool, now)
}

it.remove()
update(stage, now, last = true)
}
}
}

Expand Down Expand Up @@ -506,7 +515,16 @@ private[spark] class AppStatusListener(
if (killedDelta > 0) {
stage.killedSummary = killedTasksSummary(event.reason, stage.killedSummary)
}
maybeUpdate(stage, now)
// [SPARK-24415] Wait for all tasks to finish before removing stage from live list
val removeStage =
stage.activeTasks == 0 &&
(v1.StageStatus.COMPLETE.equals(stage.status) ||
v1.StageStatus.FAILED.equals(stage.status))
if (removeStage) {
update(stage, now, last = true)
Comment thread
ankuriitg marked this conversation as resolved.
} else {
maybeUpdate(stage, now)
}

// Store both stage ID and task index in a single long variable for tracking at job level.
val taskIndex = (event.stageId.toLong << Integer.SIZE) | event.taskInfo.index
Expand All @@ -521,7 +539,7 @@ private[spark] class AppStatusListener(
if (killedDelta > 0) {
job.killedSummary = killedTasksSummary(event.reason, job.killedSummary)
}
maybeUpdate(job, now)
conditionalLiveUpdate(job, now, removeStage)
}

val esummary = stage.executorSummary(event.taskInfo.executorId)
Expand All @@ -532,14 +550,17 @@ private[spark] class AppStatusListener(
if (metricsDelta != null) {
esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
}
maybeUpdate(esummary, now)
conditionalLiveUpdate(esummary, now, removeStage)

if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
if (removeStage) {
liveStages.remove((event.stageId, event.stageAttemptId))
}
}

liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
Expand All @@ -564,17 +585,13 @@ private[spark] class AppStatusListener(

// Force an update on live applications when the number of active tasks reaches 0. This is
// checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
if (exec.activeTasks == 0) {
liveUpdate(exec, now)
} else {
maybeUpdate(exec, now)
}
conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
}
}

override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
val maybeStage =
Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
Option(liveStages.get((event.stageInfo.stageId, event.stageInfo.attemptNumber)))
maybeStage.foreach { stage =>
val now = System.nanoTime()
stage.info = event.stageInfo
Expand Down Expand Up @@ -608,14 +625,20 @@ private[spark] class AppStatusListener(
}

stage.executorSummaries.values.foreach(update(_, now))
update(stage, now, last = true)

val executorIdsForStage = stage.blackListedExecutors
executorIdsForStage.foreach { executorId =>
liveExecutors.get(executorId).foreach { exec =>
removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
}
}

// Remove stage only if there are no active tasks remaining
val removeStage = stage.activeTasks == 0
update(stage, now, last = removeStage)
if (removeStage) {
liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
}
}

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
Expand Down Expand Up @@ -882,6 +905,14 @@ private[spark] class AppStatusListener(
}
}

private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
if (condition) {
liveUpdate(entity, now)
} else {
maybeUpdate(entity, now)
}
}

private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,61 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
}

test("SPARK-24415: update metrics for tasks that finish late") {
val listener = new AppStatusListener(store, conf, true)

val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")

// Start job
listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage1, stage2), null))

// Start 2 stages
listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))

// Start 2 Tasks
val tasks = createTasks(2, Array("1"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
}

// Task 1 Finished
time += 1
tasks(0).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))

// Stage 1 Completed
stage1.failureReason = Some("Failed")
listener.onStageCompleted(SparkListenerStageCompleted(stage1))

// Stop job 1
time += 1
listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))

// Task 2 Killed
time += 1
tasks(1).markFinished(TaskState.FINISHED, time)
listener.onTaskEnd(
SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType",
TaskKilled(reason = "Killed"), tasks(1), null))

// Ensure killed task metrics are updated
val allStages = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
val failedStages = allStages.filter(_.status == v1.StageStatus.FAILED)
assert(failedStages.size == 1)
assert(failedStages.head.numKilledTasks == 1)
assert(failedStages.head.numCompleteTasks == 1)

val allJobs = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
assert(allJobs.size == 1)
Comment thread
vanzin marked this conversation as resolved.
assert(allJobs.head.numKilledTasks == 1)
assert(allJobs.head.numCompletedTasks == 1)
assert(allJobs.head.numActiveStages == 1)
assert(allJobs.head.numFailedStages == 1)
}

test("driver logs") {
val listener = new AppStatusListener(store, conf, true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,14 @@ class UISeleniumSuite
inputStream.foreachRDD { rdd =>
rdd.foreach(_ => {})
try {
rdd.foreach(_ => throw new RuntimeException("Oops"))
rdd.foreach { _ =>
import org.apache.spark.TaskContext
Comment thread
ankuriitg marked this conversation as resolved.
val tc: TaskContext = TaskContext.get
// Failing the task with id 15 to ensure only one task fails
if (tc.taskAttemptId() % 15 == 0) {
throw new RuntimeException("Oops")
}
}
} catch {
case e: SparkException if e.getMessage.contains("Oops") =>
}
Expand Down Expand Up @@ -166,7 +173,7 @@ class UISeleniumSuite

// Check job progress
findAll(cssSelector(""".progress-cell""")).map(_.text).toList should be (
List("4/4", "4/4", "4/4", "0/4 (1 failed)"))
List("4/4", "4/4", "4/4", "3/4 (1 failed)"))

// Check stacktrace
val errorCells = findAll(cssSelector(""".stacktrace-details""")).map(_.underlying).toSeq
Expand Down