Skip to content

Commit dddbd29

Browse files
committed
stages have attempt; jobs are sorted; resource for all attempts for one stage
1 parent 190c17a commit dddbd29

File tree

17 files changed

+444
-74
lines changed

17 files changed

+444
-74
lines changed

core/src/main/scala/org/apache/spark/status/api/v1/AllJobsResource.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ class AllJobsResource(uiRoot: UIRoot) {
4848
statuses
4949
}
5050
}
51-
for {
51+
val jobInfos = for {
5252
(status, jobs) <- statusToJobs
5353
job <- jobs if adjStatuses.contains(status)
5454
} yield {
5555
AllJobsResource.convertJobData(job, ui.jobProgressListener, false)
5656
}
57+
jobInfos.sortBy{- _.jobId}
5758
}
5859
}
5960

core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
2121
import javax.ws.rs.core.MediaType
2222

2323
import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
24-
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo}
25-
import org.apache.spark.scheduler.StageInfo
24+
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
2625
import org.apache.spark.status.api._
2726
import org.apache.spark.ui.SparkUI
2827
import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
@@ -94,6 +93,7 @@ object AllStagesResource {
9493
new StageData(
9594
status = status,
9695
stageId = stageInfo.stageId,
96+
attemptId = stageInfo.attemptId,
9797
numActiveTasks = stageUiData.numActiveTasks,
9898
numCompleteTasks = stageUiData.numCompleteTasks,
9999
numFailedTasks = stageUiData.numFailedTasks,

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ class JsonRootResource extends UIRootFromServletContext {
6161
new OneStageResource(uiRoot)
6262
}
6363

64+
@Path("applications/{appId}/stages/{stageId: \\d+}/{attemptId: \\d+}")
65+
def getStageAttempt(): OneStageAttemptResource= {
66+
new OneStageAttemptResource(uiRoot)
67+
}
68+
69+
6470
@Path("applications/{appId}/storage/rdd")
6571
def getRdds(): AllRDDResource = {
6672
new AllRDDResource(uiRoot)
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.status.api.v1
18+
19+
import javax.ws.rs.core.MediaType
20+
import javax.ws.rs.{GET, PathParam, Produces}
21+
22+
import org.apache.spark.SparkException
23+
24+
@Produces(Array(MediaType.APPLICATION_JSON))
25+
class OneStageAttemptResource(uiRoot: UIRoot) {
26+
27+
@GET
28+
def stageData(
29+
@PathParam("appId") appId: String,
30+
@PathParam("stageId") stageId: Int,
31+
@PathParam("attemptId") attemptId: Int
32+
): StageData = {
33+
uiRoot.withSparkUI(appId) { ui =>
34+
val listener = ui.stagesTab.listener
35+
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
36+
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
37+
val matched = stages.find { stage =>
38+
stage.stageId == stageId && stage.attemptId == attemptId
39+
}
40+
matched.map { status -> _ }
41+
}.headOption
42+
oneStage match {
43+
case Some((status, stageInfo)) =>
44+
val stageUiData = listener.synchronized {
45+
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
46+
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
47+
stageInfo.stageId + ":" + stageInfo.attemptId)
48+
)
49+
}
50+
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
51+
includeDetails = true)
52+
case None =>
53+
throw new NotFoundException(s"unknown (stage, attempt): ($stageId, $attemptId)")
54+
}
55+
56+
}
57+
}
58+
}

