Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ class TaskInfo(

def status: String = {
if (running) {
"RUNNING"
} else if (gettingResult) {
"GET RESULT"
if (gettingResult) {
"GET RESULT"
} else {
"RUNNING"
}
} else if (failed) {
"FAILED"
} else if (successful) {
Expand Down
25 changes: 18 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +: getFormattedTimeQuantiles(serializationTimes)

val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
(info.finishTime - info.gettingResultTime).toDouble
} else {
0.0
}
getGettingResultTime(info).toDouble
}
val gettingResultQuantiles =
<td>
Expand Down Expand Up @@ -462,7 +458,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
val gettingResultTime = info.gettingResultTime
val gettingResultTime = getGettingResultTime(info)

val maybeAccumulators = info.accumulables
val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"}
Expand Down Expand Up @@ -625,6 +621,19 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<td>{errorSummary}{details}</td>
}

private def getGettingResultTime(info: TaskInfo): Long = {
if (info.gettingResultTime > 0) {
if (info.finishTime > 0) {
info.finishTime - info.gettingResultTime
} else {
// The task is still fetching the result.
System.currentTimeMillis - info.gettingResultTime
}
} else {
0L
}
}

private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
val totalExecutionTime =
if (info.gettingResult) {
Expand All @@ -636,6 +645,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
}
val executorOverhead = (metrics.executorDeserializeTime +
metrics.resultSerializationTime)
math.max(0, totalExecutionTime - metrics.executorRunTime - executorOverhead)
math.max(
0,
totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info))
}
}