Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package org.apache.spark.sql.streaming.ui

import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest

import scala.collection.mutable
import scala.xml.Node

import org.apache.commons.text.StringEscapeUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.util.Utils

private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
extends WebUIPage("") with Logging {
Expand All @@ -35,11 +39,147 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
}

def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
(query: StreamingQueryUIData): Seq[Node] = {
private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
.partition(_.isActive)

val content = mutable.ListBuffer[Node]()
// show active queries table only if there is at least one active query
if (activeQueries.nonEmpty) {
// scalastyle:off
content ++=
<span id="active" class="collapse-aggregated-activeQueries collapse-table"
onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
<h5 id="activequeries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Active Streaming Queries ({activeQueries.length})</a>
</h5>
</span> ++
<div>
<ul class="aggregated-activeQueries collapsible-table">
{queryTable(activeQueries, request, "active")}
</ul>
</div>
// scalastyle:on
}
// show active queries table only if there is at least one completed query
if (inactiveQueries.nonEmpty) {
// scalastyle:off
content ++=
<span id="completed" class="collapse-aggregated-completedQueries collapse-table"
onClick="collapseTable('collapse-aggregated-completedQueries','aggregated-completedQueries')">
<h5 id="completedqueries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Streaming Queries ({inactiveQueries.length})</a>
</h5>
</span> ++
<div>
<ul class="aggregated-completedQueries collapsible-table">
{queryTable(inactiveQueries, request, "completed")}
</ul>
</div>
// scalastyle:on
}
content
}

private def queryTable(data: Seq[StreamingQueryUIData], request: HttpServletRequest,
tableTag: String): Seq[Node] = {

val isActive = if (tableTag.contains("active")) true else false
val page = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)

try {
new StreamingQueryPagedTable(
request,
parent,
data,
tableTag,
isActive,
SparkUIUtils.prependBaseUri(request, parent.basePath),
"StreamingQuery"
).table(page)
} catch {
case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
<div class="alert alert-error">
<p>Error while rendering execution table:</p>
<pre>
{Utils.exceptionString(e)}
</pre>
</div>
}
}
}

class StreamingQueryPagedTable(
request: HttpServletRequest,
parent: StreamingQueryTab,
data: Seq[StreamingQueryUIData],
tableTag: String,
isActive: Boolean,
basePath: String,
subPath: String) extends PagedTable[StructuredStreamingRow] {

private val (sortColumn, sortDesc, pageSize) = getTableParameters(request, tableTag, "Start Time")
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

override def tableId: String = s"$tableTag-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = s"$tableTag.pageSize"

override def pageNumberFormField: String = s"$tableTag.page"

override def pageLink(page: Int): String = {
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$tableTag.sort=$encodedSortColumn" +
s"&$tableTag.desc=$sortDesc" +
s"&$pageSizeFormField=$pageSize" +
s"#$tableTag"
}

override def goButtonFormPath: String =
s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$sortDesc#$tableTag"

override def dataSource: PagedDataSource[StructuredStreamingRow] =
new StreamingQueryDataSource(data, sortColumn, sortDesc, pageSize, isActive)

override def headers: Seq[Node] = {
val headerAndCss: Seq[(String, Boolean, Option[String])] = {
Seq(
("Name", true, None),
("Status", false, None),
("ID", true, None),
("Run ID", true, None),
("Start Time", true, None),
("Duration", true, None),
("Avg Input /sec", true, None),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen @uncleGen @sarutak Should we add tooltips for these columns.

("Avg Process /sec", true, None),
("Latest Batch", true, None)) ++ {
if (!isActive) {
Seq(("Error", false, None))
} else {
Nil
}
}
}
isSortColumnValid(headerAndCss, sortColumn)

headerRow(headerAndCss, sortDesc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
}

override def row(query: StructuredStreamingRow): Seq[Node] = {
val streamingQuery = query.streamingUIData
val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix,
streamingQuery.runId)

def details(detail: Any): Seq[Node] = {
if (queryActive) {
if (isActive) {
return Seq.empty[Node]
}
val detailString = detail.asInstanceOf[String]
Expand All @@ -51,12 +191,39 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
<td>{summary}{details}</td>
}

val statisticsLink = "%s/%s/statistics?id=%s"
.format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
<tr>
<td>{UIUtils.getQueryName(streamingQuery)}</td>
<td>{UIUtils.getQueryStatus(streamingQuery)}</td>
<td>{streamingQuery.id}</td>
<td><a href={statisticsLink}>{streamingQuery.runId}</a></td>
<td>{SparkUIUtils.formatDate(streamingQuery.startTimestamp)}</td>
<td>{query.duration}</td>
<td>{withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")}</td>
<td>{withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")}</td>
{details(streamingQuery.exception.getOrElse("-"))}
</tr>
}
}

val name = UIUtils.getQueryName(query)
val status = UIUtils.getQueryStatus(query)
val duration = if (queryActive) {
case class StructuredStreamingRow(
duration: String,
avgInput: Double,
avgProcess: Double,
streamingUIData: StreamingQueryUIData)

class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: String, desc: Boolean,
pageSize: Int, isActive: Boolean) extends PagedDataSource[StructuredStreamingRow](pageSize) {

// convert StreamingQueryUIData to StreamingRow to provide required data for sorting and sort it
private val data = uiData.map(streamingRow).sorted(ordering(sortColumn, desc))

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to)

private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = {
val duration = if (isActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp)
} else {
withNoProgress(query, {
Expand All @@ -65,79 +232,31 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
}, "-")
}

<tr>
<td> {name} </td>
<td> {status} </td>
<td> {query.id} </td>
<td> <a href={statisticsLink}> {query.runId} </a> </td>
<td> {SparkUIUtils.formatDate(query.startTimestamp)} </td>
<td> {duration} </td>
<td> {withNoProgress(query, {
(query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
query.recentProgress.length).formatted("%.2f") }, "NaN")}
</td>
<td> {withNoProgress(query, {
(query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond)).sum /
query.recentProgress.length).formatted("%.2f") }, "NaN")}
</td>
<td> {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} </td>
{details(query.exception.getOrElse("-"))}
</tr>
}
val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
query.recentProgress.length)