core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,18 @@ class OneStageResource(uiRoot: UIRoot) {
2828
def stageData(
2929
@PathParam("appId") appId: String,
3030
@PathParam("stageId") stageId: Int
31-
): StageData = {
31+
): Seq[StageData] = {
3232
uiRoot.withSparkUI(appId) { ui =>
3333
val listener = ui.stagesTab.listener
3434
val stageAndStatus = AllStagesResource.stagesAndStatus(ui)
35-
val oneStage = stageAndStatus.flatMap { case (status, stages) =>
36-
val matched = stages.find { _.stageId == stageId }
35+
val stageAttempts = stageAndStatus.flatMap { case (status, stages) =>
36+
val matched = stages.filter{ stage => stage.stageId == stageId}
3737
matched.map { status -> _ }
38-
}.headOption
39-
oneStage match {
40-
case Some((status, stageInfo)) =>
38+
}
39+
if (stageAttempts.isEmpty) {
40+
throw new NotFoundException("unknown stage: " + stageId)
41+
} else {
42+
stageAttempts.map { case (status, stageInfo) =>
4143
val stageUiData = listener.synchronized {
4244
listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
4345
getOrElse(throw new SparkException("failed to get full stage data for stage: " +
@@ -46,8 +48,7 @@ class OneStageResource(uiRoot: UIRoot) {
4648
}
4749
AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData,
4850
includeDetails = true)
49-
case None =>
50-
throw new NotFoundException("unknown stage: " + stageId)
51+
}
5152
}
5253

5354
}

core/src/main/scala/org/apache/spark/status/api/v1/api.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class RDDPartitionInfo(
114114
class StageData(
115115
val status: StageStatus,
116116
val stageId: Int,
117+
val attemptId: Int,
117118
val numActiveTasks: Int ,
118119
val numCompleteTasks: Int,
119120
val numFailedTasks: Int,

core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs/json_expectation

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,4 @@
11
[ {
2-
"jobId" : 0,
3-
"name" : "count at <console>:15",
4-
"stageIds" : [ 0 ],
5-
"status" : "SUCCEEDED",
6-
"numTasks" : 8,
7-
"numActiveTasks" : 0,
8-
"numCompletedTasks" : 8,
9-
"numSkippedTasks" : 8,
10-
"numFailedTasks" : 0,
11-
"numActiveStages" : 0,
12-
"numCompletedStages" : 1,
13-
"numSkippedStages" : 0,
14-
"numFailedStages" : 0
15-
}, {
162
"jobId" : 2,
173
"name" : "count at <console>:17",
184
"stageIds" : [ 3 ],
@@ -40,4 +26,18 @@
4026
"numCompletedStages" : 1,
4127
"numSkippedStages" : 0,
4228
"numFailedStages" : 1
29+
}, {
30+
"jobId" : 0,
31+
"name" : "count at <console>:15",
32+
"stageIds" : [ 0 ],
33+
"status" : "SUCCEEDED",
34+
"numTasks" : 8,
35+
"numActiveTasks" : 0,
36+
"numCompletedTasks" : 8,
37+
"numSkippedTasks" : 8,
38+
"numFailedTasks" : 0,
39+
"numActiveStages" : 0,
40+
"numCompletedStages" : 1,
41+
"numSkippedStages" : 0,
42+
"numFailedStages" : 0
4343
} ]

core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded&status=failed/json_expectation

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,4 @@
11
[ {
2-
"jobId" : 0,
3-
"name" : "count at <console>:15",
4-
"stageIds" : [ 0 ],
5-
"status" : "SUCCEEDED",
6-
"numTasks" : 8,
7-
"numActiveTasks" : 0,
8-
"numCompletedTasks" : 8,
9-
"numSkippedTasks" : 8,
10-
"numFailedTasks" : 0,
11-
"numActiveStages" : 0,
12-
"numCompletedStages" : 1,
13-
"numSkippedStages" : 0,
14-
"numFailedStages" : 0
15-
}, {
162
"jobId" : 2,
173
"name" : "count at <console>:17",
184
"stageIds" : [ 3 ],
@@ -40,4 +26,18 @@
4026
"numCompletedStages" : 1,
4127
"numSkippedStages" : 0,
4228
"numFailedStages" : 1
29+
}, {
30+
"jobId" : 0,
31+
"name" : "count at <console>:15",
32+
"stageIds" : [ 0 ],
33+
"status" : "SUCCEEDED",
34+
"numTasks" : 8,
35+
"numActiveTasks" : 0,
36+
"numCompletedTasks" : 8,
37+
"numSkippedTasks" : 8,
38+
"numFailedTasks" : 0,
39+
"numActiveStages" : 0,
40+
"numCompletedStages" : 1,
41+
"numSkippedStages" : 0,
42+
"numFailedStages" : 0
4343
} ]

core/src/test/resources/HistoryServerExpectations/applications/local-1422981780767/jobs?status=succeeded/json_expectation

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[ {
2-
"jobId" : 0,
3-
"name" : "count at <console>:15",
4-
"stageIds" : [ 0 ],
2+
"jobId" : 2,
3+
"name" : "count at <console>:17",
4+
"stageIds" : [ 3 ],
55
"status" : "SUCCEEDED",
66
"numTasks" : 8,
77
"numActiveTasks" : 0,
@@ -13,9 +13,9 @@
1313
"numSkippedStages" : 0,
1414
"numFailedStages" : 0
1515
}, {
16-
"jobId" : 2,
17-
"name" : "count at <console>:17",
18-
"stageIds" : [ 3 ],
16+
"jobId" : 0,
17+
"name" : "count at <console>:15",
18+
"stageIds" : [ 0 ],
1919
"status" : "SUCCEEDED",
2020
"numTasks" : 8,
2121
"numActiveTasks" : 0,

0 commit comments

Comments (0)