Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ case class BatchInfo(

// Failure reason per failed output operation, keyed by output op id.
// Empty until populated externally via setFailureReason (see JobSet.toBatchInfo).
private var _failureReasons: Map[Int, String] = Map.empty

// Total number of output operations in this batch; 0 until set via setNumOutputOp.
private var _numOutputOp: Int = 0

@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)

Expand Down Expand Up @@ -77,4 +79,12 @@ case class BatchInfo(

/** Failure reasons corresponding to every output ops in the batch */
private[streaming] def failureReasons = _failureReasons

/**
 * Set the number of output operations in this batch.
 *
 * @param numOutputOp total count of output operations (jobs) generated for this batch
 */
private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
  _numOutputOp = numOutputOp
}

/** Return the number of output operations in this batch (0 if it was never set). */
private[streaming] def numOutputOp: Int = _numOutputOp
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ case class JobSet(
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
binfo.setNumOutputOp(jobs.size)
binfo
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {

override protected def columns: Seq[Node] = super.columns ++
<th>Total Delay
{SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th>
}

override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
Expand All @@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
/**
 * Render the extra table cells for one completed batch: the "Total Delay" cell
 * and the "Output Ops: Succeeded/Total" cell, appended to the shared base row.
 *
 * @param batch UI data for one completed batch
 * @return the table cells (<td> nodes) making up this batch's row
 */
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
  val totalDelay = batch.totalDelay
  // "-" when the delay is not available for this batch
  val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
  // Each entry in failureReason corresponds to one failed output op
  val numFailedOutputOp = batch.failureReason.size
  val outputOpColumn = if (numFailedOutputOp > 0) {
    s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
      s" (${numFailedOutputOp} failed)"
  } else {
    s"${batch.numOutputOp}/${batch.numOutputOp}"
  }
  baseRow(batch) ++
    // Long.MaxValue sort key pushes batches with unknown delay to the end when sorting
    <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
      {formattedTotalDelay}
    </td>
    <td>{outputOpColumn}</td>
}
}
134 changes: 113 additions & 21 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
<th>Status</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
Expand All @@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}

/**
 * Generate the single table row for an output operation that has no associated
 * Spark jobs (used when the op's Spark-job list is empty). Every job-related
 * column (Job Id, Duration, Stages, Tasks, Error) is rendered as a "-" placeholder.
 *
 * @param outputOpId                id of the output operation
 * @param outputOpDescription       pre-rendered HTML description nodes for the op
 * @param formattedOutputOpDuration human-readable duration string (or "-")
 * @param outputOpStatus            "Succeeded" or a failure-reason string; rendered
 *                                  via outputOpStatusCell
 */
private def generateOutputOpRowWithoutSparkJobs(
    outputOpId: OutputOpId,
    outputOpDescription: Seq[Node],
    formattedOutputOpDuration: String,
    outputOpStatus: String): Seq[Node] = {
  <tr>
    <td class="output-op-id-cell" >{outputOpId.toString}</td>
    <td>{outputOpDescription}</td>
    <td>{formattedOutputOpDuration}</td>
    {outputOpStatusCell(outputOpStatus, rowspan = 1)}
    <!-- Job Id -->
    <td>-</td>
    <!-- Duration -->
    <td>-</td>
    <!-- Stages: Succeeded/Total -->
    <td>-</td>
    <!-- Tasks (for all stages): Succeeded/Total -->
    <td>-</td>
    <!-- Error -->
    <td>-</td>
  </tr>
}

/**
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
* one cell, we use "rowspan" for the first row of a output op.
Expand All @@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
Expand All @@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<td rowspan={numSparkJobRowsInOutputOp.toString}>
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
Expand Down Expand Up @@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
{failureReasonCell(lastFailureReason)}
{failureReasonCell(lastFailureReason, rowspan = 1)}
</tr>
}

Expand All @@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
Expand All @@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
Expand All @@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
{jobId.toString}
{if (jobId >= 0) jobId.toString else "-"}
</td>
<!-- Duration -->
<td>-</td>
Expand All @@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}

private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
outputOpId: OutputOpId,
outputOpStatus: String,
sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
Expand All @@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {

val description = generateOutputOpDescription(sparkJobs)

generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
if (sparkJobs.isEmpty) {
generateOutputOpRowWithoutSparkJobs(
outputOpId, description, formattedOutputOpDuration, outputOpStatus)
} else {
val firstRow =
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
outputOpId,
description,
formattedOutputOpDuration,
outputOpStatus,
sparkJobs.size,
true,
sparkJobs.head)
val tailRows =
sparkJobs.tail.map { sparkJob =>
generateJobRow(
outputOpId,
description,
formattedOutputOpDuration,
outputOpStatus,
sparkJobs.size,
false,
sparkJob)
}
(firstRow ++ tailRows).flatten
}
}

private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
Expand Down Expand Up @@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}

private def failureReasonCell(failureReason: String): Seq[Node] = {
private def failureReasonCell(
failureReason: String,
rowspan: Int,
includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
Expand All @@ -237,20 +291,34 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
} else {
failureReason
})
val failureDetails =
if (isMultiline && !includeFirstLineInExpandDetails) {
// Skip the first line
failureReason.substring(failureReason.indexOf('\n') + 1)
} else {
failureReason
}
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
class="expand-details">
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
<pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>

if (rowspan == 1) {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
} else {
<td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
{failureReasonSummary}{details}
</td>
}
}

private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
Expand All @@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
sortBy(_._1). // sorted by OutputOpId
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
val status = batchUIData.failureReason.get(outputOpId).map { failure =>
if (failure.startsWith("org.apache.spark.SparkException")) {
"Failed due to Spark job error\n" + failure
} else {
var nextLineIndex = failure.indexOf("\n")
if (nextLineIndex < 0) {
nextLineIndex = failure.size
}
val firstLine = failure.substring(0, nextLineIndex)
s"Failed due to error: $firstLine\n$failure"
}
}.getOrElse("Succeeded")
val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
(outputOpId, status, sparkJobIds)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
(outputOpId,
val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
outputOps.map { case (outputOpId, status, sparkJobIds) =>
(outputOpId, status,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}

Expand All @@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
case (outputOpId, status, sparkJobIds) =>
generateOutputOpIdRow(outputOpId, status, sparkJobIds)
}
}
</tbody>
Expand Down Expand Up @@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
}

/**
 * Render the "Status" cell for an output operation: a plain "Succeeded" cell
 * when the op succeeded, otherwise an expandable failure-reason cell whose
 * collapsed view omits the synthetic first line of the status string.
 *
 * @param status  either the literal "Succeeded" or a failure-reason string
 * @param rowspan number of Spark-job rows this cell should span
 */
private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
  status match {
    case "Succeeded" =>
      <td rowspan={rowspan.toString}>Succeeded</td>
    case failure =>
      failureReasonCell(failure, rowspan, includeFirstLineInExpandDetails = false)
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
val numOutputOp: Int,
val failureReason: Map[Int, String],
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {

/**
Expand Down Expand Up @@ -69,7 +71,9 @@ private[ui] object BatchUIData {
batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime
batchInfo.processingEndTime,
batchInfo.numOutputOp,
batchInfo.failureReasons
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class UISeleniumSuite
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
"Total Delay (?)")
"Total Delay (?)", "Output Ops: Succeeded/Total")
}

val batchLinks =
Expand All @@ -138,7 +138,7 @@ class UISeleniumSuite
summaryText should contain ("Total delay:")

findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
}

Expand Down