private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = {
val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus
.partition(_.isActive)
val activeQueryTables = if (activeQueries.nonEmpty) {
val headerRow = Seq(
"Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
activeQueries, true, Some("activeQueries-table"), Seq(null), false))
} else {
None
}
val avgProcess = (query.recentProgress.map(p =>
withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length)

val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
val headerRow = Seq(
"Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch", "Error")
StructuredStreamingRow(duration, avgInput, avgProcess, query)
}

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
inactiveQueries, true, Some("completedQueries-table"), Seq(null), false))
private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = {
val ordering: Ordering[StructuredStreamingRow] = sortColumn match {
case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData))
case "ID" => Ordering.by(_.streamingUIData.id)
case "Run ID" => Ordering.by(_.streamingUIData.runId)
case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp)
case "Duration" => Ordering.by(_.duration)
case "Avg Input /sec" => Ordering.by(_.avgInput)
case "Avg Process /sec" => Ordering.by(_.avgProcess)
case "Latest Batch" => Ordering.by(_.streamingUIData.lastProgress.batchId)
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
None
ordering
}

// scalastyle:off
val content =
<span id="active" class="collapse-aggregated-activeQueries collapse-table"
onClick="collapseTable('collapse-aggregated-activeQueries','aggregated-activeQueries')">
<h5 id="activequeries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Active Streaming Queries ({activeQueries.length})</a>
</h5>
</span> ++
<div>
<ul class="aggregated-activeQueries collapsible-table">
{activeQueryTables.getOrElse(Seq.empty[Node])}
</ul>
</div> ++
<span id="completed" class="collapse-aggregated-completedQueries collapse-table"
onClick="collapseTable('collapse-aggregated-completedQueries','aggregated-completedQueries')">
<h5 id="completedqueries">
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Streaming Queries ({inactiveQueries.length})</a>
</h5>
</span> ++
<div>
<ul class="aggregated-completedQueries collapsible-table">
{inactiveQueryTables.getOrElse(Seq.empty[Node])}
</ul>
</div>
// scalastyle:on

content
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,18 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
var html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("active streaming queries (1)"))
assert(html.contains("completed streaming queries (0)"))

when(streamQuery.isActive).thenReturn(false)
when(streamQuery.exception).thenReturn(None)
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("active streaming queries (0)"))
assert(html.contains("completed streaming queries (1)"))
assert(html.contains("finished"))

when(streamQuery.isActive).thenReturn(false)
when(streamQuery.exception).thenReturn(Option("exception in query"))
html = renderStreamingQueryPage(request, tab)
.toString().toLowerCase(Locale.ROOT)
assert(html.contains("active streaming queries (0)"))
assert(html.contains("completed streaming queries (1)"))
assert(html.contains("failed"))
assert(html.contains("exception in query"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,29 +91,31 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
goToUi(spark, "/StreamingQuery")

findAll(cssSelector("h3")).map(_.text).toSeq should contain("Streaming Query")
findAll(cssSelector("""#activeQueries-table th""")).map(_.text).toSeq should be {
List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch")

val arrow = 0x25BE.toChar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why not just simply use a literal like '\u25BE' ?

findAll(cssSelector("""#active-table th""")).map(_.text).toList should be {
List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
"Avg Input /sec", "Avg Process /sec", "Latest Batch")
}
val activeQueries =
findAll(cssSelector("""#activeQueries-table td""")).map(_.text).toSeq
findAll(cssSelector("""#active-table td""")).map(_.text).toSeq
activeQueries should contain(activeQuery.id.toString)
activeQueries should contain(activeQuery.runId.toString)
findAll(cssSelector("""#completedQueries-table th"""))
.map(_.text).toSeq should be {
List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch", "Error")
findAll(cssSelector("""#completed-table th"""))
.map(_.text).toList should be {
List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration",
"Avg Input /sec", "Avg Process /sec", "Latest Batch", "Error")
}
val completedQueries =
findAll(cssSelector("""#completedQueries-table td""")).map(_.text).toSeq
findAll(cssSelector("""#completed-table td""")).map(_.text).toSeq
completedQueries should contain(completedQuery.id.toString)
completedQueries should contain(completedQuery.runId.toString)
completedQueries should contain(failedQuery.id.toString)
completedQueries should contain(failedQuery.runId.toString)

// Check the query statistics page
val activeQueryLink =
findAll(cssSelector("""#activeQueries-table a""")).flatMap(_.attribute("href")).next
findAll(cssSelector("""#active-table td a""")).flatMap(_.attribute("href")).next
go to activeQueryLink

findAll(cssSelector("h3"))
Expand Down