
Commit 0c7b2eb

Add BatchPage to display details of a batch

1 parent 3ceb810

File tree

10 files changed: +352 −19 lines

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@ import org.apache.spark.util.collection.OpenHashSet
 
 import scala.collection.mutable.HashMap
 
-private[jobs] object UIData {
+private[spark] object UIData {
 
   class ExecutorSummary {
     var taskTime : Long = 0

streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

Lines changed: 1 addition & 1 deletion

@@ -626,7 +626,7 @@ abstract class DStream[T: ClassTag] (
         println("Time: " + time)
         println("-------------------------------------------")
         firstNum.take(num).foreach(println)
-        if (firstNum.size > num) println("...")
+        if (firstNum.length > num) println("...")
         println()
       }
     }

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

Lines changed: 7 additions & 0 deletions

@@ -58,4 +58,11 @@ case class BatchInfo(
    */
   def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
     .map(x => x._1 + x._2).headOption
+
+  /**
+   * The number of records received by the receivers in this batch.
+   */
+  def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
+    infos.map(_.numRecords).sum
+  }.sum
 }
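The new numRecords is a nested fold: the inner sum counts records across one receiver's blocks, the outer sum totals across receivers. A standalone sketch of the same computation, where the toy ReceivedBlockInfo case class only models the numRecords field of the real streaming class:

case class ReceivedBlockInfo(numRecords: Long)  // illustrative stand-in

// Stream id -> block infos reported by that receiver.
val receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map(
  0 -> Array(ReceivedBlockInfo(100), ReceivedBlockInfo(50)),
  1 -> Array(ReceivedBlockInfo(25))
)

// Inner sum: records per receiver; outer sum: total for the batch.
val numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
  infos.map(_.numRecords).sum
}.sum
assert(numRecords == 175)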

streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala

Lines changed: 33 additions & 5 deletions

@@ -25,15 +25,43 @@ import scala.util.Try
  */
 private[streaming]
 class Job(val time: Time, func: () => _) {
-  var id: String = _
-  var result: Try[_] = null
+  private var _id: String = _
+  private var _outputOpId: Int = _
+  private var isSet = false
+  private var _result: Try[_] = null
 
   def run() {
-    result = Try(func())
+    _result = Try(func())
   }
 
-  def setId(number: Int) {
-    id = "streaming job " + time + "." + number
+  def result: Try[_] = {
+    if (_result == null) {
+      throw new IllegalStateException("Cannot access result before job finishes")
+    }
+    _result
+  }
+
+  def id: String = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access id before calling setOutputOpId")
+    }
+    _id
+  }
+
+  def outputOpId: Int = {
+    if (!isSet) {
+      throw new IllegalStateException("Cannot access outputOpId before calling setOutputOpId")
+    }
+    _outputOpId
+  }
+
+  def setOutputOpId(outputOpId: Int) {
+    if (isSet) {
+      throw new IllegalStateException("Cannot call setOutputOpId more than once")
+    }
+    isSet = true
+    _id = "streaming job " + time + "." + outputOpId
+    _outputOpId = outputOpId
   }
 
   override def toString: String = id
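The refactor makes the Job lifecycle explicit: the output op id must be assigned exactly once before id and outputOpId are readable, and result is only readable after run(). A rough usage sketch, illustrative only (Job is private[streaming], so this must live in that package; Time's toString renders the milliseconds):

val job = new Job(Time(1000), () => println("running output op"))

// job.id or job.outputOpId here would throw IllegalStateException.
job.setOutputOpId(0)      // assigns _outputOpId and derives _id; a second call throws

job.run()                 // wraps func() in a Try
println(job.id)           // e.g. "streaming job 1000 ms.0"
println(job.result.isSuccess)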

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 10 additions & 1 deletion

@@ -170,16 +170,25 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     ssc.waiter.notifyError(e)
   }
 
-  private class JobHandler(job: Job) extends Runnable {
+  private class JobHandler(job: Job) extends Runnable with Logging {
     def run() {
+      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
+      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
       eventActor ! JobStarted(job)
       // Disable checks for existing output directories in jobs launched by the streaming scheduler,
       // since we may need to write output to an existing directory during checkpoint recovery;
       // see SPARK-4835 for more details.
       PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
         job.run()
       }
+      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
+      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
       eventActor ! JobCompleted(job)
     }
   }
 }
+
+private[streaming] object JobScheduler {
+  private[streaming] val BATCH_TIME_PROPERTY_KEY = "spark.streaming.internal.batchTime"
+  private[streaming] val OUTPUT_OP_ID_PROPERTY_KEY = "spark.streaming.internal.outputOpId"
+}
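Because JobHandler sets these local properties on the thread that submits the Spark jobs, every Spark job triggered by job.run() inherits them, so a listener on the Spark side can map jobs back to their batch and output op. A minimal sketch of such a listener (the class and its registration are illustrative, not part of this commit; the property keys are the ones defined above):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

class BatchMappingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart) {
    val props = Option(jobStart.properties)
    for {
      batchTime <- props.flatMap(p => Option(p.getProperty("spark.streaming.internal.batchTime")))
      outputOpId <- props.flatMap(p => Option(p.getProperty("spark.streaming.internal.outputOpId")))
    } {
      // This Spark job was launched for the batch at batchTime ms, output op outputOpId.
      println(s"job ${jobStart.jobId} -> batch $batchTime ms, output op $outputOpId")
    }
  }
}

// Registered via ssc.sparkContext.addSparkListener(new BatchMappingListener)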

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ case class JobSet(
   private var processingStartTime = -1L // when the first job of this jobset started processing
   private var processingEndTime = -1L // when the last job of this jobset finished processing
 
-  jobs.zipWithIndex.foreach { case (job, i) => job.setId(i) }
+  jobs.zipWithIndex.foreach { case (job, i) => job.setOutputOpId(i) }
   incompleteJobs ++= jobs
 
   def handleJobStart(job: Job) {
streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala (new file)

Lines changed: 224 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, NodeSeq}

import org.apache.commons.lang3.StringEscapeUtils

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.ui.StreamingJobProgressListener.{JobId, OutputOpId}
import org.apache.spark.ui.{UIUtils, WebUIPage}
import org.apache.spark.ui.jobs.UIData.JobUIData

class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
  private val streamingListener = parent.listener
  private val sparkListener = parent.ssc.sc.jobProgressListener

  private def columns: Seq[Node] = {
    <th>Output Op Id</th>
      <th>Description</th>
      <th>Duration</th>
      <th>Job Id</th>
      <th>Duration</th>
      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
      <th>Last Error</th>
  }

  private def makeOutputOpIdRow(outputOpId: OutputOpId, jobs: Seq[JobUIData]): Seq[Node] = {
    val jobDurations = jobs.map(job => {
      job.submissionTime.map { start =>
        val end = job.completionTime.getOrElse(System.currentTimeMillis())
        end - start
      }
    })
    val formattedOutputOpDuration =
      if (jobDurations.exists(_.isEmpty)) {
        // If any job has not finished, show "-" as the output op duration
        "-"
      } else {
        UIUtils.formatDuration(jobDurations.flatten.sum)
      }

    def makeJobRow(job: JobUIData, isFirstRow: Boolean): Seq[Node] = {
      val lastStageInfo = Option(job.stageIds)
        .filter(_.nonEmpty)
        .flatMap { ids => sparkListener.stageIdToInfo.get(ids.max) }
      val lastStageData = lastStageInfo.flatMap { s =>
        sparkListener.stageIdToData.get((s.stageId, s.attemptId))
      }

      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
      val duration: Option[Long] = {
        job.submissionTime.map { start =>
          val end = job.completionTime.getOrElse(System.currentTimeMillis())
          end - start
        }
      }
      val lastFailureReason = job.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
        dropWhile(_.failureReason.isEmpty).take(1). // get the first info that contains a failure
        flatMap(info => info.failureReason).headOption.getOrElse("")
      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("-")
      val detailUrl = s"${UIUtils.prependBaseUri(parent.basePath)}/jobs/job?id=${job.jobId}"
      <tr>
        {if (isFirstRow) {
          <td rowspan={jobs.size.toString}>{outputOpId}</td>
          <td rowspan={jobs.size.toString}>
            <span class="description-input" title={lastStageDescription}>
              {lastStageDescription}
            </span>{lastStageName}
          </td>
          <td rowspan={jobs.size.toString}>{formattedOutputOpDuration}</td>}
        }
        <td sorttable_customkey={job.jobId.toString}>
          <a href={detailUrl}>
            {job.jobId}{job.jobGroup.map(id => s"($id)").getOrElse("")}
          </a>
        </td>
        <td sorttable_customkey={duration.getOrElse(Long.MaxValue).toString}>
          {formattedDuration}
        </td>
        <td class="stage-progress-cell">
          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
        </td>
        <td class="progress-cell">
          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
            failed = job.numFailedTasks, skipped = job.numSkippedTasks,
            total = job.numTasks - job.numSkippedTasks)}
        </td>
        {failureReasonCell(lastFailureReason)}
      </tr>
    }

    makeJobRow(jobs.head, true) ++ jobs.tail.flatMap(makeJobRow(_, false))
  }

  private def failureReasonCell(failureReason: String): Seq[Node] = {
    val isMultiline = failureReason.indexOf('\n') >= 0
    // Display only the first line by default
    val failureReasonSummary = StringEscapeUtils.escapeHtml4(
      if (isMultiline) {
        failureReason.substring(0, failureReason.indexOf('\n'))
      } else {
        failureReason
      })
    val details = if (isMultiline) {
      // scalastyle:off
      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
            class="expand-details">
        +details
      </span> ++
        <div class="stacktrace-details collapsed">
          <pre>{failureReason}</pre>
        </div>
      // scalastyle:on
    } else {
      ""
    }
    <td valign="middle">{failureReasonSummary}{details}</td>
  }

  private def jobsTable(jobInfos: Seq[(OutputOpId, JobId)]): Seq[Node] = {
    def getJobData(jobId: JobId): Option[JobUIData] = {
      sparkListener.activeJobs.get(jobId).orElse {
        sparkListener.completedJobs.find(_.jobId == jobId).orElse {
          sparkListener.failedJobs.find(_.jobId == jobId)
        }
      }
    }

    // Group jobInfos by OutputOpId first, then sort them.
    // E.g., [(0, 1), (1, 3), (0, 2), (1, 4)] => [(0, [1, 2]), (1, [3, 4])]
    val outputOpIdWithJobIds: Seq[(OutputOpId, Seq[JobId])] =
      jobInfos.groupBy(_._1).toSeq.sortBy(_._1). // sorted by OutputOpId
        map { case (outputOpId, jobs) =>
          (outputOpId, jobs.map(_._2).sorted) } // sort JobIds for each OutputOpId
    sparkListener.synchronized {
      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[JobUIData])] = outputOpIdWithJobIds.map {
        case (outputOpId, jobIds) =>
          // Filter out JobIds that don't exist in sparkListener
          (outputOpId, jobIds.flatMap(getJobData))
      }

      <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
        <thead>
          {columns}
        </thead>
        <tbody>
          {outputOpIdWithJobs.map { case (outputOpId, jobs) => makeOutputOpIdRow(outputOpId, jobs) }}
        </tbody>
      </table>
    }
  }

  def render(request: HttpServletRequest): Seq[Node] = {
    val batchTime = Option(request.getParameter("id")).map(id => Time(id.toLong)).getOrElse {
      throw new IllegalArgumentException("Missing id parameter")
    }
    val formattedBatchTime = UIUtils.formatDate(batchTime.milliseconds)
    val (batchInfo, jobInfos) = streamingListener.synchronized {
      val _batchInfo = streamingListener.getBatchInfo(batchTime).getOrElse {
        throw new IllegalArgumentException(s"Batch $formattedBatchTime does not exist")
      }
      val _jobInfos = streamingListener.getJobInfos(batchTime)
      (_batchInfo, _jobInfos)
    }

    val formattedSchedulingDelay =
      batchInfo.schedulingDelay.map(UIUtils.formatDuration).getOrElse("-")
    val formattedProcessingTime =
      batchInfo.processingDelay.map(UIUtils.formatDuration).getOrElse("-")
    val formattedTotalDelay = batchInfo.totalDelay.map(UIUtils.formatDuration).getOrElse("-")

    val summary: NodeSeq =
      <div>
        <ul class="unstyled">
          <li>
            <strong>Batch Duration: </strong>
            {UIUtils.formatDuration(streamingListener.batchDuration)}
          </li>
          <li>
            <strong>Input data size: </strong>
            {batchInfo.numRecords} records
          </li>
          <li>
            <strong>Scheduling delay: </strong>
            {formattedSchedulingDelay}
          </li>
          <li>
            <strong>Processing time: </strong>
            {formattedProcessingTime}
          </li>
          <li>
            <strong>Total delay: </strong>
            {formattedTotalDelay}
          </li>
        </ul>
      </div>

    val content = summary ++ jobInfos.map(jobsTable).getOrElse {
      <div>Cannot find any job for Batch {formattedBatchTime}</div>
    }
    UIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
  }
}
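The grouping step in jobsTable is easy to check in isolation. Assuming, as in StreamingJobProgressListener, that OutputOpId and JobId are both aliases for Int, the transformation from the comment's example can be verified directly:

val jobInfos: Seq[(Int, Int)] = Seq((0, 1), (1, 3), (0, 2), (1, 4))

val grouped: Seq[(Int, Seq[Int])] =
  jobInfos.groupBy(_._1).toSeq.sortBy(_._1)
    .map { case (outputOpId, jobs) => (outputOpId, jobs.map(_._2).sorted) }

assert(grouped == Seq((0, Seq(1, 2)), (1, Seq(3, 4))))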
