diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 687fd2d3ffe64..20fe911f2d294 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appSecManager = new SecurityManager(conf) SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, HistoryServer.getAttemptURI(appId, attempt.attemptId), - attempt.startTime) + Some(attempt.lastUpdated), attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 56028710ecc66..4a4ed954d689e 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -47,6 +47,7 @@ private[v1] class AllStagesResource(ui: SparkUI) { listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) } } yield { + stageUiData.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) } } @@ -69,7 +70,8 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + Some(stageUiData.taskData.map { case (k, v) => + k -> convertTaskData(v, stageUiData.lastUpdateTime) }) } else { None } @@ -136,13 +138,13 @@ private[v1] object AllStagesResource { } } - def convertTaskData(uiData: TaskUIData): TaskData = { + def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = { new TaskData( taskId = uiData.taskInfo.taskId, index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration, + duration = uiData.taskDuration(lastUpdateTime), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index 3e6d2942d0fbb..f15073bccced2 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -35,6 +35,7 @@ private[v1] class OneStageResource(ui: SparkUI) { def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { withStage(stageId) { stageAttempts => stageAttempts.map { stage => + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -47,6 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) { @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { withStageAttempt(stageId, stageAttemptId) { stage => + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -81,7 +83,8 @@ private[v1] class OneStageResource(ui: SparkUI) { @DefaultValue("20") @QueryParam("length") length: Int, @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { withStageAttempt(stageId, stageAttemptId) { stage => - val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq + val tasks = stage.ui.taskData.values + .map{ AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq .sorted(OneStageResource.ordering(sortBy)) tasks.slice(offset, offset + length) } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 589f811145519..f3fcf2778d39e 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,6 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, + val lastUpdateTime: Option[Long] = None, val startTime: Long) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -176,9 +177,11 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String, + lastUpdateTime: Option[Long], startTime: Long): SparkUI = { val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) + None, conf, listenerBus, securityManager, appName, basePath, + lastUpdateTime = lastUpdateTime, startTime = startTime) val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], Utils.getContextOrSparkClassLoader).asScala @@ -204,6 +207,7 @@ private[spark] object SparkUI { appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, + lastUpdateTime: Option[Long] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { @@ -226,6 +230,6 @@ private[spark] object SparkUI { new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + appName, basePath, lastUpdateTime, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 633e740b9c9bd..4d80308eb0a6d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -299,6 +299,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled, + parent.lastUpdateTime, currentTime, pageSize = taskPageSize, sortColumn = taskSortColumn, @@ -863,6 +864,7 @@ private[ui] class TaskDataSource( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, @@ -889,8 +891,9 @@ private[ui] class TaskDataSource( private def taskRow(taskData: TaskUIData): TaskTableRowData = { val info = taskData.taskInfo val metrics = taskData.metrics - val duration = taskData.taskDuration.getOrElse(1L) - val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("") + val duration = taskData.taskDuration(lastUpdateTime).getOrElse(1L) + val formatDuration = + taskData.taskDuration(lastUpdateTime).map(d => UIUtils.formatDuration(d)).getOrElse("") val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) @@ -1154,6 +1157,7 @@ private[ui] class TaskPagedTable( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, @@ -1179,6 +1183,7 @@ private[ui] class TaskPagedTable( hasShuffleRead, hasShuffleWrite, hasBytesSpilled, + lastUpdateTime, currentTime, pageSize, sortColumn, diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 799d769626395..0787ea6625903 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -30,6 +30,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener val executorsListener = parent.executorsListener + val lastUpdateTime = parent.lastUpdateTime attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 9448baac096dc..d9c87f69d8a54 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -97,6 +97,7 @@ private[spark] object UIData { var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ var isBlacklisted: Int = _ + var lastUpdateTime: Option[Long] = None var schedulingPool: String = "" var description: Option[String] = None @@ -133,9 +134,9 @@ private[spark] object UIData { _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics) } - def taskDuration: Option[Long] = { + def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = { if (taskInfo.status == "RUNNING") { - Some(_taskInfo.timeRunning(System.currentTimeMillis)) + Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis))) } else { _metrics.map(_.executorRunTime) }