Skip to content

Commit 1bbd13c

Browse files
committed
[SPARK-26399][CORE] Add new stage-level REST APIs and parameters to get stage level executor peak metrics distribution
1 parent fc7d016 commit 1bbd13c

File tree

6 files changed

+99
-0
lines changed

6 files changed

+99
-0
lines changed

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,20 @@ private[spark] class AppStatusStore(
146146
(stage, stageDataWrapper.jobIds.toSeq)
147147
}
148148

149+
def stageExecutorSummary(
150+
stageId: Int,
151+
stageAttemptId: Int,
152+
unsortedQuantiles: Array[Double]): Option[v1.ExecutorMetricsDistributions] = {
153+
val quantiles = unsortedQuantiles.sorted
154+
val summary = executorSummary(stageId, stageAttemptId)
155+
if (summary.isEmpty) {
156+
None
157+
} else {
158+
val executorMetricsSummary = summary.values.flatMap(_.peakMemoryMetrics).toIndexedSeq
159+
Some(new v1.ExecutorMetricsDistributions(quantiles, executorMetricsSummary))
160+
}
161+
}
162+
149163
def taskCount(stageId: Int, stageAttemptId: Int): Long = {
150164
store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
151165
}

core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,29 @@ private[v1] class StagesResource extends BaseAppResource {
154154
}
155155
}
156156

157+
@GET
158+
@Path("{stageId: \\d+}/{stageAttemptId: \\d+}/executorMetricsDistribution")
159+
def executorSummary(
160+
@PathParam("stageId") stageId: Int,
161+
@PathParam("stageAttemptId") stageAttemptId: Int,
162+
@DefaultValue("0.05,0.25,0.5,0.75,0.95")
163+
@QueryParam("quantiles") quantileString: String): ExecutorMetricsDistributions = {
164+
withUI { ui =>
165+
val quantiles = quantileString.split(",").map { s =>
166+
try {
167+
s.toDouble
168+
} catch {
169+
case nfe: NumberFormatException =>
170+
throw new BadParameterException("quantiles", "double", s)
171+
}
172+
}
173+
174+
ui.store.stageExecutorSummary(stageId, stageAttemptId, quantiles).getOrElse(
175+
throw new NotFoundException(s"No executor reported metrics yet.")
176+
)
177+
}
178+
}
179+
157180
// Performs pagination on the server side
158181
def doPagination(queryParameters: MultivaluedMap[String, String], stageId: Int,
159182
stageAttemptId: Int, isSearch: Boolean, totalRecords: Int): Seq[TaskData] = {

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer,
2828
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
2929

3030
import org.apache.spark.JobExecutionStatus
31+
import org.apache.spark.annotation.DeveloperApi
3132
import org.apache.spark.executor.ExecutorMetrics
3233
import org.apache.spark.metrics.ExecutorMetricType
3334
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
@@ -170,6 +171,19 @@ private[spark] class ExecutorMetricsJsonSerializer
170171
value.isEmpty
171172
}
172173

