@@ -329,13 +329,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val metrics = TaskMetrics.empty
       val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
         logWarning("Task start for unknown stage " + taskStart.stageId)
         new StageUIData
       })
       stageData.numActiveTasks += 1
-      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo, Some(metrics)))
+      stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
     }
     for (
       activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
@@ -405,7 +404,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
       }

-      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info, None))
+      val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
       taskData.updateTaskInfo(info)
       taskData.updateTaskMetrics(taskMetrics)
       taskData.errorMessage = errorMessage

Contributor Author (on the getOrElseUpdate line):

Important note here: in the old code, the elseUpdate branch would only be taken in rare error cases where we somehow purged the TaskUIData that should have been created when the task launched. It technically doesn't matter what we put in for the Option[Metrics] here, since it gets unconditionally overwritten on line 410 of the old code. So while my new code constructs TaskUIData with default metrics, it doesn't actually change the behavior of this block.
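The note above hinges on two facts: getOrElseUpdate evaluates its default argument by name, only when the key is absent, and onTaskEnd overwrites the metrics unconditionally right afterwards. A minimal sketch of that behavior outside Spark, using a plain mutable.HashMap with String values standing in for TaskUIData (the object name is illustrative):

```scala
import scala.collection.mutable

object GetOrElseUpdateSketch {
  def main(args: Array[String]): Unit = {
    val taskData = mutable.HashMap[Long, String]()
    taskData(1L) = "created at task start"

    // Normal path: the key exists, so the by-name default is never evaluated.
    val hit = taskData.getOrElseUpdate(1L, { println("building default"); "placeholder" })
    println(hit) // "created at task start"; "building default" never prints

    // Rare error path: the entry was purged, so the placeholder is inserted,
    // but the caller then overwrites it unconditionally (as onTaskEnd does via
    // updateTaskMetrics), so the placeholder's contents never matter.
    taskData.getOrElseUpdate(2L, "placeholder")
    taskData(2L) = "overwritten unconditionally"
    println(taskData(2L)) // "overwritten unconditionally"
  }
}
```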
core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala (54 changes: 28 additions, 26 deletions)
@@ -112,9 +112,9 @@ private[spark] object UIData {
   /**
    * These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
    */
-  class TaskUIData private(
-      private var _taskInfo: TaskInfo,
-      private var _metrics: Option[TaskMetricsUIData]) {
+  class TaskUIData private(private var _taskInfo: TaskInfo) {
+
+    private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
     var errorMessage: Option[String] = None

Contributor (on the _metrics default):

when will this be None?

Contributor Author:

The only way for this to become None is if updateTaskMetrics is called with None. updateTaskMetrics is called in two places:

  • In JobProgressListener.onTaskEnd, where the metrics come from Option(taskEnd.taskMetrics); taskEnd.taskMetrics can be null when the task has failed (according to the docs).
  • In JobProgressListener.onExecutorMetricsUpdate, where the metrics are guaranteed to be defined (non-None).
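The onTaskEnd case relies on Option.apply turning null into None. A minimal sketch of that conversion, where MetricsStub is a hypothetical stand-in for TaskMetrics:

```scala
object OptionNullSketch {
  final case class MetricsStub(runTime: Long) // hypothetical stand-in, not a Spark type

  def main(args: Array[String]): Unit = {
    val failed: MetricsStub = null            // like taskEnd.taskMetrics on a failed task
    val succeeded = MetricsStub(runTime = 42L)

    println(Option(failed))    // None, so updateTaskMetrics(None) sets _metrics to None
    println(Option(succeeded)) // Some(MetricsStub(42))
  }
}
```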

@@ -127,7 +127,7 @@
     }

     def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
-      _metrics = TaskUIData.toTaskMetricsUIData(metrics)
+      _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
     }

     def taskDuration: Option[Long] = {
@@ -140,28 +140,8 @@
     }
   }

   object TaskUIData {
-    def apply(taskInfo: TaskInfo, metrics: Option[TaskMetrics]): TaskUIData = {
-      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo), toTaskMetricsUIData(metrics))
-    }
-
-    private def toTaskMetricsUIData(metrics: Option[TaskMetrics]): Option[TaskMetricsUIData] = {
-      metrics.map { m =>
-        TaskMetricsUIData(
-          executorDeserializeTime = m.executorDeserializeTime,
-          executorDeserializeCpuTime = m.executorDeserializeCpuTime,
-          executorRunTime = m.executorRunTime,
-          executorCpuTime = m.executorCpuTime,
-          resultSize = m.resultSize,
-          jvmGCTime = m.jvmGCTime,
-          resultSerializationTime = m.resultSerializationTime,
-          memoryBytesSpilled = m.memoryBytesSpilled,
-          diskBytesSpilled = m.diskBytesSpilled,
-          peakExecutionMemory = m.peakExecutionMemory,
-          inputMetrics = InputMetricsUIData(m.inputMetrics),
-          outputMetrics = OutputMetricsUIData(m.outputMetrics),
-          shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
-          shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
-      }
+    def apply(taskInfo: TaskInfo): TaskUIData = {
+      new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
     }

     /**
@@ -206,6 +186,28 @@ private[spark] object UIData {
       shuffleReadMetrics: ShuffleReadMetricsUIData,
       shuffleWriteMetrics: ShuffleWriteMetricsUIData)

+  object TaskMetricsUIData {
+    def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
+      TaskMetricsUIData(
+        executorDeserializeTime = m.executorDeserializeTime,
+        executorDeserializeCpuTime = m.executorDeserializeCpuTime,
+        executorRunTime = m.executorRunTime,
+        executorCpuTime = m.executorCpuTime,
+        resultSize = m.resultSize,
+        jvmGCTime = m.jvmGCTime,
+        resultSerializationTime = m.resultSerializationTime,
+        memoryBytesSpilled = m.memoryBytesSpilled,
+        diskBytesSpilled = m.diskBytesSpilled,
+        peakExecutionMemory = m.peakExecutionMemory,
+        inputMetrics = InputMetricsUIData(m.inputMetrics),
+        outputMetrics = OutputMetricsUIData(m.outputMetrics),
+        shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
+        shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
+    }
+
+    val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
+  }
+
   case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
   object InputMetricsUIData {
     def apply(metrics: InputMetrics): InputMetricsUIData = {
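The new TaskMetricsUIData object centralizes the conversion from the mutable TaskMetrics into an immutable UI-side snapshot, and EMPTY lets every freshly started task share one zero-valued instance instead of allocating fresh metrics per task. A minimal sketch of the same snapshot-plus-shared-EMPTY pattern outside Spark, where Counter and CounterSnapshot are hypothetical stand-ins:

```scala
class Counter {          // mutable source of truth, updated in place
  var hits: Long = 0L
}

case class CounterSnapshot(hits: Long) // immutable view, safe to hand to the UI

object CounterSnapshot {
  def fromCounter(c: Counter): CounterSnapshot = CounterSnapshot(c.hits)

  // One shared zero-valued snapshot; sharing is safe because it is immutable.
  val EMPTY: CounterSnapshot = fromCounter(new Counter)
}

object SnapshotSketch {
  def main(args: Array[String]): Unit = {
    val c = new Counter
    c.hits = 3
    println(CounterSnapshot.fromCounter(c)) // CounterSnapshot(3)
    println(CounterSnapshot.EMPTY)          // CounterSnapshot(0)
  }
}
```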
@@ -31,7 +31,7 @@ class AllStagesResourceSuite extends SparkFunSuite {
     val tasks = new LinkedHashMap[Long, TaskUIData]
     taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
       tasks(idx.toLong) = TaskUIData(
-        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None)
+        new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false))
     }

     val stageUiData = new StageUIData()