Skip to content

Commit b2efcaf

Browse files
committed
cleanup, combine stage-related paths into one resource
1 parent aaba896 commit b2efcaf

File tree

4 files changed

+82
-160
lines changed

4 files changed

+82
-160
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,17 +67,6 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
6767
new OneStageResource(uiRoot)
6868
}
6969

70-
@Path("applications/{appId}/stages/{stageId: \\d+}/{attemptId: \\d+}")
71-
def getStageAttempt(): OneStageAttemptResource= {
72-
new OneStageAttemptResource(uiRoot)
73-
}
74-
75-
76-
@Path("applications/{appId}/stages/{stageId: \\d+}/{attemptId: \\d+}/taskSummary")
77-
def getStageAttemptTaskSummary(): StageTaskSummary = {
78-
new StageTaskSummary(uiRoot)
79-
}
80-
8170
@Path("applications/{appId}/storage/rdd")
8271
def getRdds(): AllRDDResource = {
8372
new AllRDDResource(uiRoot)

core/src/main/scala/org/apache/spark/status/api/v1/OneStageAttemptResource.scala

Lines changed: 0 additions & 66 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,41 +16,109 @@
1616
*/
1717
package org.apache.spark.status.api.v1
1818

19-
import javax.ws.rs.{GET, PathParam, Produces}
19+
import javax.ws.rs._
2020
import javax.ws.rs.core.MediaType
2121

2222
import org.apache.spark.SparkException
23+
import org.apache.spark.scheduler.StageInfo
24+
import org.apache.spark.ui.jobs.JobProgressListener
2325

2426
@Produces(Array(MediaType.APPLICATION_JSON))
2527
private[v1] class OneStageResource(uiRoot: UIRoot) {
2628

2729
@GET
30+
@Path("")
2831
def stageData(
2932
@PathParam("appId") appId: String,
3033
@PathParam("stageId") stageId: Int
3134
): Seq[StageData] = {
35+
forStage(appId, stageId){ (listener,stageAttempts) =>
36+
stageAttempts.map { case (status, stageInfo) =>
37+
val stageUiData = listener.synchronized {
38+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
39+
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
40+
stageInfo.stageId + ":" + stageInfo.attemptId)
41+
)
42+
}
43+
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
44+
includeDetails = true)
45+
}
46+
}
47+
}
48+
49+
@GET
50+
@Path("/{attemptId: \\d+}")
51+
def oneAttemptData(
52+
@PathParam("appId") appId: String,
53+
@PathParam("stageId") stageId: Int,
54+
@PathParam("attemptId") attemptId: Int
55+
): StageData = {
56+
forStageAttempt(appId, stageId, attemptId) { case (listener, status, stageInfo) =>
57+
val stageUiData = listener.synchronized {
58+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
59+
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
60+
stageInfo.stageId + ":" + stageInfo.attemptId)
61+
)
62+
}
63+
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
64+
includeDetails = true)
65+
}
66+
}
67+
68+
@GET
69+
@Path("/{attemptId: \\d+}/taskSummary")
70+
def stageData(
71+
@PathParam("appId") appId: String,
72+
@PathParam("stageId") stageId: Int,
73+
@PathParam("attemptId") attemptId: Int,
74+
@DefaultValue("0.05,0.25,0.5,0.75,0.95") @QueryParam("quantiles") quantileString: String
75+
): TaskMetricDistributions = {
76+
forStageAttempt(appId, stageId, attemptId) { case (listener, status, stageInfo) =>
77+
val stageUiData = listener.synchronized {
78+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
79+
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
80+
stageInfo.stageId + ":" + stageInfo.attemptId)
81+
)
82+
}
83+
//TODO error handling
84+
val quantiles = quantileString.split(",").map{_.toDouble}
85+
println("quantiles = " + quantiles.mkString(","))
86+
AllStagesResource.taskMetricDistributions(stageUiData.taskData.values, quantiles)
87+
}
88+
}
89+
90+
def forStage[T](appId: String, stageId: Int)
91+
(f: (JobProgressListener, Seq[(StageStatus, StageInfo)]) => T): T = {
3292
uiRoot.withSparkUI(appId) { ui =>
33-
val listener = ui.stagesTab.listener
3493
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
3594
val stageAttempts = stageAndStatus.flatMap { case (status, stages) =>
36-
val matched = stages.filter{ stage => stage.stageId == stageId}
37-
matched.map { status -> _ }
95+
val matched = stages.filter { stage => stage.stageId == stageId}
96+
matched.map {
97+
status -> _
98+
}
3899
}
39100
if (stageAttempts.isEmpty) {
40101
throw new NotFoundException("unknown stage: " + stageId)
41102
} else {
42-
stageAttempts.map { case (status, stageInfo) =>
43-
val stageUiData = listener.synchronized {
44-
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
45-
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
46-
stageInfo.stageId + ":" + stageInfo.attemptId)
47-
)
48-
}
49-
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
50-
includeDetails = true)
51-
}
103+
f(ui.jobProgressListener, stageAttempts)
52104
}
105+
}
106+
}
53107

108+
def forStageAttempt[T](appId: String, stageId: Int, attemptId: Int)
109+
(f: (JobProgressListener, StageStatus, StageInfo) => T): T = {
110+
forStage(appId, stageId) { case (listener, attempts) =>
111+
val oneAttempt = attempts.filter{ case (status, stage) =>
112+
stage.attemptId == attemptId
113+
}.headOption
114+
oneAttempt match {
115+
case Some((status, stageInfo)) =>
116+
f(listener, status, stageInfo)
117+
case None =>
118+
val stageAttempts = attempts.map { _._2.attemptId}
119+
throw new NotFoundException(s"unknown attempt for stage $stageId. " +
120+
s"Found attempts: ${stageAttempts.mkString("[", ",", "]")}")
121+
}
54122
}
55123
}
56124
}

core/src/main/scala/org/apache/spark/status/api/v1/StageTaskSummary.scala

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)