Skip to content

Commit aaba896

Browse files
committed
wire up task summary
1 parent a4b1397 commit aaba896

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,9 @@ private[v1] object AllStagesResource {
146146
)
147147
}
148148

149-
def taskMetricDistributions(allTaskData: Seq[TaskUIData], quantiles: Array[Double]): TaskMetricDistributions = {
149+
def taskMetricDistributions(allTaskData: Iterable[TaskUIData], quantiles: Array[Double]): TaskMetricDistributions = {
150150

151-
val rawMetrics = allTaskData.flatMap{_.taskMetrics}
151+
val rawMetrics = allTaskData.flatMap{_.taskMetrics}.toSeq
152152

153153
def getMetric[T](data: Seq[T], f: T => Double): IndexedSeq[Double] =
154154
Distribution(data.map{d=> f(d)}).get.getQuantiles(quantiles)

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
7373
}
7474

7575

76+
@Path("applications/{appId}/stages/{stageId: \\d+}/{attemptId: \\d+}/taskSummary")
77+
def getStageAttemptTaskSummary(): StageTaskSummary = {
78+
new StageTaskSummary(uiRoot)
79+
}
80+
7681
@Path("applications/{appId}/storage/rdd")
7782
def getRdds(): AllRDDResource = {
7883
new AllRDDResource(uiRoot)
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.status.api.v1
18+
19+
import javax.ws.rs.core.MediaType
20+
import javax.ws.rs._
21+
22+
import org.apache.spark.SparkException
23+
import org.apache.spark.SparkException
24+
25+
@Produces(Array(MediaType.APPLICATION_JSON))
26+
private[v1] class StageTaskSummary(uiRoot: UIRoot) {
27+
28+
@GET
29+
def stageData(
30+
@PathParam("appId") appId: String,
31+
@PathParam("stageId") stageId: Int,
32+
@PathParam("attemptId") attemptId: Int,
33+
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String
34+
): TaskMetricDistributions = {
35+
uiRoot.withSparkUI(appId) { ui =>
36+
val listener = ui.stagesTab.listener
37+
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
38+
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
39+
val matched = stages.find { stage =>
40+
stage.stageId == stageId && stage.attemptId == attemptId
41+
}
42+
matched.map { status -> _ }
43+
}.headOption
44+
oneStage match {
45+
case Some((status, stageInfo)) =>
46+
val stageUiData = listener.synchronized {
47+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
48+
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
49+
stageInfo.stageId + ":" + stageInfo.attemptId)
50+
)
51+
}
52+
val quantiles = quantileString.split(",").map{_.toDouble}
53+
println("quantiles = " + quantiles.mkString(","))
54+
AllStagesResource.taskMetricDistributions(stageUiData.taskData.values, quantiles)
55+
case None =>
56+
val stageAttempts = stageAndStatus.flatMap{ case (status, stages) =>
57+
stages.filter { _.stageId == stageId }.map{_.attemptId}
58+
}
59+
if (stageAttempts.isEmpty) {
60+
throw new NotFoundException(s"unknown stage: $stageId")
61+
} else {
62+
throw new NotFoundException(s"unknown attempt for stage $stageId. " +
63+
s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
64+
}
65+
}
66+
67+
}
68+
}
69+
}

0 commit comments

Comments
 (0)