From 6d2b40eb72ea585e7f923c793ba857f823648f8d Mon Sep 17 00:00:00 2001 From: iRakson Date: Sat, 9 May 2020 18:10:08 +0530 Subject: [PATCH 1/3] [SPARK-31642] Add Pagination Support for Structured Streaming Page --- .../sql/streaming/ui/StreamingQueryPage.scala | 349 ++++++++++++++---- .../ui/StreamingQueryPageSuite.scala | 3 - .../sql/streaming/ui/UISeleniumSuite.scala | 24 +- 3 files changed, 283 insertions(+), 93 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index 733676546eab..89409616a194 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -17,15 +17,20 @@ package org.apache.spark.sql.streaming.ui +import java.net.URLEncoder +import java.nio.charset.StandardCharsets.UTF_8 import javax.servlet.http.HttpServletRequest -import scala.xml.Node +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils import org.apache.spark.internal.Logging import org.apache.spark.sql.streaming.ui.UIUtils._ -import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils, WebUIPage} +import org.apache.spark.util.Utils private[ui] class StreamingQueryPage(parent: StreamingQueryTab) extends WebUIPage("") with Logging { @@ -35,11 +40,218 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent) } - def generateDataRow(request: HttpServletRequest, queryActive: Boolean) - (query: StreamingQueryUIData): Seq[Node] = { + private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = { + val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus + .partition(_.isActive) + + val content = mutable.ListBuffer[Node]() + // show active queries table only if there is at least one active query + if (activeQueries.nonEmpty) { + // scalastyle:off + content ++= + +
+ + Active Streaming Queries ({activeQueries.length}) +
+
++ +
+ +
+ // scalastyle:on + } + // show active queries table only if there is at least one completed query + if (inactiveQueries.nonEmpty) { + // scalastyle:off + content ++= + +
+ + Completed Streaming Queries ({inactiveQueries.length}) +
+
++ +
+ +
+ // scalastyle:on + } + content + } + + private def queryTable(data: Seq[StreamingQueryUIData], request: HttpServletRequest, + tableTag: String): Seq[Node] = { + + val isActive = if (tableTag.contains("active")) true else false + val parameterOtherTable = request.getParameterMap.asScala + .filterNot(_._1.startsWith(tableTag)) + .map { case (name, vals) => + name + "=" + vals(0) + } + + val parameterPage = request.getParameter(s"$tableTag.page") + val parameterSortColumn = request.getParameter(s"$tableTag.sort") + val parameterSortDesc = request.getParameter(s"$tableTag.desc") + val parameterPageSize = request.getParameter(s"$tableTag.pageSize") + + val page = Option(parameterPage).map(_.toInt).getOrElse(1) + val sortColumn = Option(parameterSortColumn).map { sortColumn => + SparkUIUtils.decodeURLParameter(sortColumn) + }.getOrElse("Start Time") + val sortDesc = Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn == "Start Time") + val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + + try { + new StreamingQueryPagedTable( + request, + parent, + data, + tableTag, + pageSize, + sortColumn, + sortDesc, + isActive, + parameterOtherTable, + SparkUIUtils.prependBaseUri(request, parent.basePath), + "StreamingQuery" + ).table(page) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => +
+

Error while rendering execution table:

+
+            {Utils.exceptionString(e)}
+          
+
+ } + } +} + +class StreamingQueryPagedTable( + request: HttpServletRequest, + parent: StreamingQueryTab, + data: Seq[StreamingQueryUIData], + tableTag: String, + pageSize: Int, + sortColumn: String, + sortDesc: Boolean, + isActive: Boolean, + parameterOtherTable: Iterable[String], + basePath: String, + subPath: String) extends PagedTable[StructuredStreamingRow] { + + private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" + private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) + + override def tableId: String = s"$tableTag-table" + + override def tableCssClass: String = + "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited" + + override def pageSizeFormField: String = s"$tableTag.pageSize" + + override def pageNumberFormField: String = s"$tableTag.page" + + override def pageLink(page: Int): String = { + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$tableTag.sort=$encodedSortColumn" + + s"&$tableTag.desc=$sortDesc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableTag" + } + + override def goButtonFormPath: String = + s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$sortDesc#$tableTag" + + override def dataSource: PagedDataSource[StructuredStreamingRow] = + new StreamingQueryDataSource(data, sortColumn, sortDesc, pageSize, isActive) + + override def headers: Seq[Node] = { + val headerAndCss: Seq[(String, Boolean)] = { + Seq( + ("Name", true), + ("Status", false), + ("ID", true), + ("Run ID", true), + ("Start Time", true), + ("Duration", false), + ("Avg Input /sec", false), + ("Avg Process /sec", false), + ("Lastest Batch", true)) ++ { + if (!isActive) { + Seq(("Error", false)) + } else { + Nil + } + } + } + + val sortableColumnHeaders = headerAndCss.filter { + case (_, sortable) => sortable + }.map { case (title, _) => title } + + // sort column must be one of sortable columns of the table + require(sortableColumnHeaders.contains(sortColumn), + s"Sorting is not allowed on this column: $sortColumn") + + val headerRow: Seq[Node] = { + headerAndCss.map { case (header, sortable) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.desc=${!sortDesc}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$tableTag") + val arrow = if (sortDesc) "▾" else "▴" + + + + + {header} {Unparsed(arrow)} + + + + } else { + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + + s"&$tableTag.pageSize=$pageSize" + + s"#$tableTag") + + + + {header} + + + } else { + + {header} + + } + } + } + } + + {headerRow} + + } + + override def row(query: StructuredStreamingRow): Seq[Node] = { + val streamingQuery = query.streamingUIData + val statisticsLink = "%s/%s/statistics?id=%s" + .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, + streamingQuery.runId) def details(detail: Any): Seq[Node] = { - if (queryActive) { + if (isActive) { return Seq.empty[Node] } val detailString = detail.asInstanceOf[String] @@ -51,12 +263,39 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) {summary}{details} } - val statisticsLink = "%s/%s/statistics?id=%s" - .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId) + + {UIUtils.getQueryName(streamingQuery)} + {UIUtils.getQueryStatus(streamingQuery)} + {streamingQuery.id} + {streamingQuery.runId} + {SparkUIUtils.formatDate(streamingQuery.startTimestamp)} + {query.duration} + {withNoProgress(streamingQuery, {query.avgInput.formatted("%.2f")}, "NaN")} + {withNoProgress(streamingQuery, {query.avgProcess.formatted("%.2f")}, "NaN")} + {withNoProgress(streamingQuery, {streamingQuery.lastProgress.batchId}, "NaN")} + {details(streamingQuery.exception.getOrElse("-"))} + + } +} + +case class StructuredStreamingRow( + duration: String, + avgInput: Double, + avgProcess: Double, + streamingUIData: StreamingQueryUIData) + +class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: String, desc: Boolean, + pageSize: Int, isActive: Boolean) extends PagedDataSource[StructuredStreamingRow](pageSize) { + + // convert StreamingQueryUIData to StreamingRow to provide required data for sorting and sort it + private val data = uiData.map(streamingRow).sorted(ordering(sortColumn, desc)) + + override def dataSize: Int = data.size - val name = UIUtils.getQueryName(query) - val status = UIUtils.getQueryStatus(query) - val duration = if (queryActive) { + override def sliceData(from: Int, to: Int): Seq[StructuredStreamingRow] = data.slice(from, to) + + private def streamingRow(query: StreamingQueryUIData): StructuredStreamingRow = { + val duration = if (isActive) { SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.startTimestamp) } else { withNoProgress(query, { @@ -65,79 +304,31 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) }, "-") } - - {name} - {status} - {query.id} - {query.runId} - {SparkUIUtils.formatDate(query.startTimestamp)} - {duration} - {withNoProgress(query, { - (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / - query.recentProgress.length).formatted("%.2f") }, "NaN")} - - {withNoProgress(query, { - (query.recentProgress.map(p => withNumberInvalid(p.processedRowsPerSecond)).sum / - query.recentProgress.length).formatted("%.2f") }, "NaN")} - - {withNoProgress(query, { query.lastProgress.batchId }, "NaN")} - {details(query.exception.getOrElse("-"))} - - } + val avgInput = (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / + query.recentProgress.length) - private def generateStreamingQueryTable(request: HttpServletRequest): Seq[Node] = { - val (activeQueries, inactiveQueries) = parent.statusListener.allQueryStatus - .partition(_.isActive) - val activeQueryTables = if (activeQueries.nonEmpty) { - val headerRow = Seq( - "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Lastest Batch") + val avgProcess = (query.recentProgress.map(p => + withNumberInvalid(p.processedRowsPerSecond)).sum / query.recentProgress.length) - Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true), - activeQueries, true, Some("activeQueries-table"), Seq(null), false)) - } else { - None - } - - val inactiveQueryTables = if (inactiveQueries.nonEmpty) { - val headerRow = Seq( - "Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Lastest Batch", "Error") + StructuredStreamingRow(duration, avgInput, avgProcess, query) + } - Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false), - inactiveQueries, true, Some("completedQueries-table"), Seq(null), false)) + private def ordering(sortColumn: String, desc: Boolean): Ordering[StructuredStreamingRow] = { + val ordering: Ordering[StructuredStreamingRow] = sortColumn match { + case "Name" => Ordering.by(q => UIUtils.getQueryName(q.streamingUIData)) + case "ID" => Ordering.by(_.streamingUIData.id) + case "Run ID" => Ordering.by(_.streamingUIData.runId) + case "Start Time" => Ordering.by(_.streamingUIData.startTimestamp) + case "Duration" => Ordering.by(_.duration) + case "Avg Input /sec" => Ordering.by(_.avgInput) + case "Avg Process /sec" => Ordering.by(_.avgProcess) + case "Lastest Batch" => Ordering.by(_.streamingUIData.lastProgress.batchId) + case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn") + } + if (desc) { + ordering.reverse } else { - None + ordering } - - // scalastyle:off - val content = - -
- - Active Streaming Queries ({activeQueries.length}) -
-
++ -
- -
++ - -
- - Completed Streaming Queries ({inactiveQueries.length}) -
-
++ -
- -
- // scalastyle:on - - content } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala index 2a1e18ab66bb..640c21c52a14 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala @@ -43,13 +43,11 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { var html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) assert(html.contains("active streaming queries (1)")) - assert(html.contains("completed streaming queries (0)")) when(streamQuery.isActive).thenReturn(false) when(streamQuery.exception).thenReturn(None) html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) - assert(html.contains("active streaming queries (0)")) assert(html.contains("completed streaming queries (1)")) assert(html.contains("finished")) @@ -57,7 +55,6 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { when(streamQuery.exception).thenReturn(Option("exception in query")) html = renderStreamingQueryPage(request, tab) .toString().toLowerCase(Locale.ROOT) - assert(html.contains("active streaming queries (0)")) assert(html.contains("completed streaming queries (1)")) assert(html.contains("failed")) assert(html.contains("exception in query")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala index fdf4c6634d79..f87dfb493408 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -86,26 +86,28 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B case _: StreamingQueryException => } - eventually(timeout(30.seconds), interval(100.milliseconds)) { + eventually(timeout(300.seconds), interval(100.milliseconds)) { // Check the query list page goToUi(spark, "/StreamingQuery") findAll(cssSelector("h3")).map(_.text).toSeq should contain("Streaming Query") - findAll(cssSelector("""#activeQueries-table th""")).map(_.text).toSeq should be { - List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Lastest Batch") + + val arrow = 0x25BE.toChar + findAll(cssSelector("""#active-table th""")).map(_.text).toList should be { + List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration", + "Avg Input /sec", "Avg Process /sec", "Lastest Batch") } val activeQueries = - findAll(cssSelector("""#activeQueries-table td""")).map(_.text).toSeq + findAll(cssSelector("""#active-table td""")).map(_.text).toSeq activeQueries should contain(activeQuery.id.toString) activeQueries should contain(activeQuery.runId.toString) - findAll(cssSelector("""#completedQueries-table th""")) - .map(_.text).toSeq should be { - List("Name", "Status", "Id", "Run ID", "Start Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Lastest Batch", "Error") + findAll(cssSelector("""#completed-table th""")) + .map(_.text).toList should be { + List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration", + "Avg Input /sec", "Avg Process /sec", "Lastest Batch", "Error") } val completedQueries = - findAll(cssSelector("""#completedQueries-table td""")).map(_.text).toSeq + findAll(cssSelector("""#completed-table td""")).map(_.text).toSeq completedQueries should contain(completedQuery.id.toString) completedQueries should contain(completedQuery.runId.toString) completedQueries should contain(failedQuery.id.toString) @@ -113,7 +115,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B // Check the query statistics page val activeQueryLink = - findAll(cssSelector("""#activeQueries-table a""")).flatMap(_.attribute("href")).next + findAll(cssSelector("""#active-table td a""")).flatMap(_.attribute("href")).next go to activeQueryLink findAll(cssSelector("h3")) From c5cc954c009ce557299830e699cd844616f164b6 Mon Sep 17 00:00:00 2001 From: iRakson Date: Mon, 11 May 2020 12:52:30 +0530 Subject: [PATCH 2/3] fix --- .../spark/sql/streaming/ui/StreamingQueryPage.scala | 10 +++++----- .../spark/sql/streaming/ui/UISeleniumSuite.scala | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index 89409616a194..4a9986410805 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -159,11 +159,11 @@ class StreamingQueryPagedTable( override def pageLink(page: Int): String = { parameterPath + - s"&$pageNumberFormField=$page" + - s"&$tableTag.sort=$encodedSortColumn" + - s"&$tableTag.desc=$sortDesc" + - s"&$pageSizeFormField=$pageSize" + - s"#$tableTag" + s"&$pageNumberFormField=$page" + + s"&$tableTag.sort=$encodedSortColumn" + + s"&$tableTag.desc=$sortDesc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableTag" } override def goButtonFormPath: String = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala index f87dfb493408..db039538f21b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -86,7 +86,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B case _: StreamingQueryException => } - eventually(timeout(300.seconds), interval(100.milliseconds)) { + eventually(timeout(30.seconds), interval(100.milliseconds)) { // Check the query list page goToUi(spark, "/StreamingQuery") From 088c55186919e9c671942694c2f16933cc806a9d Mon Sep 17 00:00:00 2001 From: iRakson Date: Fri, 22 May 2020 00:09:48 +0530 Subject: [PATCH 3/3] rebase and update --- .../sql/streaming/ui/StreamingQueryPage.scala | 108 +++--------------- .../sql/streaming/ui/UISeleniumSuite.scala | 4 +- 2 files changed, 20 insertions(+), 92 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index 4a9986410805..b969e41e4e55 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -21,9 +21,8 @@ import java.net.URLEncoder import java.nio.charset.StandardCharsets.UTF_8 import javax.servlet.http.HttpServletRequest -import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.xml.{Node, Unparsed} +import scala.xml.Node import org.apache.commons.text.StringEscapeUtils @@ -88,23 +87,7 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) tableTag: String): Seq[Node] = { val isActive = if (tableTag.contains("active")) true else false - val parameterOtherTable = request.getParameterMap.asScala - .filterNot(_._1.startsWith(tableTag)) - .map { case (name, vals) => - name + "=" + vals(0) - } - - val parameterPage = request.getParameter(s"$tableTag.page") - val parameterSortColumn = request.getParameter(s"$tableTag.sort") - val parameterSortDesc = request.getParameter(s"$tableTag.desc") - val parameterPageSize = request.getParameter(s"$tableTag.pageSize") - - val page = Option(parameterPage).map(_.toInt).getOrElse(1) - val sortColumn = Option(parameterSortColumn).map { sortColumn => - SparkUIUtils.decodeURLParameter(sortColumn) - }.getOrElse("Start Time") - val sortDesc = Option(parameterSortDesc).map(_.toBoolean).getOrElse(sortColumn == "Start Time") - val pageSize = Option(parameterPageSize).map(_.toInt).getOrElse(100) + val page = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1) try { new StreamingQueryPagedTable( @@ -112,11 +95,7 @@ private[ui] class StreamingQueryPage(parent: StreamingQueryTab) parent, data, tableTag, - pageSize, - sortColumn, - sortDesc, isActive, - parameterOtherTable, SparkUIUtils.prependBaseUri(request, parent.basePath), "StreamingQuery" ).table(page) @@ -137,15 +116,12 @@ class StreamingQueryPagedTable( parent: StreamingQueryTab, data: Seq[StreamingQueryUIData], tableTag: String, - pageSize: Int, - sortColumn: String, - sortDesc: Boolean, isActive: Boolean, - parameterOtherTable: Iterable[String], basePath: String, subPath: String) extends PagedTable[StructuredStreamingRow] { - private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" + private val (sortColumn, sortDesc, pageSize) = getTableParameters(request, tableTag, "Start Time") + private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}" private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name()) override def tableId: String = s"$tableTag-table" @@ -173,75 +149,27 @@ class StreamingQueryPagedTable( new StreamingQueryDataSource(data, sortColumn, sortDesc, pageSize, isActive) override def headers: Seq[Node] = { - val headerAndCss: Seq[(String, Boolean)] = { + val headerAndCss: Seq[(String, Boolean, Option[String])] = { Seq( - ("Name", true), - ("Status", false), - ("ID", true), - ("Run ID", true), - ("Start Time", true), - ("Duration", false), - ("Avg Input /sec", false), - ("Avg Process /sec", false), - ("Lastest Batch", true)) ++ { + ("Name", true, None), + ("Status", false, None), + ("ID", true, None), + ("Run ID", true, None), + ("Start Time", true, None), + ("Duration", true, None), + ("Avg Input /sec", true, None), + ("Avg Process /sec", true, None), + ("Latest Batch", true, None)) ++ { if (!isActive) { - Seq(("Error", false)) + Seq(("Error", false, None)) } else { Nil } } } + isSortColumnValid(headerAndCss, sortColumn) - val sortableColumnHeaders = headerAndCss.filter { - case (_, sortable) => sortable - }.map { case (title, _) => title } - - // sort column must be one of sortable columns of the table - require(sortableColumnHeaders.contains(sortColumn), - s"Sorting is not allowed on this column: $sortColumn") - - val headerRow: Seq[Node] = { - headerAndCss.map { case (header, sortable) => - if (header == sortColumn) { - val headerLink = Unparsed( - parameterPath + - s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&$tableTag.desc=${!sortDesc}" + - s"&$tableTag.pageSize=$pageSize" + - s"#$tableTag") - val arrow = if (sortDesc) "▾" else "▴" - - - - - {header} {Unparsed(arrow)} - - - - } else { - if (sortable) { - val headerLink = Unparsed( - parameterPath + - s"&$tableTag.sort=${URLEncoder.encode(header, UTF_8.name())}" + - s"&$tableTag.pageSize=$pageSize" + - s"#$tableTag") - - - - {header} - - - } else { - - {header} - - } - } - } - } - - {headerRow} - + headerRow(headerAndCss, sortDesc, pageSize, sortColumn, parameterPath, tableTag, tableTag) } override def row(query: StructuredStreamingRow): Seq[Node] = { @@ -322,7 +250,7 @@ class StreamingQueryDataSource(uiData: Seq[StreamingQueryUIData], sortColumn: St case "Duration" => Ordering.by(_.duration) case "Avg Input /sec" => Ordering.by(_.avgInput) case "Avg Process /sec" => Ordering.by(_.avgProcess) - case "Lastest Batch" => Ordering.by(_.streamingUIData.lastProgress.batchId) + case "Latest Batch" => Ordering.by(_.streamingUIData.lastProgress.batchId) case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn") } if (desc) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala index db039538f21b..63b5792ebd51 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -95,7 +95,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B val arrow = 0x25BE.toChar findAll(cssSelector("""#active-table th""")).map(_.text).toList should be { List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration", - "Avg Input /sec", "Avg Process /sec", "Lastest Batch") + "Avg Input /sec", "Avg Process /sec", "Latest Batch") } val activeQueries = findAll(cssSelector("""#active-table td""")).map(_.text).toSeq @@ -104,7 +104,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B findAll(cssSelector("""#completed-table th""")) .map(_.text).toList should be { List("Name", "Status", "ID", "Run ID", s"Start Time $arrow", "Duration", - "Avg Input /sec", "Avg Process /sec", "Lastest Batch", "Error") + "Avg Input /sec", "Avg Process /sec", "Latest Batch", "Error") } val completedQueries = findAll(cssSelector("""#completed-table td""")).map(_.text).toSeq