Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,15 @@ private[v1] object AllStagesResource {
speculative = uiData.taskInfo.speculative,
accumulatorUpdates = uiData.taskInfo.accumulables.map { convertAccumulableInfo },
errorMessage = uiData.errorMessage,
taskMetrics = uiData.taskMetrics.map { convertUiTaskMetrics }
taskMetrics = uiData.metrics.map { convertUiTaskMetrics }
)
}

def taskMetricDistributions(
allTaskData: Iterable[TaskUIData],
quantiles: Array[Double]): TaskMetricDistributions = {

val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
val rawMetrics = allTaskData.flatMap{_.metrics}.toSeq

def metricQuantiles(f: InternalTaskMetrics => Double): IndexedSeq[Double] =
Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
executorIdToData(eid) = ExecutorUIData(executorAdded.time)
executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
}

override def onExecutorRemoved(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
None
}
taskMetrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.taskMetrics)
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}

val taskData = stageData.taskData.getOrElseUpdate(info.taskId, new TaskUIData(info))
taskData.taskInfo = info
taskData.taskMetrics = taskMetrics
taskData.metrics = taskMetrics
taskData.errorMessage = errorMessage

for (
Expand Down Expand Up @@ -506,9 +506,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val metrics = TaskMetrics.fromAccumulatorUpdates(accumUpdates)
taskData.foreach { t =>
if (!t.taskInfo.finished) {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.taskMetrics)
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
t.taskMetrics = Some(metrics)
t.metrics = Some(metrics)
}
}
}
Expand Down
85 changes: 43 additions & 42 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
else taskTable.dataSource.slicedTaskIds

// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.taskMetrics.isDefined)
val validTasks = tasks.filter(t => t.taskInfo.status == "SUCCESS" && t.metrics.isDefined)

val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
Expand All @@ -348,8 +348,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
}

val deserializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorDeserializeTime.toDouble
val deserializationTimes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.executorDeserializeTime.toDouble
}
val deserializationQuantiles =
<td>
Expand All @@ -359,13 +359,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(deserializationTimes)

val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
val serviceTimes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.executorRunTime.toDouble
}
val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)

val gcTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.jvmGCTime.toDouble
val gcTimes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.jvmGCTime.toDouble
}
val gcQuantiles =
<td>
Expand All @@ -374,8 +374,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(gcTimes)

val serializationTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.resultSerializationTime.toDouble
val serializationTimes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.resultSerializationTime.toDouble
}
val serializationQuantiles =
<td>
Expand All @@ -385,8 +385,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</span>
</td> +: getFormattedTimeQuantiles(serializationTimes)

val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
getGettingResultTime(info, currentTime).toDouble
val gettingResultTimes = validTasks.map { taskUIData: TaskUIData =>
getGettingResultTime(taskUIData.taskInfo, currentTime).toDouble
}
val gettingResultQuantiles =
<td>
Expand All @@ -397,8 +397,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(gettingResultTimes)

val peakExecutionMemory = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.peakExecutionMemory.toDouble
val peakExecutionMemory = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.peakExecutionMemory.toDouble
}
val peakExecutionMemoryQuantiles = {
<td>
Expand All @@ -412,8 +412,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
// if it needed to be fetched from the block manager on the worker).
val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
getSchedulerDelay(info, metrics.get, currentTime).toDouble
val schedulerDelays = validTasks.map { taskUIData: TaskUIData =>
getSchedulerDelay(taskUIData.taskInfo, taskUIData.metrics.get, currentTime).toDouble
}
val schedulerDelayTitle = <td><span data-toggle="tooltip"
title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
Expand All @@ -427,30 +427,30 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
)
}

val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
val inputSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}

val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
val inputRecords = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}

val inputQuantiles = <td>Input Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)

val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
val outputSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}

val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
val outputRecords = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}

val outputQuantiles = <td>Output Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)

val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
val shuffleReadBlockedTimes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble
}
val shuffleReadBlockedQuantiles =
<td>
Expand All @@ -461,11 +461,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedTimeQuantiles(shuffleReadBlockedTimes)

val shuffleReadTotalSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
val shuffleReadTotalSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleReadMetrics.map(_.totalBytesRead).getOrElse(0L).toDouble
}
val shuffleReadTotalRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
val shuffleReadTotalRecords = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble
}
val shuffleReadTotalQuantiles =
<td>
Expand All @@ -476,8 +476,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)

val shuffleReadRemoteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
val shuffleReadRemoteSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadRemoteQuantiles =
<td>
Expand All @@ -488,25 +488,25 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
</td> +:
getFormattedSizeQuantiles(shuffleReadRemoteSizes)

val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
val shuffleWriteSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}

val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
val shuffleWriteRecords = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}

val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)

val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.memoryBytesSpilled.toDouble
val memoryBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.memoryBytesSpilled.toDouble
}
val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
getFormattedSizeQuantiles(memoryBytesSpilledSizes)

val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.diskBytesSpilled.toDouble
val diskBytesSpilledSizes = validTasks.map { taskUIData: TaskUIData =>
taskUIData.metrics.get.diskBytesSpilled.toDouble
}
val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
getFormattedSizeQuantiles(diskBytesSpilledSizes)
Expand Down Expand Up @@ -601,7 +601,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

def toProportion(time: Long) = time.toDouble / totalExecutionTime * 100

val metricsOpt = taskUIData.taskMetrics
val metricsOpt = taskUIData.metrics
val shuffleReadTime =
metricsOpt.flatMap(_.shuffleReadMetrics.map(_.fetchWaitTime)).getOrElse(0L)
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
Expand Down Expand Up @@ -868,7 +868,8 @@ private[ui] class TaskDataSource(
def slicedTaskIds: Set[Long] = _slicedTaskIds

private def taskRow(taskData: TaskUIData): TaskTableRowData = {
val TaskUIData(info, metrics, errorMessage) = taskData
val info = taskData.taskInfo
val metrics = taskData.metrics
val duration = if (info.status == "RUNNING") info.timeRunning(currentTime)
else metrics.map(_.executorRunTime).getOrElse(1L)
val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
Expand Down Expand Up @@ -1014,7 +1015,7 @@ private[ui] class TaskDataSource(
shuffleRead,
shuffleWrite,
bytesSpilled,
errorMessage.getOrElse(""))
taskData.errorMessage.getOrElse(""))
}

/**
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ private[spark] object UIData {
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
case class TaskUIData(
class TaskUIData(
var taskInfo: TaskInfo,
var taskMetrics: Option[TaskMetrics] = None,
var metrics: Option[TaskMetrics] = None,
var errorMessage: Option[String] = None)

case class ExecutorUIData(
class ExecutorUIData(
val startTime: Long,
var finishTime: Option[Long] = None,
var finishReason: Option[String] = None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,11 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 207)
assert(stage0Data.outputBytes == 116)
assert(stage1Data.outputBytes == 208)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 2)
assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 102)
assert(stage1Data.taskData.get(1236L).get.taskMetrics.get.shuffleReadMetrics.get
assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 202)

// task that was included in a heartbeat
Expand Down Expand Up @@ -355,9 +355,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
assert(stage1Data.inputBytes == 614)
assert(stage0Data.outputBytes == 416)
assert(stage1Data.outputBytes == 616)
assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 302)
assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get
assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
.totalBlocksFetched == 402)
}
}