@@ -24,6 +24,7 @@ import org.apache.spark.scheduler.StageInfo
2424import org .apache .spark .ui .jobs .JobProgressListener
2525import org .apache .spark .ui .jobs .UIData .StageUIData
2626import org .apache .spark .util .SparkEnum
27+ import org .apache .spark .status .api .v1 .StageStatus ._
2728
2829@ Produces (Array (MediaType .APPLICATION_JSON ))
2930private [v1] class OneStageResource (uiRoot : UIRoot ) {
@@ -34,15 +35,9 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
3435 @ PathParam (" appId" ) appId : String ,
3536 @ PathParam (" stageId" ) stageId : Int
3637 ): Seq [StageData ] = {
37- forStage(appId, stageId){ (listener,stageAttempts) =>
38- stageAttempts.map { case (status, stageInfo) =>
39- val stageUiData = listener.synchronized {
40- listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
41- getOrElse(throw new SparkException (" failed to get full stage data for stage: " +
42- stageInfo.stageId + " :" + stageInfo.attemptId)
43- )
44- }
45- AllStagesResource .stageUiToStageData(status, stageInfo, stageUiData,
38+ withStage(appId, stageId){ stageAttempts =>
39+ stageAttempts.map { stage =>
40+ AllStagesResource .stageUiToStageData(stage.status, stage.info, stage.ui,
4641 includeDetails = true )
4742 }
4843 }
@@ -55,8 +50,8 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
5550 @ PathParam (" stageId" ) stageId : Int ,
5651 @ PathParam (" attemptId" ) attemptId : Int
5752 ): StageData = {
58- forStageAttempt (appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
59- AllStagesResource .stageUiToStageData(status, stageInfo, stageUiData ,
53+ withStageAttempt (appId, stageId, attemptId) { stage =>
54+ AllStagesResource .stageUiToStageData(stage. status, stage.info, stage.ui ,
6055 includeDetails = true )
6156 }
6257 }
@@ -69,16 +64,16 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
6964 @ PathParam (" attemptId" ) attemptId : Int ,
7065 @ DefaultValue (" 0.05,0.25,0.5,0.75,0.95" ) @ QueryParam (" quantiles" ) quantileString : String
7166 ): TaskMetricDistributions = {
72- forStageAttempt (appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
73- val quantiles = quantileString.split(" ," ).map{ s =>
67+ withStageAttempt (appId, stageId, attemptId) { stage =>
68+ val quantiles = quantileString.split(" ," ).map { s =>
7469 try {
7570 s.toDouble
7671 } catch {
7772 case nfe : NumberFormatException =>
7873 throw new BadParameterException (" quantiles" , " double" , s)
7974 }
8075 }
81- AllStagesResource .taskMetricDistributions(stageUiData .taskData.values, quantiles)
76+ AllStagesResource .taskMetricDistributions(stage.ui .taskData.values, quantiles)
8277 }
8378 }
8479
@@ -92,49 +87,57 @@ private[v1] class OneStageResource(uiRoot: UIRoot) {
9287 @ DefaultValue (" 20" ) @ QueryParam (" length" ) length : Int ,
9388 @ DefaultValue (" ID" ) @ QueryParam (" sortBy" ) sortBy : TaskSorting
9489 ): Seq [TaskData ] = {
95- forStageAttempt (appId, stageId, attemptId) { case (status, stageInfo, stageUiData) =>
96- val tasks = stageUiData .taskData.values.map{AllStagesResource .convertTaskData}.toIndexedSeq
90+ withStageAttempt (appId, stageId, attemptId) { stage =>
91+ val tasks = stage.ui .taskData.values.map{AllStagesResource .convertTaskData}.toIndexedSeq
9792 .sorted(sortBy.ordering)
9893 tasks.slice(offset, offset + length)
9994 }
10095 }
10196
97+ private case class StageStatusInfoUi (status : StageStatus , info : StageInfo , ui : StageUIData )
10298
103- def forStage [T ](appId : String , stageId : Int )
104- (f : ( JobProgressListener , Seq [( StageStatus , StageInfo )]) => T ): T = {
99+ private def withStage [T ](appId : String , stageId : Int )
100+ (f : Seq [StageStatusInfoUi ] => T ): T = {
105101 uiRoot.withSparkUI(appId) { ui =>
106- val stageAndStatus = AllStagesResource .stagesAndStatus(ui)
107- val stageAttempts = stageAndStatus.flatMap { case (status, stages) =>
108- val matched = stages.filter { stage => stage.stageId == stageId}
109- matched.map {
110- status -> _
111- }
112- }
102+ val stageAttempts = findStageStatusUIData(ui.jobProgressListener, stageId)
113103 if (stageAttempts.isEmpty) {
114104 throw new NotFoundException (" unknown stage: " + stageId)
115105 } else {
116- f(ui.jobProgressListener, stageAttempts)
106+ f(stageAttempts)
107+ }
108+ }
109+ }
110+
111+ private def findStageStatusUIData (
112+ listener : JobProgressListener ,
113+ stageId : Int ): Seq [StageStatusInfoUi ] = {
114+ listener.synchronized {
115+ def getStatusInfoUi (status : StageStatus , infos : Seq [StageInfo ]): Seq [StageStatusInfoUi ] = {
116+ infos.filter { _.stageId == stageId }.map { info =>
117+ val ui = listener.stageIdToData.getOrElse((info.stageId, info.attemptId),
118+ // this is an internal error -- we should always have uiData
119+ throw new SparkException (
120+ s " no stage ui data found for stage: ${info.stageId}: ${info.attemptId}" )
121+ )
122+ StageStatusInfoUi (status, info, ui)
123+ }
117124 }
125+ getStatusInfoUi(Active , listener.activeStages.values.toSeq) ++
126+ getStatusInfoUi(Complete , listener.completedStages) ++
127+ getStatusInfoUi(Failed , listener.failedStages) ++
128+ getStatusInfoUi(Pending , listener.pendingStages.values.toSeq)
118129 }
119130 }
120131
121- def forStageAttempt [T ](appId : String , stageId : Int , attemptId : Int )
122- (f : (StageStatus , StageInfo , StageUIData ) => T ): T = {
123- forStage(appId, stageId) { case (listener, attempts) =>
124- val oneAttempt = attempts.filter{ case (status, stage) =>
125- stage.attemptId == attemptId
126- }.headOption
132+ private def withStageAttempt [T ](appId : String , stageId : Int , attemptId : Int )
133+ (f : StageStatusInfoUi => T ): T = {
134+ withStage(appId, stageId) { attempts =>
135+ val oneAttempt = attempts.find { stage => stage.info.attemptId == attemptId }
127136 oneAttempt match {
128- case Some ((status, stageInfo)) =>
129- val stageUiData = listener.synchronized {
130- listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)).
131- getOrElse(throw new SparkException (" failed to get full stage data for stage: " +
132- stageInfo.stageId + " :" + stageInfo.attemptId)
133- )
134- }
135- f(status, stageInfo, stageUiData)
137+ case Some (stage) =>
138+ f(stage)
136139 case None =>
137- val stageAttempts = attempts.map { _._2 .attemptId}
140+ val stageAttempts = attempts.map { _.info .attemptId }
138141 throw new NotFoundException (s " unknown attempt for stage $stageId. " +
139142 s " Found attempts: ${stageAttempts.mkString(" [" , " ," , " ]" )}" )
140143 }
@@ -146,17 +149,18 @@ sealed abstract class TaskSorting extends SparkEnum {
146149 def ordering : Ordering [TaskData ]
147150 def alternateNames : Seq [String ] = Seq ()
148151}
152+
149153object TaskSorting extends JerseyEnum [TaskSorting ] {
150154 final val ID = {
151155 case object ID extends TaskSorting {
152- def ordering = Ordering .by{ td : TaskData => td.taskId}
156+ def ordering = Ordering .by { td : TaskData => td.taskId }
153157 }
154158 ID
155159 }
156160
157161 final val IncreasingRuntime = {
158162 case object IncreasingRuntime extends TaskSorting {
159- def ordering = Ordering .by{ td : TaskData =>
163+ def ordering = Ordering .by { td : TaskData =>
160164 td.taskMetrics.map{_.executorRunTime}.getOrElse(- 1L )
161165 }
162166 override def alternateNames = Seq (" runtime" , " +runtime" )
@@ -179,14 +183,13 @@ object TaskSorting extends JerseyEnum[TaskSorting] {
179183 )
180184
181185 val alternateNames : Map [String , TaskSorting ] =
182- values.flatMap{ x => x.alternateNames.map{ _ -> x} }.toMap
186+ values.flatMap { x => x.alternateNames.map { _ -> x } }.toMap
183187
184188 override def fromString (s : String ): TaskSorting = {
185189 alternateNames.find { case (k, v) =>
186190 k.toLowerCase() == s.toLowerCase()
187- }.map{ _._2}.getOrElse{
191+ }.map { _._2 }.getOrElse{
188192 super .fromString(s)
189193 }
190194 }
191195}
192-
0 commit comments