core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
@@ -868,13 +868,17 @@ private[spark] class AppStatusListener(
     // check if there is a new peak value for any of the executor level memory metrics
     // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
     // for the live UI.
-    event.executorUpdates.foreach { case (_, peakUpdates) =>
+    event.executorUpdates.foreach { case (key, peakUpdates) =>
       liveExecutors.get(event.execId).foreach { exec =>
         if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(peakUpdates)) {
-          maybeUpdate(exec, now)
+          update(exec, now)
         }
       }
+
+      // Update stage level peak executor metrics.
+      updateStageLevelPeakExecutorMetrics(key._1, key._2, event.execId, peakUpdates, now)
     }
 
     // Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
     // here to ensure the staleness of Spark UI doesn't last more than
     // `max(heartbeat interval, liveUpdateMinFlushPeriod)`.
@@ -885,17 +889,38 @@ private[spark] class AppStatusListener(
     }
   }
 
-  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
+  override def onStageExecutorMetrics(event: SparkListenerStageExecutorMetrics): Unit = {
     val now = System.nanoTime()
 
     // check if there is a new peak value for any of the executor level memory metrics,
     // while reading from the log. SparkListenerStageExecutorMetrics are only processed
     // when reading logs.
-    liveExecutors.get(executorMetrics.execId).orElse(
-      deadExecutors.get(executorMetrics.execId)).foreach { exec =>
-      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
-        update(exec, now)
-      }
+    liveExecutors.get(event.execId).orElse(
+      deadExecutors.get(event.execId)).foreach { exec =>
+      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(event.executorMetrics)) {
+        update(exec, now)
+      }
     }
+
+    // Update stage level peak executor metrics.
+    updateStageLevelPeakExecutorMetrics(
+      event.stageId, event.stageAttemptId, event.execId, event.executorMetrics, now)
   }
+
+  private def updateStageLevelPeakExecutorMetrics(
+      stageId: Int,
+      stageAttemptId: Int,
+      executorId: String,
+      executorMetrics: ExecutorMetrics,
+      now: Long): Unit = {
+    Option(liveStages.get((stageId, stageAttemptId))).foreach { stage =>
+      if (stage.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
+        update(stage, now)
+      }
+      val esummary = stage.executorSummary(executorId)
+      if (esummary.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics)) {
+        update(esummary, now)
+      }
+    }
+  }
 
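The listener logic above hinges on compareAndUpdatePeakValues, which folds each incoming metrics sample into a running per-metric maximum and reports whether any peak moved, so an entity is only re-persisted on a new high-water mark. A minimal, self-contained sketch of that pattern (PeakMetrics is an illustrative stand-in, not Spark's ExecutorMetrics):

object PeakMetricsSketch {
  // Hypothetical metric holder; Spark's real ExecutorMetrics tracks named JVM/memory metrics.
  final class PeakMetrics(n: Int) {
    private val peaks = Array.fill(n)(Long.MinValue)

    // Returns true if any stored peak was raised, mirroring compareAndUpdatePeakValues.
    def compareAndUpdatePeakValues(incoming: Array[Long]): Boolean = {
      var updated = false
      var i = 0
      while (i < n) {
        if (incoming(i) > peaks(i)) {
          peaks(i) = incoming(i)
          updated = true
        }
        i += 1
      }
      updated
    }

    def snapshot: List[Long] = peaks.toList
  }

  def main(args: Array[String]): Unit = {
    val stagePeaks = new PeakMetrics(3)
    // First sample sets initial peaks; later samples only count if some metric grew.
    println(stagePeaks.compareAndUpdatePeakValues(Array(10L, 5L, 7L))) // true
    println(stagePeaks.compareAndUpdatePeakValues(Array(4L, 6L, 7L)))  // true (index 1 rose)
    println(stagePeaks.compareAndUpdatePeakValues(Array(1L, 2L, 3L)))  // false (no peak moved)
    println(stagePeaks.snapshot)                                       // List(10, 6, 7)
  }
}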
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

@@ -506,7 +506,8 @@ private[spark] class AppStatusStore(
       tasks = Some(tasks),
       executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
       killedTasksSummary = stage.killedTasksSummary,
-      resourceProfileId = stage.resourceProfileId)
+      resourceProfileId = stage.resourceProfileId,
+      peakExecutorMetrics = stage.peakExecutorMetrics)
   }
 
   def rdd(rddId: Int): v1.RDDStorageInfo = {
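AppStatusStore.stageData feeds the stage REST endpoint, so the field added above surfaces in the v1 JSON. A hedged usage sketch: the history-server URL and application id are placeholders, and the metric names in the comments are examples from Spark's executor metrics set.

import scala.io.Source

object StagePeakMetricsFetch {
  def main(args: Array[String]): Unit = {
    // Placeholder host/port and app id; point this at a real history server.
    val url = "http://localhost:18080/api/v1/applications/app-20200706201101-0003/stages/0"
    val body = Source.fromURL(url).mkString
    // With this change the stage payload can carry
    //   "peakExecutorMetrics" : { "JVMHeapMemory" : 213367864, ... }
    // and each executorSummary entry a corresponding "peakMemoryMetrics" object.
    println(body)
  }
}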
core/src/main/scala/org/apache/spark/status/LiveEntity.scala

@@ -365,6 +365,8 @@ private class LiveExecutorStageSummary(
 
   var metrics = createMetrics(default = 0L)
 
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   override protected def doUpdate(): Any = {
     val info = new v1.ExecutorStageSummary(
       taskTime,
@@ -381,7 +383,8 @@
       metrics.shuffleWriteMetrics.recordsWritten,
       metrics.memoryBytesSpilled,
       metrics.diskBytesSpilled,
-      isBlacklisted)
+      isBlacklisted,
+      Some(peakExecutorMetrics).filter(_.isSet))
     new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
   }
 
@@ -420,6 +423,8 @@ private class LiveStage extends LiveEntity {
 
   var blackListedExecutors = new HashSet[String]()
 
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
   var savedTasks = new AtomicInteger(0)
@@ -484,7 +489,8 @@
       tasks = None,
       executorSummary = None,
       killedTasksSummary = killedSummary,
-      resourceProfileId = info.resourceProfileId)
+      resourceProfileId = info.resourceProfileId,
+      Some(peakExecutorMetrics).filter(_.isSet))
   }
 
   override protected def doUpdate(): Any = {
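Both doUpdate bodies above use Some(peakExecutorMetrics).filter(_.isSet): the live entity always owns a mutable accumulator, but the stored API object should expose None until at least one peak has actually been recorded. A tiny sketch of the idiom, with MetricsHolder as an illustrative stand-in for ExecutorMetrics:

object OptionFilterIdiom {
  // Illustrative stand-in: isSet becomes true once any metric has been recorded.
  final class MetricsHolder {
    private var values = Map.empty[String, Long]
    def record(name: String, v: Long): Unit = values += name -> v
    def isSet: Boolean = values.nonEmpty
    override def toString: String = values.mkString("{", ", ", "}")
  }

  def main(args: Array[String]): Unit = {
    val untouched = new MetricsHolder
    val updated = new MetricsHolder
    updated.record("JVMHeapMemory", 512L * 1024 * 1024)

    // Mirrors Some(peakExecutorMetrics).filter(_.isSet) in the doUpdate methods above.
    println(Some(untouched).filter(_.isSet)) // None: nothing meaningful gets serialized
    println(Some(updated).filter(_.isSet))   // Some({JVMHeapMemory -> 536870912})
  }
}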
core/src/main/scala/org/apache/spark/status/api/v1/api.scala

@@ -82,7 +82,10 @@ class ExecutorStageSummary private[spark](
     val shuffleWriteRecords : Long,
     val memoryBytesSpilled : Long,
     val diskBytesSpilled : Long,
-    val isBlacklistedForStage: Boolean)
+    val isBlacklistedForStage: Boolean,
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class ExecutorSummary private[spark](
     val id: String,
@@ -259,7 +262,10 @@ class StageData private[spark](
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
     val killedTasksSummary: Map[String, Int],
-    val resourceProfileId: Int)
+    val resourceProfileId: Int,
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakExecutorMetrics: Option[ExecutorMetrics])
 
 class TaskData private[spark](
     val taskId: Long,
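The @JsonSerialize/@JsonDeserialize annotations route the Option[ExecutorMetrics] fields through custom Jackson converters. A rough sketch of the shape such a serializer emits, written against Jackson's streaming JsonGenerator (metric names are examples; this is not Spark's actual ExecutorMetricsJsonSerializer):

import java.io.StringWriter
import com.fasterxml.jackson.core.JsonFactory

object PeakMetricsJsonShape {
  def main(args: Array[String]): Unit = {
    val out = new StringWriter()
    val gen = new JsonFactory().createGenerator(out)
    // Write a "peakExecutorMetrics" object with one numeric field per metric,
    // the shape the REST API exposes for stages and executor summaries.
    gen.writeStartObject()
    gen.writeFieldName("peakExecutorMetrics")
    gen.writeStartObject()
    Seq("JVMHeapMemory" -> 213367864L, "OnHeapExecutionMemory" -> 0L).foreach {
      case (name, value) => gen.writeNumberField(name, value)
    }
    gen.writeEndObject()
    gen.writeEndObject()
    gen.close()
    println(out) // {"peakExecutorMetrics":{"JVMHeapMemory":213367864,"OnHeapExecutionMemory":0}}
  }
}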
core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

@@ -255,7 +255,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage
        tasks = None,
        executorSummary = None,
        killedTasksSummary = Map(),
-        ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID)
+        ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID,
+        peakExecutorMetrics = None)
     }
   }
 
[HistoryServerExpectations app-list JSON fixture; file name not captured]

@@ -1,4 +1,19 @@
 [ {
+  "id" : "app-20200706201101-0003",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-07-07T03:11:00.235GMT",
+    "endTime" : "2020-07-07T03:17:04.231GMT",
+    "lastUpdated" : "",
+    "duration" : 363996,
+    "sparkUser" : "terryk",
+    "completed" : true,
+    "appSparkVersion" : "3.1.0-SNAPSHOT",
+    "endTimeEpoch" : 1594091824231,
+    "startTimeEpoch" : 1594091460235,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1578436911597_0052",
   "name" : "Spark shell",
   "attempts" : [ {
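This and the remaining files are HistoryServerSuite golden JSON fixtures: REST responses are compared verbatim against them, and each now lists the app-20200706201101-0003 application that the updated test data introduces. A hedged sketch of that golden-file pattern (endpoint URL and fixture path are illustrative):

import scala.io.Source

object GoldenFileCheck {
  def main(args: Array[String]): Unit = {
    // Illustrative endpoint and fixture path, mirroring how expectation files are used.
    val actual = Source.fromURL("http://localhost:18080/api/v1/applications").mkString
    val expected = Source.fromFile("application_list_json_expectation.json").mkString
    assert(actual.trim == expected.trim, "REST payload drifted from the expectation file")
  }
}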
[HistoryServerExpectations app-list JSON fixture; file name not captured]

@@ -1,4 +1,19 @@
 [ {
+  "id" : "app-20200706201101-0003",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-07-07T03:11:00.235GMT",
+    "endTime" : "2020-07-07T03:17:04.231GMT",
+    "lastUpdated" : "",
+    "duration" : 363996,
+    "sparkUser" : "terryk",
+    "completed" : true,
+    "appSparkVersion" : "3.1.0-SNAPSHOT",
+    "endTimeEpoch" : 1594091824231,
+    "startTimeEpoch" : 1594091460235,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1578436911597_0052",
   "name" : "Spark shell",
   "attempts" : [ {
[HistoryServerExpectations app-list JSON fixture; file name not captured]

@@ -1,4 +1,19 @@
 [ {
+  "id" : "app-20200706201101-0003",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-07-07T03:11:00.235GMT",
+    "endTime" : "2020-07-07T03:17:04.231GMT",
+    "lastUpdated" : "",
+    "duration" : 363996,
+    "sparkUser" : "terryk",
+    "completed" : true,
+    "appSparkVersion" : "3.1.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "endTimeEpoch" : 1594091824231,
+    "startTimeEpoch" : 1594091460235
+  } ]
+}, {
   "id" : "application_1578436911597_0052",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "lastUpdatedEpoch" : 0,
     "endTimeEpoch" : 1562101355974
   } ]
-}, {
-  "id" : "application_1553914137147_0018",
-  "name" : "LargeBlocks",
-  "attempts" : [ {
-    "startTime" : "2019-04-08T20:39:44.286GMT",
-    "endTime" : "2019-04-08T20:40:46.454GMT",
-    "lastUpdated" : "",
-    "duration" : 62168,
-    "sparkUser" : "systest",
-    "completed" : true,
-    "appSparkVersion" : "3.0.0-SNAPSHOT",
-    "startTimeEpoch" : 1554755984286,
-    "endTimeEpoch" : 1554756046454,
-    "lastUpdatedEpoch" : 0
-  } ]
 } ]
[HistoryServerExpectations app-list JSON fixture; file name not captured]

@@ -1,4 +1,19 @@
 [ {
+  "id" : "app-20200706201101-0003",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-07-07T03:11:00.235GMT",
+    "endTime" : "2020-07-07T03:17:04.231GMT",
+    "lastUpdated" : "",
+    "duration" : 363996,
+    "sparkUser" : "terryk",
+    "completed" : true,
+    "appSparkVersion" : "3.1.0-SNAPSHOT",
+    "endTimeEpoch" : 1594091824231,
+    "startTimeEpoch" : 1594091460235,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1578436911597_0052",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -14,22 +29,20 @@
     "lastUpdatedEpoch" : 0
   } ]
 }, {
-  "id": "application_1555004656427_0144",
-  "name": "Spark shell",
-  "attempts": [
-    {
-      "startTime": "2019-07-02T21:02:17.180GMT",
-      "endTime": "2019-07-02T21:02:35.974GMT",
-      "lastUpdated": "",
-      "duration": 18794,
-      "sparkUser": "tgraves",
-      "completed": true,
-      "appSparkVersion": "3.0.0-SNAPSHOT",
-      "startTimeEpoch": 1562101337180,
-      "lastUpdatedEpoch": 0,
-      "endTimeEpoch": 1562101355974
-    }
-  ]
+  "id" : "application_1555004656427_0144",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2019-07-02T21:02:17.180GMT",
+    "endTime" : "2019-07-02T21:02:35.974GMT",
+    "lastUpdated" : "",
+    "duration" : 18794,
+    "sparkUser" : "tgraves",
+    "completed" : true,
+    "appSparkVersion" : "3.0.0-SNAPSHOT",
+    "endTimeEpoch" : 1562101355974,
+    "startTimeEpoch" : 1562101337180,
+    "lastUpdatedEpoch" : 0
+  } ]
 }, {
   "id" : "application_1553914137147_0018",
   "name" : "LargeBlocks",
[HistoryServerExpectations app-list JSON fixture; file name not captured]

@@ -1,4 +1,19 @@
 [ {
+  "id" : "app-20200706201101-0003",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2020-07-07T03:11:00.235GMT",
+    "endTime" : "2020-07-07T03:17:04.231GMT",
+    "lastUpdated" : "",
+    "duration" : 363996,
+    "sparkUser" : "terryk",
+    "completed" : true,
+    "appSparkVersion" : "3.1.0-SNAPSHOT",
+    "endTimeEpoch" : 1594091824231,
+    "startTimeEpoch" : 1594091460235,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1578436911597_0052",
   "name" : "Spark shell",
   "attempts" : [ {