Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 174 additions & 139 deletions streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -17,39 +17,122 @@

package org.apache.spark.streaming.ui

import scala.xml.Node
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest

import scala.xml.{Node, Unparsed}

import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}

private[ui] class StreamingBatchPagedTable(
request: HttpServletRequest,
parent: StreamingTab,
batchInterval: Long,
batchData: Seq[BatchUIData],
streamingBatchTag: String,
basePath: String,
subPath: String,
parameterOtherTable: Iterable[String],
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedTable[BatchUIData] {

override val dataSource = new StreamingBatchTableDataSource(batchData, pageSize, sortColumn, desc)
private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}"
private val firstFailureReason = getFirstFailureReason(batchData)

override def tableId: String = streamingBatchTag

override def tableCssClass: String =
"table table-bordered table-condensed table-striped " +
"table-head-clickable table-cell-width-limited"

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$streamingBatchTag.sort=$encodedSortColumn" +
s"&$streamingBatchTag.desc=$desc" +
s"&$pageSizeFormField=$pageSize"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to add #tableHeaderId here?

}

import org.apache.spark.ui.{UIUtils => SparkUIUtils}
override def pageSizeFormField: String = s"$streamingBatchTag.pageSize"

private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
override def pageNumberFormField: String = s"$streamingBatchTag.page"

protected def columns: Seq[Node] = {
<th>Batch Time</th>
<th>Records</th>
<th>Scheduling Delay
{SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
</th>
<th>Processing Time
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
s"$parameterPath&$streamingBatchTag.sort=$encodedSortColumn&$streamingBatchTag.desc=$desc"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also it seems we need to add #tableHeaderId ?

s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will add the tableHeaderId in the link.

}

/**
* Return the first failure reason if finding in the batches.
*/
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}

protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
UIUtils.failureReasonCell(
failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false)
}.getOrElse(<td>-</td>)
override def headers: Seq[Node] = {
val completedBatchTableHeaders = Seq("Batch Time", "Records", "Scheduling Delay",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the headers only for completedBatchTables?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both the tables are identical i.e. the schema is same for both. So headers will remain same for both. But yeah, completedBatchTableHeaders is misleading. So, i will update the variable name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the attached screenshot, I can see Total Delay isn't there in the ActiveBatches table and Status field isn't there in the CompletedBatches table?

"Processing Delay", "Total Delay", "Output Ops: Succeeded/Total")

val tooltips = Seq(None, None, Some("Time taken by Streaming scheduler to" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the tooltip newly added in this PR or was that already there?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tooltips were already there.

" submit jobs of a batch"), Some("Time taken to process all jobs of a batch"),
Some("Total time taken to handle a batch"), None)

assert(completedBatchTableHeaders.length == tooltips.length)

val headerRow: Seq[Node] = {
completedBatchTableHeaders.zip(tooltips).map { case (header, tooltip) =>
if (header == sortColumn) {
val headerLink = Unparsed(
parameterPath +
s"&$streamingBatchTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$streamingBatchTag.desc=${!desc}" +
s"&$streamingBatchTag.pageSize=$pageSize" +
s"#$streamingBatchTag")
val arrow = if (desc) "&#x25BE;" else "&#x25B4;" // UP or DOWN

if (tooltip.nonEmpty) {
<th>
<a href={headerLink}>
<span data-toggle="tooltip" title={tooltip.get}>
{header}&nbsp;{Unparsed(arrow)}
</span>
</a>
</th>
} else {
<th>
<a href={headerLink}>
{header}&nbsp;{Unparsed(arrow)}
</a>
</th>
}
} else {
val headerLink = Unparsed(
parameterPath +
s"&$streamingBatchTag.sort=${URLEncoder.encode(header, UTF_8.name())}" +
s"&$streamingBatchTag.pageSize=$pageSize" +
s"#$streamingBatchTag")

if(tooltip.nonEmpty) {
<th>
<a href={headerLink}>
<span data-toggle="tooltip" title={tooltip.get}>
{header}
</span>
</a>
</th>
} else {
<th>
<a href={headerLink}>
{header}
</a>
</th>
}
}
}
}
<thead>
{headerRow}
</thead>
}

protected def baseRow(batch: BatchUIData): Seq[Node] = {
override def row(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = UIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
Expand All @@ -58,138 +141,90 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

<td id={batchTimeId} sorttable_customkey={batchTime.toString}
isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
<tr>
<td id={batchTimeId}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td>
{numRecords.toString} records
</td>
<td>
{formattedSchedulingDelay}
</td>
<td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
<td>
{formattedProcessingTime}
</td>
}

private def batchTable: Seq[Node] = {
<table id={tableId} class="table table-bordered table-striped table-condensed sortable">
<thead>
{columns}
</thead>
<tbody>
{renderRows}
</tbody>
</table>
}

def toNodeSeq: Seq[Node] = {
batchTable
}

protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
<td>
{formattedTotalDelay}
</td>
<td class="progress-cell">
{SparkUIUtils.makeProgressBar(started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp, failed = batch.numFailedOutputOp, skipped = 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the same code is there in the method createOutputOperationProgressBar? Can't we use that?

reasonToNumKilled = Map.empty, total = batch.outputOperations.size)}
</td>
{
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
}
</td>
}

/**
* Return HTML for all rows of this table.
*/
protected def renderRows: Seq[Node]
}

private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {

private val firstFailureReason = getFirstFailureReason(runningBatches)

override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
<th>Status</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
}
}

override protected def renderRows: Seq[Node] = {
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
// waiting batches before running batches
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
</tr>
}

private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it newly added? getFirstFailureReason method already exist right? can't we reuse it?

}

private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Avoid duplication of the methods

firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
UIUtils.failureReasonCell(
failureReasonForUI, rowspan = 1, includeFirstLineInExpandDetails = false)
}.getOrElse(<td>-</td>)
}
}

private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
private[ui] class StreamingBatchTableDataSource(
info: Seq[BatchUIData],
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {

private val firstFailureReason = getFirstFailureReason(batches)
private val data = info.sorted(ordering(sortColumn, desc))
private var _slicedStartTime: Set[Long] = null

override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override def dataSize: Int = data.size

override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
override def sliceData(from: Int, to: Int): Seq[BatchUIData] = {
val r = data.slice(from, to)
_slicedStartTime = r.map(_.batchTime.milliseconds).toSet
r
}

private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")

baseRow(batch) ++ {
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
} ++ createOutputOperationProgressBar(batch)++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
/**
* Return Ordering according to sortColumn and desc.
*/
private def ordering(sortColumn: String, desc: Boolean): Ordering[BatchUIData] = {
val ordering: Ordering[BatchUIData] = sortColumn match {
case "Batch Time" => Ordering.by(_.batchTime)
case "Records" => Ordering.by(_.numRecords)
case "Scheduling Delay" => Ordering.by(_.schedulingDelay)
case "Processing Delay" => Ordering.by(_.processingDelay)
case "Total Delay" => Ordering.by(_.totalDelay)
case "Output Ops: Succeeded/Total" => Ordering.by(_.batchTime)
case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
ordering
}
}
}
Loading