174+
private[spark] class ExecutorMetricsDistributionJsonSerializer
175+
extends JsonSerializer[ExecutorMetricsDistributions] {
176+
override def serialize(
177+
metrics: ExecutorMetricsDistributions,
178+
jsonGenerator: JsonGenerator,
179+
serializerProvider: SerializerProvider): Unit = {
180+
val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
181+
metric -> metrics.getMetricDistribution(metric)
182+
}
183+
jsonGenerator.writeObject(metricsMap)
184+
}
185+
}
186+
173187
class JobData private[spark](
174188
val jobId: Int,
175189
val name: String,
@@ -360,6 +374,21 @@ class TaskMetricDistributions private[spark](
360374
val shuffleReadMetrics: ShuffleReadMetricDistributions,
361375
val shuffleWriteMetrics: ShuffleWriteMetricDistributions)
362376

377+
@DeveloperApi
378+
@JsonSerialize(using = classOf[ExecutorMetricsDistributionJsonSerializer])
379+
class ExecutorMetricsDistributions private[spark](
380+
val quantiles: IndexedSeq[Double],
381+
val executorMetrics: IndexedSeq[ExecutorMetrics]) {
382+
private lazy val count = executorMetrics.length
383+
private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
384+
385+
/** Returns the distributions for the specified metric. */
386+
def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
387+
val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
388+
indices.map(i => sorted(i.toInt).toDouble).toIndexedSeq
389+
}
390+
}
391+
363392
class InputMetricDistributions private[spark](
364393
val bytesRead: IndexedSeq[Double],
365394
val recordsRead: IndexedSeq[Double])
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"JVMHeapMemory" : [ 1.40508272E8, 1.40508272E8, 4.99823904E8, 5.08119376E8, 5.08119376E8 ],
3+
"JVMOffHeapMemory" : [ 5.8916608E7, 5.8916608E7, 5.8989648E7, 8.8453064E7, 8.8453064E7 ],
4+
"OnHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
5+
"OffHeapExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
6+
"OnHeapStorageMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
7+
"OffHeapStorageMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
8+
"OnHeapUnifiedMemory" : [ 5514.0, 5514.0, 5514.0, 5514.0, 5514.0 ],
9+
"OffHeapUnifiedMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
10+
"DirectPoolMemory" : [ 10246.0, 10246.0, 10440.0, 137861.0, 137861.0 ],
11+
"MappedPoolMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
12+
"ProcessTreeJVMVMemory" : [ 8.286560256E9, 8.286560256E9, 9.678606336E9, 9.680105472E9, 9.680105472E9 ],
13+
"ProcessTreeJVMRSSMemory" : [ 4.97471488E8, 4.97471488E8, 7.7697024E8, 8.58959872E8, 8.58959872E8 ],
14+
"ProcessTreePythonVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
15+
"ProcessTreePythonRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
16+
"ProcessTreeOtherVMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
17+
"ProcessTreeOtherRSSMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
18+
"MinorGCCount" : [ 7.0, 7.0, 19.0, 19.0, 19.0 ],
19+
"MinorGCTime" : [ 55.0, 55.0, 118.0, 122.0, 122.0 ],
20+
"MajorGCCount" : [ 2.0, 2.0, 2.0, 3.0, 3.0 ],
21+
"MajorGCTime" : [ 60.0, 60.0, 63.0, 144.0, 144.0 ]
22+
}

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
163163
"stage task list w/ status & sortBy short names: runtime" ->
164164
"applications/local-1430917381534/stages/0/0/taskList?status=success&sortBy=runtime",
165165

166+
"stage executor peak memory metrics distributions json" ->
167+
"applications/application_1553914137147_0018/stages/0/0/executorMetricsDistribution",
168+
166169
"stage list with accumulable json" -> "applications/local-1426533911241/1/stages",
167170
"stage with accumulable json" -> "applications/local-1426533911241/1/stages/0/0",
168171
"stage task list from multi-attempt app json(1)" ->

docs/monitoring.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,14 @@ can be identified by their `[attempt-id]`. In the API listed below, when running
503503
<br>Example: <code>?offset=10&amp;length=50&amp;sortBy=runtime&amp;status=running</code>
504504
</td>
505505
</tr>
506+
<tr>
507+
<td><code>/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/executorMetricsDistribution</code></td>
508+
<td>
509+
Summary peak executor metrics of all executors in the given stage attempt.
510+
<br><code>?quantiles</code> summarize the metrics with the given quantiles.
511+
<br>Example: <code>?quantiles=0.01,0.5,0.99</code>
512+
</td>
513+
</tr>
506514
<tr>
507515
<td><code>/applications/[app-id]/executors</code></td>
508516
<td>A list of all active executors for the given application.</td>

0 commit comments

Comments
 (0)