diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
index 3c869561ef8b..463f899dc249 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -41,6 +41,8 @@ case class BatchInfo(
 
   private var _failureReasons: Map[Int, String] = Map.empty
 
+  private var _numOutputOp: Int = 0
+
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
 
@@ -77,4 +79,12 @@ case class BatchInfo(
 
   /** Failure reasons corresponding to every output ops in the batch */
   private[streaming] def failureReasons = _failureReasons
+
+  /** Set the number of output operations in this batch */
+  private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
+    _numOutputOp = numOutputOp
+  }
+
+  /** Return the number of output operations in this batch */
+  private[streaming] def numOutputOp: Int = _numOutputOp
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 255ccf053668..08f63cc99268 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -81,6 +81,7 @@ case class JobSet(
       if (processingEndTime >= 0) Some(processingEndTime) else None
     )
     binfo.setFailureReason(failureReasons)
+    binfo.setNumOutputOp(jobs.size)
     binfo
   }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index f702bd5bc946..3e6590d66f58 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
 private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
   extends BatchTableBase("completed-batches-table", batchInterval) {
 
-  override protected def columns: Seq[Node] = super.columns ++
-    <th>Total Delay
-      {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+  override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+      <th>Output Ops: Succeeded/Total</th>
+  }
 
   override protected def renderRows: Seq[Node] = {
     batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
   private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
     val totalDelay = batch.totalDelay
     val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
+    val numFailedOutputOp = batch.failureReason.size
+    val outputOpColumn = if (numFailedOutputOp > 0) {
+      s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
+        s" (${numFailedOutputOp} failed)"
+    } else {
+      s"${batch.numOutputOp}/${batch.numOutputOp}"
+    }
    baseRow(batch) ++
      <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
        {formattedTotalDelay}
      </td>
+      <td>{outputOpColumn}</td>
   }
 }
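Note on the new column: the cell text built in completedBatchRow above reduces to the following self-contained sketch (outputOpColumn is a free function here; in the patch it is a local val computed from batch.numOutputOp and batch.failureReason):

object OutputOpColumnSketch {
  // Mirrors the outputOpColumn logic above: render "succeeded/total" and append
  // a "(n failed)" suffix only when at least one output op failed.
  def outputOpColumn(numOutputOp: Int, numFailedOutputOp: Int): String = {
    if (numFailedOutputOp > 0) {
      s"${numOutputOp - numFailedOutputOp}/$numOutputOp ($numFailedOutputOp failed)"
    } else {
      s"$numOutputOp/$numOutputOp"
    }
  }

  def main(args: Array[String]): Unit = {
    println(outputOpColumn(3, 0)) // prints "3/3"
    println(outputOpColumn(3, 1)) // prints "2/3 (1 failed)"
  }
}

So a batch with three output ops, one of which failed, shows up in the completed-batches table as "2/3 (1 failed)".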
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 9129c1f26abd..1b717b64542d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     <th>Output Op Id</th>
       <th>Description</th>
       <th>Duration</th>
+      <th>Status</th>
       <th>Job Id</th>
       <th>Duration</th>
       <th class="sorttable_nosort">Stages: Succeeded/Total</th>
@@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: SparkJobIdWithUIData): Seq[Node] = {
     if (sparkJob.jobUIData.isDefined) {
       generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
     } else {
       generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+        outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
     }
   }
 
+  private def generateOutputOpRowWithoutSparkJobs(
+      outputOpId: OutputOpId,
+      outputOpDescription: Seq[Node],
+      formattedOutputOpDuration: String,
+      outputOpStatus: String): Seq[Node] = {
+    <tr>
+      <td class="output-op-id-cell">{outputOpId.toString}</td>
+      <td>{outputOpDescription}</td>
+      <td>{formattedOutputOpDuration}</td>
+      {outputOpStatusCell(outputOpStatus, rowspan = 1)}
+      <!-- Job Id -->
+      <td>-</td>
+      <!-- Duration -->
+      <td>-</td>
+      <!-- Stages: Succeeded/Total -->
+      <td>-</td>
+      <!-- Tasks (for all stages): Succeeded/Total -->
+      <td>-</td>
+      <!-- Error -->
+      <td>-</td>
+    </tr>
+  }
+
   /**
    * Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
    * one cell, we use "rowspan" for the first row of a output op.
@@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       sparkJob: JobUIData): Seq[Node] = {
@@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         <td rowspan={numSparkJobRowsInOutputOp.toString}>
           {outputOpDescription}
         </td>
-        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+          {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           total = sparkJob.numTasks - sparkJob.numSkippedTasks)
       }
       </td>
-      {failureReasonCell(lastFailureReason)}
+      {failureReasonCell(lastFailureReason, rowspan = 1)}
     </tr>
   }
 
@@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       outputOpId: OutputOpId,
       outputOpDescription: Seq[Node],
       formattedOutputOpDuration: String,
+      outputOpStatus: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
       jobId: Int): Seq[Node] = {
@@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       if (isFirstRow) {
         <td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
         <td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
-        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
+        <td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
+          {outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
       } else {
         Nil
       }
@@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     <tr>
       {prefixCells}
       <td sorttable_customkey={jobId.toString}>
-        {jobId.toString}
+        {if (jobId >= 0) jobId.toString else "-"}
       </td>
       <!-- Duration -->
       <td>-</td>
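Note: the rows above keep the existing rowspan layout, where only the first Spark-job row of an output op emits the shared cells (id, description, duration, and now status), spanning all of that op's job rows. A minimal standalone sketch of the scheme, with illustrative names (assumes the scala-xml module is on the classpath):

import scala.xml.Node

object RowspanSketch {
  // The first row of an output op carries the shared cells with rowspan set to
  // the number of job rows; later rows only carry their own job cell.
  def jobRows(outputOpId: Int, status: String, jobIds: Seq[Int]): Seq[Node] = {
    val rowspan = jobIds.size.toString
    jobIds.zipWithIndex.map { case (jobId, i) =>
      val sharedCells: Seq[Node] =
        if (i == 0) {
          <td rowspan={rowspan}>{outputOpId.toString}</td>
          <td rowspan={rowspan}>{status}</td>
        } else {
          Nil
        }
      <tr>{sharedCells}<td>{jobId.toString}</td></tr>
    }
  }

  def main(args: Array[String]): Unit = {
    // Two Spark jobs for output op 0: only the first <tr> gets the shared cells
    jobRows(0, "Succeeded", Seq(3, 4)).foreach(println)
  }
}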
@@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   }
 
   private def generateOutputOpIdRow(
-      outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
+      outputOpId: OutputOpId,
+      outputOpStatus: String,
+      sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
     // We don't count the durations of dropped jobs
     val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
       map(sparkJob => {
@@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
 
     val description = generateOutputOpDescription(sparkJobs)
 
-    generateJobRow(
-      outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
-      sparkJobs.tail.map { sparkJob =>
+    if (sparkJobs.isEmpty) {
+      generateOutputOpRowWithoutSparkJobs(
+        outputOpId, description, formattedOutputOpDuration, outputOpStatus)
+    } else {
+      val firstRow =
         generateJobRow(
-          outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
-      }.flatMap(x => x)
+          outputOpId,
+          description,
+          formattedOutputOpDuration,
+          outputOpStatus,
+          sparkJobs.size,
+          true,
+          sparkJobs.head)
+      val tailRows =
+        sparkJobs.tail.map { sparkJob =>
+          generateJobRow(
+            outputOpId,
+            description,
+            formattedOutputOpDuration,
+            outputOpStatus,
+            sparkJobs.size,
+            false,
+            sparkJob)
+        }
+      (firstRow ++ tailRows).flatten
+    }
   }
 
   private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     }
   }
 
-  private def failureReasonCell(failureReason: String): Seq[Node] = {
+  private def failureReasonCell(
+      failureReason: String,
+      rowspan: Int,
+      includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
     val isMultiline = failureReason.indexOf('\n') >= 0
     // Display the first line by default
     val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       } else {
         failureReason
       })
+    val failureDetails =
+      if (isMultiline && !includeFirstLineInExpandDetails) {
+        // Skip the first line
+        failureReason.substring(failureReason.indexOf('\n') + 1)
+      } else {
+        failureReason
+      }
     val details = if (isMultiline) {
       // scalastyle:off
       <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         +details
       </span> ++
         <div class="stacktrace-details collapsed">
-          <pre>{failureReason}</pre>
+          <pre>{failureDetails}</pre>
         </div>
       // scalastyle:on
     } else {
       ""
     }
-    <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+
+    if (rowspan == 1) {
+      <td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
+    } else {
+      <td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
+        {failureReasonSummary}{details}
+      </td>
+    }
   }
 
   private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
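Note: the new includeFirstLineInExpandDetails flag only changes what goes into the expandable details block; the cell summary already shows the first line, so the details can start at the second. A standalone sketch of that selection logic:

object FailureDetailsSketch {
  // Mirrors the failureDetails selection above: when the reason is multi-line
  // and the caller asked to hide the first line, keep everything after the
  // first '\n'; otherwise keep the full reason.
  def failureDetails(failureReason: String, includeFirstLineInExpandDetails: Boolean): String = {
    val isMultiline = failureReason.indexOf('\n') >= 0
    if (isMultiline && !includeFirstLineInExpandDetails) {
      failureReason.substring(failureReason.indexOf('\n') + 1)
    } else {
      failureReason
    }
  }

  def main(args: Array[String]): Unit = {
    val reason = "Failed due to error: boom\njava.lang.RuntimeException: boom"
    // Prints only the exception part, without the duplicated summary line
    println(failureDetails(reason, includeFirstLineInExpandDetails = false))
  }
}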
@@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
    * Generate the job table for the batch.
    */
   private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
-    val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
-      sortBy(_._1). // sorted by OutputOpId
+    val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
       map { case (outputOpId, outputOpIdAndSparkJobIds) =>
         // sort SparkJobIds for each OutputOpId
         (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
       }
+    val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
+      val status = batchUIData.failureReason.get(outputOpId).map { failure =>
+        if (failure.startsWith("org.apache.spark.SparkException")) {
+          "Failed due to Spark job error\n" + failure
+        } else {
+          var nextLineIndex = failure.indexOf("\n")
+          if (nextLineIndex < 0) {
+            nextLineIndex = failure.size
+          }
+          val firstLine = failure.substring(0, nextLineIndex)
+          s"Failed due to error: $firstLine\n$failure"
+        }
+      }.getOrElse("Succeeded")
+      val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
+      (outputOpId, status, sparkJobIds)
+    }
     sparkListener.synchronized {
-      val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
-        outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
-          (outputOpId,
+      val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
+        outputOps.map { case (outputOpId, status, sparkJobIds) =>
+          (outputOpId, status,
             sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
         }
 
@@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           <tbody>
             {
               outputOpIdWithJobs.map {
-                case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
+                case (outputOpId, status, sparkJobIds) =>
+                  generateOutputOpIdRow(outputOpId, status, sparkJobIds)
               }
             }
           </tbody>
@@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
       replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
   }
+
+  private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
+    if (status == "Succeeded") {
+      <td rowspan={rowspan.toString}>Succeeded</td>
+    } else {
+      failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
+    }
+  }
 }
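Note: the status string assembled in generateJobTable above is the single source for both outputOpStatusCell and the expandable failure details. A sketch of the derivation as a standalone function (statusOf is a hypothetical name; the patch inlines this in the map over output op ids):

object OutputOpStatusSketch {
  // Mirrors the derivation above: absence from the failure map means the
  // output op succeeded; a SparkException gets a generic label because its
  // first line carries little information, while other failures surface their
  // first line in the label.
  def statusOf(failure: Option[String]): String = failure.map { f =>
    if (f.startsWith("org.apache.spark.SparkException")) {
      "Failed due to Spark job error\n" + f
    } else {
      val nextLineIndex = if (f.indexOf("\n") < 0) f.length else f.indexOf("\n")
      val firstLine = f.substring(0, nextLineIndex)
      s"Failed due to error: $firstLine\n$f"
    }
  }.getOrElse("Succeeded")

  def main(args: Array[String]): Unit = {
    println(statusOf(None)) // prints "Succeeded"
    println(statusOf(Some("java.lang.IllegalStateException: boom")))
  }
}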
")) } + + private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = { + if (status == "Succeeded") { + Succeeded + } else { + failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false) + } + } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala index ae508c0e9577..e6c2e2140c6c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -30,6 +30,8 @@ private[ui] case class BatchUIData( val submissionTime: Long, val processingStartTime: Option[Long], val processingEndTime: Option[Long], + val numOutputOp: Int, + val failureReason: Map[Int, String], var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) { /** @@ -69,7 +71,9 @@ private[ui] object BatchUIData { batchInfo.streamIdToInputInfo, batchInfo.submissionTime, batchInfo.processingStartTime, - batchInfo.processingEndTime + batchInfo.processingEndTime, + batchInfo.numOutputOp, + batchInfo.failureReasons ) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala index 068a6cb0e8fa..d1df78871d3b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala @@ -121,7 +121,7 @@ class UISeleniumSuite } findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be { List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)", - "Total Delay (?)") + "Total Delay (?)", "Output Ops: Succeeded/Total") } val batchLinks = @@ -138,7 +138,7 @@ class UISeleniumSuite summaryText should contain ("Total delay:") findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be { - List("Output Op Id", "Description", "Duration", "Job Id", "Duration", + List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration", "Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error") }