Skip to content

Commit c9bae1c

Browse files
committed
handle multiple attempts per app
1 parent b87cd63 commit c9bae1c

File tree

26 files changed

+779
-221
lines changed

26 files changed

+779
-221
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,26 @@ import org.apache.spark.ui.jobs.JobProgressListener
2626
import org.apache.spark.ui.jobs.UIData.JobUIData
2727

2828
@Produces(Array(MediaType.APPLICATION_JSON))
29-
private[v1] class AllJobsResource(uiRoot: UIRoot) {
29+
private[v1] class AllJobsResource(ui: SparkUI) {
3030

3131
@GET
32-
def jobsList(
33-
@PathParam("appId") appId: String,
34-
@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
35-
uiRoot.withSparkUI(appId) { ui =>
36-
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
37-
AllJobsResource.getStatusToJobs(ui)
38-
val adjStatuses: JList[JobExecutionStatus] = {
39-
if (statuses.isEmpty) {
40-
Arrays.asList(JobExecutionStatus.values(): _*)
41-
} else {
42-
statuses
43-
}
32+
def jobsList(@QueryParam("status") statuses: JList[JobExecutionStatus]): Seq[JobData] = {
33+
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
34+
AllJobsResource.getStatusToJobs(ui)
35+
val adjStatuses: JList[JobExecutionStatus] = {
36+
if (statuses.isEmpty) {
37+
Arrays.asList(JobExecutionStatus.values(): _*)
38+
} else {
39+
statuses
4440
}
45-
val jobInfos = for {
46-
(status, jobs) <- statusToJobs
47-
job <- jobs if adjStatuses.contains(status)
48-
} yield {
49-
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
50-
}
51-
jobInfos.sortBy{- _.jobId}
5241
}
42+
val jobInfos = for {
43+
(status, jobs) <- statusToJobs
44+
job <- jobs if adjStatuses.contains(status)
45+
} yield {
46+
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
47+
}
48+
jobInfos.sortBy{- _.jobId}
5349
}
5450

5551
}

core/src/main/scala/org/apache/spark/status/api/v1/AllRDDResource.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,23 @@
1616
*/
1717
package org.apache.spark.status.api.v1
1818

19-
import javax.ws.rs.{GET, PathParam, Produces}
19+
import javax.ws.rs.{GET, Produces}
2020
import javax.ws.rs.core.MediaType
2121

2222
import org.apache.spark.storage.{RDDInfo, StorageStatus, StorageUtils}
23+
import org.apache.spark.ui.SparkUI
2324
import org.apache.spark.ui.storage.StorageListener
2425

2526
@Produces(Array(MediaType.APPLICATION_JSON))
26-
private[v1] class AllRDDResource(uiRoot: UIRoot) {
27+
private[v1] class AllRDDResource(ui: SparkUI) {
2728

2829
@GET
29-
def jobsList(@PathParam("appId") appId: String): Seq[RDDStorageInfo] = {
30-
uiRoot.withSparkUI(appId) { ui =>
31-
val storageStatusList = ui.storageListener.storageStatusList
32-
val rddInfos = ui.storageListener.rddInfoList
33-
rddInfos.map{rddInfo =>
34-
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
35-
includeDetails = false)
36-
}
37-
30+
def rddList(): Seq[RDDStorageInfo] = {
31+
val storageStatusList = ui.storageListener.storageStatusList
32+
val rddInfos = ui.storageListener.rddInfoList
33+
rddInfos.map{rddInfo =>
34+
AllRDDResource.getRDDStorageInfo(rddInfo.id, rddInfo, storageStatusList,
35+
includeDetails = false)
3836
}
3937
}
4038

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -27,32 +27,27 @@ import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
2727
import org.apache.spark.util.Distribution
2828

2929
@Produces(Array(MediaType.APPLICATION_JSON))
30-
private[v1] class AllStagesResource(uiRoot: UIRoot) {
30+
private[v1] class AllStagesResource(ui: SparkUI) {
3131

3232
@GET
33-
def stageList(
34-
@PathParam("appId") appId: String,
35-
@QueryParam("status") statuses: JList[StageStatus]
36-
): Seq[StageData] = {
37-
uiRoot.withSparkUI(appId) { ui =>
38-
val listener = ui.jobProgressListener
39-
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
40-
val adjStatuses = {
41-
if (statuses.isEmpty()) {
42-
Arrays.asList(StageStatus.values(): _*)
43-
} else {
44-
statuses
45-
}
33+
def stageList(@QueryParam("status") statuses: JList[StageStatus]): Seq[StageData] = {
34+
val listener = ui.jobProgressListener
35+
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
36+
val adjStatuses = {
37+
if (statuses.isEmpty()) {
38+
Arrays.asList(StageStatus.values(): _*)
39+
} else {
40+
statuses
4641
}
47-
for {
48-
(status, stageList) <- stageAndStatus
49-
stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
50-
stageUiData: StageUIData <- listener.synchronized {
51-
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
52-
}
53-
} yield {
54-
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
42+
}
43+
for {
44+
(status, stageList) <- stageAndStatus
45+
stageInfo: StageInfo <- stageList if adjStatuses.contains(status)
46+
stageUiData: StageUIData <- listener.synchronized {
47+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
5548
}
49+
} yield {
50+
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
5651
}
5752
}
5853
}

core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,18 @@ package org.apache.spark.status.api.v1
1919
import javax.ws.rs.{GET, PathParam, Produces}
2020
import javax.ws.rs.core.MediaType
2121

22+
import org.apache.spark.ui.SparkUI
2223
import org.apache.spark.ui.exec.ExecutorsPage
2324

2425
@Produces(Array(MediaType.APPLICATION_JSON))
25-
private[v1] class ExecutorListResource(uiRoot: UIRoot) {
26+
private[v1] class ExecutorListResource(ui: SparkUI) {
2627

2728
@GET
28-
def jobsList(@PathParam("appId") appId: String): Seq[ExecutorSummary] = {
29-
uiRoot.withSparkUI(appId) { ui =>
30-
val listener = ui.executorsListener
31-
val storageStatusList = listener.storageStatusList
32-
(0 until storageStatusList.size).map { statusId =>
33-
ExecutorsPage.getExecInfo(listener, statusId)
34-
}
29+
def executorList(): Seq[ExecutorSummary] = {
30+
val listener = ui.executorsListener
31+
val storageStatusList = listener.storageStatusList
32+
(0 until storageStatusList.size).map { statusId =>
33+
ExecutorsPage.getExecInfo(listener, statusId)
3534
}
3635
}
3736
}

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala

Lines changed: 98 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,39 +51,117 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
5151
new OneApplicationResource(uiRoot)
5252
}
5353

54+
@Path("applications/{appId}/{attemptId}/jobs")
55+
def getJobs(
56+
@PathParam("appId") appId: String,
57+
@PathParam("attemptId") attemptId: String): AllJobsResource = {
58+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
59+
new AllJobsResource(ui)
60+
}
61+
}
62+
5463
@Path("applications/{appId}/jobs")
55-
def getJobs(): AllJobsResource = {
56-
new AllJobsResource(uiRoot)
64+
def getJobs(@PathParam("appId") appId: String): AllJobsResource = {
65+
uiRoot.withSparkUI(appId, None) { ui =>
66+
new AllJobsResource(ui)
67+
}
5768
}
5869

5970
@Path("applications/{appId}/jobs/{jobId: \\d+}")
60-
def getJob(): OneJobResource = {
61-
new OneJobResource(uiRoot)
71+
def getJob(@PathParam("appId") appId: String): OneJobResource = {
72+
uiRoot.withSparkUI(appId, None) { ui =>
73+
new OneJobResource(ui)
74+
}
75+
}
76+
77+
@Path("applications/{appId}/{attemptId}/jobs/{jobId: \\d+}")
78+
def getJob(
79+
@PathParam("appId") appId: String,
80+
@PathParam("attemptId") attemptId: String): OneJobResource = {
81+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
82+
new OneJobResource(ui)
83+
}
6284
}
6385

6486
@Path("applications/{appId}/executors")
65-
def getExecutors(): ExecutorListResource = {
66-
new ExecutorListResource(uiRoot)
87+
def getExecutors(@PathParam("appId") appId: String): ExecutorListResource = {
88+
uiRoot.withSparkUI(appId, None) { ui =>
89+
new ExecutorListResource(ui)
90+
}
91+
}
92+
93+
@Path("applications/{appId}/{attemptId}/executors")
94+
def getExecutors(
95+
@PathParam("appId") appId: String,
96+
@PathParam("attemptId") attemptId: String): ExecutorListResource = {
97+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
98+
new ExecutorListResource(ui)
99+
}
67100
}
68101

102+
69103
@Path("applications/{appId}/stages")
70-
def getStages(): AllStagesResource= {
71-
new AllStagesResource(uiRoot)
104+
def getStages(@PathParam("appId") appId: String): AllStagesResource= {
105+
uiRoot.withSparkUI(appId, None) { ui =>
106+
new AllStagesResource(ui)
107+
}
108+
}
109+
110+
@Path("applications/{appId}/{attemptId}/stages")
111+
def getStages(
112+
@PathParam("appId") appId: String,
113+
@PathParam("attemptId") attemptId: String): AllStagesResource= {
114+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
115+
new AllStagesResource(ui)
116+
}
72117
}
73118

74119
@Path("applications/{appId}/stages/{stageId: \\d+}")
75-
def getStage(): OneStageResource= {
76-
new OneStageResource(uiRoot)
120+
def getStage(@PathParam("appId") appId: String): OneStageResource= {
121+
uiRoot.withSparkUI(appId, None) { ui =>
122+
new OneStageResource(ui)
123+
}
124+
}
125+
126+
@Path("applications/{appId}/{attemptId}/stages/{stageId: \\d+}")
127+
def getStage(
128+
@PathParam("appId") appId: String,
129+
@PathParam("attemptId") attemptId: String): OneStageResource = {
130+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
131+
new OneStageResource(ui)
132+
}
77133
}
78134

79135
@Path("applications/{appId}/storage/rdd")
80-
def getRdds(): AllRDDResource = {
81-
new AllRDDResource(uiRoot)
136+
def getRdds(@PathParam("appId") appId: String): AllRDDResource = {
137+
uiRoot.withSparkUI(appId, None) { ui =>
138+
new AllRDDResource(ui)
139+
}
140+
}
141+
142+
@Path("applications/{appId}/{attemptId}/storage/rdd")
143+
def getRdds(
144+
@PathParam("appId") appId: String,
145+
@PathParam("attemptId") attemptId: String): AllRDDResource = {
146+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
147+
new AllRDDResource(ui)
148+
}
82149
}
83150

84151
@Path("applications/{appId}/storage/rdd/{rddId: \\d+}")
85-
def getRdd(): OneRDDResource = {
86-
new OneRDDResource(uiRoot)
152+
def getRdd(@PathParam("appId") appId: String): OneRDDResource = {
153+
uiRoot.withSparkUI(appId, None) { ui =>
154+
new OneRDDResource(ui)
155+
}
156+
}
157+
158+
@Path("applications/{appId}/{attemptId}/storage/rdd/{rddId: \\d+}")
159+
def getRdd(
160+
@PathParam("appId") appId: String,
161+
@PathParam("attemptId") attemptId: String): OneRDDResource = {
162+
uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
163+
new OneRDDResource(ui)
164+
}
87165
}
88166

89167
}
@@ -119,8 +197,9 @@ private[spark] trait UIRoot {
119197
* Get the spark UI with the given appID, and apply a function
120198
* to it. If there is no such app, throw an appropriate exception
121199
*/
122-
def withSparkUI[T](appId: String)(f: SparkUI => T): T = {
123-
getSparkUI(appId) match {
200+
def withSparkUI[T](appId: String, attemptId: Option[String])(f: SparkUI => T): T = {
201+
val appKey = attemptId.map(appId + "/" + _).getOrElse(appId)
202+
getSparkUI(appKey) match {
124203
case Some(ui) =>
125204
f(ui)
126205
case None => throw new NotFoundException("no such app: " + appId)
@@ -130,10 +209,13 @@ private[spark] trait UIRoot {
130209
}
131210

132211
private[v1] object UIRootFromServletContext {
212+
133213
private val attribute = getClass.getCanonicalName
214+
134215
def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
135216
contextHandler.setAttribute(attribute, uiRoot)
136217
}
218+
137219
def getUiRoot(context: ServletContext): UIRoot = {
138220
context.getAttribute(attribute).asInstanceOf[UIRoot]
139221
}

core/src/main/scala/org/apache/spark/status/api/v1/OneJobResource.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,21 @@ import javax.ws.rs.{PathParam, GET, Produces}
2020
import javax.ws.rs.core.MediaType
2121

2222
import org.apache.spark.JobExecutionStatus
23+
import org.apache.spark.ui.SparkUI
2324
import org.apache.spark.ui.jobs.UIData.JobUIData
2425

2526
@Produces(Array(MediaType.APPLICATION_JSON))
26-
private[v1] class OneJobResource(uiRoot: UIRoot) {
27+
private[v1] class OneJobResource(ui: SparkUI) {
2728

2829
@GET
29-
def jobsList(@PathParam("appId") appId: String, @PathParam("jobId") jobId: Int): JobData = {
30-
uiRoot.withSparkUI(appId) { ui =>
31-
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
32-
AllJobsResource.getStatusToJobs(ui)
33-
val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
34-
jobOpt.map { job =>
35-
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
36-
}.getOrElse {
37-
throw new NotFoundException("unknown job: " + jobId)
38-
}
30+
def oneJob(@PathParam("jobId") jobId: Int): JobData = {
31+
val statusToJobs: Seq[(JobExecutionStatus, Seq[JobUIData])] =
32+
AllJobsResource.getStatusToJobs(ui)
33+
val jobOpt = statusToJobs.map {_._2} .flatten.find { jobInfo => jobInfo.jobId == jobId}
34+
jobOpt.map { job =>
35+
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
36+
}.getOrElse {
37+
throw new NotFoundException("unknown job: " + jobId)
3938
}
4039
}
4140

core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,16 @@ package org.apache.spark.status.api.v1
1919
import javax.ws.rs.{PathParam, GET, Produces}
2020
import javax.ws.rs.core.MediaType
2121

22+
import org.apache.spark.ui.SparkUI
23+
2224
@Produces(Array(MediaType.APPLICATION_JSON))
23-
private[v1] class OneRDDResource(uiRoot: UIRoot) {
25+
private[v1] class OneRDDResource(ui: SparkUI) {
2426

25-
@GET
26-
def rddData(
27-
@PathParam("appId") appId: String,
28-
@PathParam("rddId") rddId: Int): RDDStorageInfo = {
29-
uiRoot.withSparkUI(appId) { ui =>
30-
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
31-
throw new NotFoundException(s"no rdd found w/ id $rddId")
32-
)
33-
}
34-
}
27+
@GET
28+
def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = {
29+
AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse(
30+
throw new NotFoundException(s"no rdd found w/ id $rddId")
31+
)
32+
}
3533

3634
}

0 commit comments

Comments
 (0)