diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.css
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.css
diff --git a/streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
similarity index 100%
rename from streaming/src/main/resources/org/apache/spark/streaming/ui/static/streaming-page.js
rename to core/src/main/resources/org/apache/spark/ui/static/streaming-page.js
diff --git a/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
new file mode 100644
index 0000000000000..70250fdbd2d0c
--- /dev/null
+++ b/core/src/main/resources/org/apache/spark/ui/static/structured-streaming-page.js
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Pre-defined colors for legends.
+var colorPool = ["#F8C471", "#F39C12", "#B9770E", "#73C6B6", "#16A085", "#117A65", "#B2BABB", "#7F8C8D", "#616A6B"];
+
+function drawAreaStack(id, labels, values, minX, maxX, minY, maxY) {
+ d3.select(d3.select(id).node().parentNode)
+ .style("padding", "8px 0 8px 8px")
+ .style("border-right", "0px solid white");
+
+ // Setup svg using Bostock's margin convention
+ var margin = {top: 20, right: 40, bottom: 30, left: maxMarginLeftForTimeline};
+ var width = 850 - margin.left - margin.right;
+ var height = 300 - margin.top - margin.bottom;
+
+ var svg = d3.select(id)
+ .append("svg")
+ .attr("width", width + margin.left + margin.right)
+ .attr("height", height + margin.top + margin.bottom)
+ .append("g")
+ .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
+
+ var data = values;
+
+ var parse = d3.time.format("%H:%M:%S.%L").parse;
+
+ // Transpose the data into layers
+ var dataset = d3.layout.stack()(labels.map(function(fruit) {
+ return data.map(function(d) {
+ return {_x: d.x, x: parse(d.x), y: +d[fruit]};
+ });
+ }));
+
+
+ // Set x, y and colors
+ var x = d3.scale.ordinal()
+ .domain(dataset[0].map(function(d) { return d.x; }))
+ .rangeRoundBands([10, width-10], 0.02);
+
+ var y = d3.scale.linear()
+ .domain([0, d3.max(dataset, function(d) { return d3.max(d, function(d) { return d.y0 + d.y; }); })])
+ .range([height, 0]);
+
+ var colors = colorPool.slice(0, labels.length)
+
+ // Define and draw axes
+ var yAxis = d3.svg.axis()
+ .scale(y)
+ .orient("left")
+ .ticks(7)
+ .tickFormat( function(d) { return d } );
+
+ var xAxis = d3.svg.axis()
+ .scale(x)
+ .orient("bottom")
+ .tickFormat(d3.time.format("%H:%M:%S.%L"));
+
+ // Only show the first and last time in the graph
+ var xline = []
+ xline.push(x.domain()[0])
+ xline.push(x.domain()[x.domain().length - 1])
+ xAxis.tickValues(xline);
+
+ svg.append("g")
+ .attr("class", "y axis")
+ .call(yAxis)
+ .append("text")
+ .attr("transform", "translate(0," + unitLabelYOffset + ")")
+ .text("ms");
+
+ svg.append("g")
+ .attr("class", "x axis")
+ .attr("transform", "translate(0," + height + ")")
+ .call(xAxis);
+
+ // Create groups for each series, rects for each segment
+ var groups = svg.selectAll("g.cost")
+ .data(dataset)
+ .enter().append("g")
+ .attr("class", "cost")
+ .style("fill", function(d, i) { return colors[i]; });
+
+ var rect = groups.selectAll("rect")
+ .data(function(d) { return d; })
+ .enter()
+ .append("rect")
+ .attr("x", function(d) { return x(d.x); })
+ .attr("y", function(d) { return y(d.y0 + d.y); })
+ .attr("height", function(d) { return y(d.y0) - y(d.y0 + d.y); })
+ .attr("width", x.rangeBand())
+ .on('mouseover', function(d) {
+ var tip = '';
+ var idx = 0;
+ var _values = timeToValues[d._x]
+ _values.forEach(function (k) {
+ tip += labels[idx] + ': ' + k + ' ';
+ idx += 1;
+ });
+ tip += " at " + d._x
+ showBootstrapTooltip(d3.select(this).node(), tip);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ })
+ .on("mousemove", function(d) {
+ var xPosition = d3.mouse(this)[0] - 15;
+ var yPosition = d3.mouse(this)[1] - 25;
+ tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+ tooltip.select("text").text(d.y);
+ });
+
+
+ // Draw legend
+ var legend = svg.selectAll(".legend")
+ .data(colors)
+ .enter().append("g")
+ .attr("class", "legend")
+ .attr("transform", function(d, i) { return "translate(30," + i * 19 + ")"; });
+
+ legend.append("rect")
+ .attr("x", width - 20)
+ .attr("width", 18)
+ .attr("height", 18)
+ .style("fill", function(d, i) {return colors.slice().reverse()[i];})
+ .on('mouseover', function(d, i) {
+ var len = labels.length
+ showBootstrapTooltip(d3.select(this).node(), labels[len - 1 - i]);
+ })
+ .on('mouseout', function() {
+ hideBootstrapTooltip(d3.select(this).node());
+ })
+ .on("mousemove", function(d) {
+ var xPosition = d3.mouse(this)[0] - 15;
+ var yPosition = d3.mouse(this)[1] - 25;
+ tooltip.attr("transform", "translate(" + xPosition + "," + yPosition + ")");
+ tooltip.select("text").text(d.y);
+ });
+
+ // Prep the tooltip bits, initial display is hidden
+ var tooltip = svg.append("g")
+ .attr("class", "tooltip")
+ .style("display", "none");
+
+ tooltip.append("rect")
+ .attr("width", 30)
+ .attr("height", 20)
+ .attr("fill", "white")
+ .style("opacity", 0.5);
+
+ tooltip.append("text")
+ .attr("x", 15)
+ .attr("dy", "1.2em")
+ .style("text-anchor", "middle")
+ .attr("font-size", "12px")
+ .attr("font-weight", "bold");
+}
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index fac464e1353cd..0ba461f02317f 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -90,6 +90,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-sessionstat','aggregated-sessionstat');
collapseTablePageLoad('collapse-aggregated-sqlstat','aggregated-sqlstat');
collapseTablePageLoad('collapse-aggregated-sqlsessionstat','aggregated-sqlsessionstat');
+ collapseTablePageLoad('collapse-aggregated-activeQueries','aggregated-activeQueries');
+ collapseTablePageLoad('collapse-aggregated-completedQueries','aggregated-completedQueries');
});
$(function() {
diff --git a/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
new file mode 100644
index 0000000000000..87ff677514461
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/GraphUIData.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import java.{util => ju}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.{Node, Unparsed}
+
+/**
+ * A helper class to generate JavaScript and HTML for both timeline and histogram graphs.
+ *
+ * @param timelineDivId the timeline `id` used in the html `div` tag
+ * @param histogramDivId the histogram `id` used in the html `div` tag
+ * @param data the data for the graph
+ * @param minX the min value of X axis
+ * @param maxX the max value of X axis
+ * @param minY the min value of Y axis
+ * @param maxY the max value of Y axis
+ * @param unitY the unit of Y axis
+ * @param batchInterval if `batchInterval` is not None, we will draw a line for `batchInterval` in
+ * the graph
+ */
+private[spark] class GraphUIData(
+ timelineDivId: String,
+ histogramDivId: String,
+ data: Seq[(Long, Double)],
+ minX: Long,
+ maxX: Long,
+ minY: Double,
+ maxY: Double,
+ unitY: String,
+ batchInterval: Option[Double] = None) {
+
+ private var dataJavaScriptName: String = _
+
+ def generateDataJs(jsCollector: JsCollector): Unit = {
+ val jsForData = data.map { case (x, y) =>
+ s"""{"x": $x, "y": $y}"""
+ }.mkString("[", ",", "]")
+ dataJavaScriptName = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+ }
+
+ def generateTimelineHtml(jsCollector: JsCollector): Seq[Node] = {
+ jsCollector.addPreparedStatement(s"registerTimeline($minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawTimeline(" +
+ s"'#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY, '$unitY'," +
+ s" ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawTimeline('#$timelineDivId', $dataJavaScriptName, $minX, $maxX, $minY, $maxY," +
+ s" '$unitY');")
+ }
+    <div id={timelineDivId}></div>
+ }
+
+ def generateHistogramHtml(jsCollector: JsCollector): Seq[Node] = {
+ val histogramData = s"$dataJavaScriptName.map(function(d) { return d.y; })"
+ jsCollector.addPreparedStatement(s"registerHistogram($histogramData, $minY, $maxY);")
+ if (batchInterval.isDefined) {
+ jsCollector.addStatement(
+ "drawHistogram(" +
+ s"'#$histogramDivId', $histogramData, $minY, $maxY, '$unitY', ${batchInterval.get}" +
+ ");")
+ } else {
+ jsCollector.addStatement(
+ s"drawHistogram('#$histogramDivId', $histogramData, $minY, $maxY, '$unitY');")
+ }
+    <div id={histogramDivId}></div>
+ }
+
+ def generateAreaStackHtmlWithData(
+ jsCollector: JsCollector,
+ values: Array[(Long, ju.Map[String, JLong])]): Seq[Node] = {
+ val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+ val durationDataPadding = UIUtils.durationDataPadding(values)
+ val jsForData = durationDataPadding.map { case (x, y) =>
+ val s = y.toSeq.sortBy(_._1).map(e => s""""${e._1}": "${e._2}"""").mkString(",")
+ s"""{x: "${UIUtils.formatBatchTime(x, 1, showYYYYMMSS = false)}", $s}"""
+ }.mkString("[", ",", "]")
+ val jsForLabels = operationLabels.toSeq.sorted.mkString("[\"", "\",\"", "\"]")
+
+ val (maxX, minX, maxY, minY) = if (values != null && values.length > 0) {
+ val xValues = values.map(_._1.toLong)
+ val yValues = values.map(_._2.asScala.toSeq.map(_._2.toLong).sum)
+ (xValues.max, xValues.min, yValues.max, yValues.min)
+ } else {
+ (0L, 0L, 0L, 0L)
+ }
+
+ dataJavaScriptName = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $dataJavaScriptName = $jsForData;")
+ val labels = jsCollector.nextVariableName
+ jsCollector.addPreparedStatement(s"var $labels = $jsForLabels;")
+ jsCollector.addStatement(
+ s"drawAreaStack('#$timelineDivId', $labels, $dataJavaScriptName, $minX, $maxX, $minY, $maxY)")
+    <div id={timelineDivId}></div>
+ }
+}
+
+/**
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
+ * DOM has finished loading.
+ */
+private[spark] class JsCollector {
+
+ private var variableId = 0
+
+ /**
+ * Return the next unused JavaScript variable name
+ */
+ def nextVariableName: String = {
+ variableId += 1
+ "v" + variableId
+ }
+
+ /**
+ * JavaScript statements that will execute before `statements`
+ */
+ private val preparedStatements = ArrayBuffer[String]()
+
+ /**
+ * JavaScript statements that will execute after `preparedStatements`
+ */
+ private val statements = ArrayBuffer[String]()
+
+ def addPreparedStatement(js: String): Unit = {
+ preparedStatements += js
+ }
+
+ def addStatement(js: String): Unit = {
+ statements += js
+ }
+
+ /**
+ * Generate a html snippet that will execute all scripts when the DOM has finished loading.
+ */
+ def toHtml: Seq[Node] = {
+ val js =
+ s"""
+ |$$(document).ready(function() {
+ | ${preparedStatements.mkString("\n")}
+ | ${statements.mkString("\n")}
+ |});""".stripMargin
+
+    <script>{Unparsed(js)}</script>
+ }
+}
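Editor's note: for orientation, a minimal usage sketch (not part of the patch). Since `GraphUIData` and `JsCollector` are `private[spark]`, a caller would live inside the `org.apache.spark` package; the div ids and data points below are hypothetical.

```scala
import org.apache.spark.ui.{GraphUIData, JsCollector}

val jsCollector = new JsCollector()
val graph = new GraphUIData(
  timelineDivId = "timeline-input-rate",    // hypothetical div id
  histogramDivId = "histogram-input-rate",  // hypothetical div id
  data = Seq((1L, 10.0), (2L, 12.0)),       // (batch time, value) pairs
  minX = 1L, maxX = 2L, minY = 0.0, maxY = 12.0,
  unitY = "records/sec")

graph.generateDataJs(jsCollector)                         // register the data variable first
val timeline = graph.generateTimelineHtml(jsCollector)    // queues a drawTimeline(...) call
val histogram = graph.generateHistogramHtml(jsCollector)  // queues a drawHistogram(...) call
val script = jsCollector.toHtml                           // <script> that runs on DOM ready
```

The page then emits `timeline ++ histogram ++ script`, so the prepared statements run before the draw calls once the DOM has loaded.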
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 143303df0d10e..94c45215b5ff2 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui
+import java.{util => ju}
+import java.lang.{Long => JLong}
import java.net.URLDecoder
import java.nio.charset.StandardCharsets.UTF_8
import java.text.SimpleDateFormat
@@ -24,6 +26,7 @@ import java.util.{Date, Locale, TimeZone}
import javax.servlet.http.HttpServletRequest
import javax.ws.rs.core.{MediaType, Response}
+import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import scala.xml._
import scala.xml.transform.{RewriteRule, RuleTransformer}
@@ -119,6 +122,59 @@ private[spark] object UIUtils extends Logging {
}
}
+ // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
+ private val batchTimeFormat = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.US)
+ }
+
+ private val batchTimeFormatWithMilliseconds = new ThreadLocal[SimpleDateFormat]() {
+ override def initialValue(): SimpleDateFormat =
+ new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS", Locale.US)
+ }
+
+ /**
+ * If `batchInterval` is less than 1 second, format `batchTime` with milliseconds. Otherwise,
+ * format `batchTime` without milliseconds.
+ *
+ * @param batchTime the batch time to be formatted
+ * @param batchInterval the batch interval
+ * @param showYYYYMMSS whether to show the `yyyy/MM/dd` part. If false, the return value will be
+ * only `HH:mm:ss` or `HH:mm:ss.SSS`, depending on `batchInterval`
+ * @param timezone only for tests
+ */
+ def formatBatchTime(
+ batchTime: Long,
+ batchInterval: Long,
+ showYYYYMMSS: Boolean = true,
+ timezone: TimeZone = null): String = {
+ val oldTimezones =
+ (batchTimeFormat.get.getTimeZone, batchTimeFormatWithMilliseconds.get.getTimeZone)
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(timezone)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(timezone)
+ }
+ try {
+ val formattedBatchTime =
+ if (batchInterval < 1000) {
+ batchTimeFormatWithMilliseconds.get.format(batchTime)
+ } else {
+ // If batchInterval >= 1 second, don't show milliseconds
+ batchTimeFormat.get.format(batchTime)
+ }
+ if (showYYYYMMSS) {
+ formattedBatchTime
+ } else {
+ formattedBatchTime.substring(formattedBatchTime.indexOf(' ') + 1)
+ }
+ } finally {
+ if (timezone != null) {
+ batchTimeFormat.get.setTimeZone(oldTimezones._1)
+ batchTimeFormatWithMilliseconds.get.setTimeZone(oldTimezones._2)
+ }
+ }
+ }
+
/** Generate a human-readable string representing a number (e.g. 100 K) */
def formatNumber(records: Double): String = {
val trillion = 1e12
@@ -572,4 +628,39 @@ private[spark] object UIUtils extends Logging {
def buildErrorResponse(status: Response.Status, msg: String): Response = {
Response.status(status).entity(msg).`type`(MediaType.TEXT_PLAIN).build()
}
+
+ /**
+ * Different batches may report different duration labels, so we pad any label
+ * missing from a batch with '0d' to avoid UI rendering errors.
+ */
+ def durationDataPadding(
+ values: Array[(Long, ju.Map[String, JLong])]): Array[(Long, Map[String, Double])] = {
+ val operationLabels = values.flatMap(_._2.keySet().asScala).toSet
+ values.map { case (xValue, yValue) =>
+ val dataPadding = operationLabels.map { opLabel =>
+ if (yValue.containsKey(opLabel)) {
+ (opLabel, yValue.get(opLabel).toDouble)
+ } else {
+ (opLabel, 0d)
+ }
+ }
+ (xValue, dataPadding.toMap)
+ }
+ }
+
+ def detailsUINode(isMultiline: Boolean, message: String): Seq[Node] = {
+ if (isMultiline) {
+ // scalastyle:off
+      <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +details
+      </span> ++
+      <div class="stacktrace-details collapsed">
+        <pre>{message}</pre>
+      </div>
+      // scalastyle:on
+    } else {
+      Seq.empty[Node]
+    }
+  }
}
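Editor's note: a quick sketch (not part of the patch) of what the two helpers above produce. `UIUtils` is `private[spark]`, so this assumes code inside the `org.apache.spark` package.

```scala
import java.{util => ju}
import java.lang.{Long => JLong}
import java.util.TimeZone

import org.apache.spark.ui.UIUtils

val utc = TimeZone.getTimeZone("UTC")
// batchInterval < 1000 keeps milliseconds; showYYYYMMSS = false drops the date part.
UIUtils.formatBatchTime(1000L, 500, showYYYYMMSS = false, timezone = utc)
// => "00:00:01.000"
UIUtils.formatBatchTime(1000L, 2000, timezone = utc)
// => "1970/01/01 00:00:01"

// durationDataPadding fills labels missing from a batch with 0d.
val m1 = new ju.HashMap[String, JLong](); m1.put("addBatch", JLong.valueOf(10L))
val m2 = new ju.HashMap[String, JLong](); m2.put("getBatch", JLong.valueOf(5L))
UIUtils.durationDataPadding(Array((1L, m1), (2L, m2)))
// => Array((1, Map("addBatch" -> 10.0, "getBatch" -> 0.0)),
//          (2, Map("addBatch" -> 0.0, "getBatch" -> 5.0)))
```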
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b6ec869c425e6..5a1a41edb0a59 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -477,7 +477,10 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegressionModel.setPredictionCol"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setFeaturesCol"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setLabelCol"),
- ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol")
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.regression.AFTSurvivalRegression.setPredictionCol"),
+
+ // [SPARK-29543][SS][UI] Init structured streaming ui
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.this")
)
// Exclude rules for 2.4.x
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c2798d36d7769..70799817bffc5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1127,6 +1127,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val STREAMING_UI_ENABLED =
+ buildConf("spark.sql.streaming.ui.enabled")
+ .doc("Whether to run the structured streaming UI for the Spark application.")
+ .booleanConf
+ .createWithDefault(true)
+
+ val STREAMING_UI_INACTIVE_QUERY_RETENTION =
+ buildConf("spark.sql.streaming.ui.numInactiveQueries")
+ .doc("The number of inactive queries to retain for structured streaming ui.")
+ .intConf
+ .createWithDefault(100)
+
val VARIABLE_SUBSTITUTE_ENABLED =
buildConf("spark.sql.variable.substitute")
.doc("This enables substitution using syntax like ${var} ${system:var} and ${env:var}.")
@@ -2249,6 +2261,10 @@ class SQLConf extends Serializable with Logging {
def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)
+ def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)
+
+ def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)
+
def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
def fileSinkLogDeletion: Boolean = getConf(FILE_SINK_LOG_DELETION)
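Editor's note: for reference, a sketch (not part of the patch) of how these flags would be set. Since the UI listener is created lazily in `SharedState`, `spark.sql.streaming.ui.enabled` should be set before the session is created; the master and app name below are placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")            // placeholder
  .appName("streaming-ui-demo")  // placeholder
  .config("spark.sql.streaming.ui.enabled", "true")
  .config("spark.sql.streaming.ui.numInactiveQueries", "50")
  .getOrCreate()
```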
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 71bcd53435850..f20291e11fd70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -148,8 +148,8 @@ trait ProgressReporter extends Logging {
currentTriggerEndTimestamp = triggerClock.getTimeMillis()
val executionStats = extractExecutionStats(hasNewData)
- val processingTimeSec = Math.max(1L,
- currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
+    val processingTimeMillis = currentTriggerEndTimestamp - currentTriggerStartTimestamp
+    val processingTimeSec = Math.max(1L, processingTimeMillis).toDouble / MILLIS_PER_SECOND
val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
(currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND
@@ -181,6 +181,7 @@ trait ProgressReporter extends Logging {
name = name,
timestamp = formatTimestamp(currentTriggerStartTimestamp),
batchId = currentBatchId,
+      batchDuration = processingTimeMillis,
durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava),
eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
stateOperators = executionStats.stateOperators.toArray,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6dff5c6f26ee7..ed908a8bad483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -307,7 +307,8 @@ abstract class StreamExecution(
}
// `postEvent` does not throw non fatal exception.
- postEvent(new QueryStartedEvent(id, runId, name))
+ val submissionTime = triggerClock.getTimeMillis()
+ postEvent(new QueryStartedEvent(id, runId, name, submissionTime))
// Unblock starting thread
startLatch.countDown()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index de3805e105802..fefd72dcf1752 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.internal
import java.net.URL
-import java.util.{Locale, UUID}
+import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.annotation.concurrent.GuardedBy
@@ -36,6 +36,8 @@ import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.ui.{SQLAppStatusListener, SQLAppStatusStore, SQLTab}
import org.apache.spark.sql.internal.StaticSQLConf._
+import org.apache.spark.sql.streaming.StreamingQueryListener
+import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, StreamingQueryTab}
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.Utils
@@ -138,6 +140,21 @@ private[sql] class SharedState(
statusStore
}
+ /**
+   * A [[StreamingQueryListener]] for the structured streaming UI. It holds all the streaming
+   * query UI data to show.
+ */
+ lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
+ val sqlConf = SQLConf.get
+ if (sqlConf.isStreamingUIEnabled) {
+ val statusListener = new StreamingQueryStatusListener(sqlConf)
+ sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
+ Some(statusListener)
+ } else {
+ None
+ }
+ }
+
/**
* A catalog that interacts with external systems.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index cc81cf6cfafec..dd842cd1a3e99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -82,13 +82,15 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
+ * @param submissionTime The timestamp when the query was submitted.
* @since 2.1.0
*/
@Evolving
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
- val name: String) extends Event
+ val name: String,
+ val submissionTime: Long) extends Event
/**
* Event representing any progress updates in a query.
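Editor's note: as a sketch (not part of the patch), any user-defined listener can now read the new field, e.g. to log when a query was submitted:

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

class StartTimeListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {
    // submissionTime is epoch millis taken from the trigger clock
    println(s"query ${event.id} (run ${event.runId}) submitted at ${event.submissionTime}")
  }
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```

It would be registered the usual way, via `spark.streams.addListener(new StartTimeListener())`.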
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 810f4a182fca6..4d0d8ffd959c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
import org.apache.spark.sql.connector.catalog.{SupportsWrite, Table}
@@ -37,7 +38,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -68,6 +69,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
logInfo(s"Registered listener ${listener.getClass.getName}")
})
}
+ sparkSession.sharedState.streamingQueryStatusListener.foreach { listener =>
+ addListener(listener)
+ }
} catch {
case e: Exception =>
throw new SparkException("Exception when registering StreamingQueryListener", e)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index a9681dbd0c676..13b506b60a126 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -85,6 +85,7 @@ class StateOperatorProgress private[sql](
* case of retries after a failure a given batchId my be executed more than once.
* Similarly, when there is no data to be processed, the batchId will not be
* incremented.
+ * @param batchDuration The processing duration of the batch in milliseconds.
* @param durationMs The amount of time taken to perform various operations in milliseconds.
* @param eventTime Statistics of event time seen in this batch. It may contain the following keys:
* {{{
@@ -105,6 +106,7 @@ class StreamingQueryProgress private[sql](
val name: String,
val timestamp: String,
val batchId: Long,
+ val batchDuration: Long,
val durationMs: ju.Map[String, JLong],
val eventTime: ju.Map[String, String],
val stateOperators: Array[StateOperatorProgress],
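Editor's note: a small sketch (not part of the patch). Monitoring code can now read the per-batch wall-clock time straight from the progress object; `lastBatchDurationMs` is a hypothetical helper.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// lastProgress is null before the first batch completes, hence the Option wrap.
def lastBatchDurationMs(query: StreamingQuery): Option[Long] =
  Option(query.lastProgress).map(_.batchDuration)
```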
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
new file mode 100644
index 0000000000000..650f64fe1688c
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.commons.lang3.StringEscapeUtils
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
+import org.apache.spark.sql.streaming.ui.UIUtils._
+import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
+
+private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
+ extends WebUIPage("") with Logging {
+ val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ df.setTimeZone(getTimeZone("UTC"))
+
+ override def render(request: HttpServletRequest): Seq[Node] = {
+ val content = generateStreamingQueryTable(request)
+ SparkUIUtils.headerSparkPage(request, "Streaming Query", content, parent)
+ }
+
+ def generateDataRow(request: HttpServletRequest, queryActive: Boolean)
+ (query: StreamingQueryUIData): Seq[Node] = {
+
+ def details(detail: Any): Seq[Node] = {
+ if (queryActive) {
+ return Seq.empty[Node]
+ }
+ val detailString = detail.asInstanceOf[String]
+ val isMultiline = detailString.indexOf('\n') >= 0
+ val summary = StringEscapeUtils.escapeHtml4(
+ if (isMultiline) detailString.substring(0, detailString.indexOf('\n')) else detailString
+ )
+ val details = SparkUIUtils.detailsUINode(isMultiline, detailString)
+
+      <td>{summary}{details}</td>
+ }
+
+ val statisticsLink = "%s/%s/statistics?id=%s"
+ .format(SparkUIUtils.prependBaseUri(request, parent.basePath), parent.prefix, query.runId)
+
+ val name = UIUtils.getQueryName(query)
+ val status = UIUtils.getQueryStatus(query)
+ val duration = if (queryActive) {
+ SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
+ } else {
+ withNoProgress(query, {
+ val endTimeMs = query.lastProgress.timestamp
+ SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
+ }, "-")
+ }
+
+
+ // scalastyle:on
+
+ generateVar(operationDurationData) ++ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
new file mode 100644
index 0000000000000..db085dbe87ec4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.text.SimpleDateFormat
+import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
+
+/**
+ * A customized StreamingQueryListener used in the structured streaming UI, which contains all
+ * UI data for both active and inactive queries.
+ * TODO: Add support for history server.
+ */
+private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
+
+ private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
+ timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
+
+ /**
+   * We use runId as the key here instead of id in the active query status map,
+   * because runId is unique for every started query, even if it is a restart.
+ */
+ private[ui] val activeQueryStatus = new ConcurrentHashMap[UUID, StreamingQueryUIData]()
+ private[ui] val inactiveQueryStatus = new mutable.Queue[StreamingQueryUIData]()
+
+ private val streamingProgressRetention = sqlConf.streamingProgressRetention
+ private val inactiveQueryStatusRetention = sqlConf.streamingUIInactiveQueryRetention
+
+ override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
+ activeQueryStatus.putIfAbsent(event.runId,
+ new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
+ }
+
+ override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
+ val batchTimestamp = timestampFormat.parse(event.progress.timestamp).getTime
+ val queryStatus = activeQueryStatus.getOrDefault(
+ event.progress.runId,
+ new StreamingQueryUIData(event.progress.name, event.progress.id, event.progress.runId,
+ batchTimestamp))
+ queryStatus.updateProcess(event.progress, streamingProgressRetention)
+ }
+
+ override def onQueryTerminated(
+ event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+ val queryStatus = activeQueryStatus.remove(event.runId)
+ if (queryStatus != null) {
+ queryStatus.queryTerminated(event)
+ inactiveQueryStatus += queryStatus
+ while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
+ inactiveQueryStatus.dequeue()
+ }
+ }
+ }
+
+ def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
+ activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
+ }
+}
+
+/**
+ * This class contains all messages related to UI display. Each instance corresponds to a single
+ * [[org.apache.spark.sql.streaming.StreamingQuery]].
+ */
+private[ui] class StreamingQueryUIData(
+ val name: String,
+ val id: UUID,
+ val runId: UUID,
+ val submissionTime: Long) {
+
+ /** Holds the most recent query progress updates. */
+ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+ private var _isActive = true
+ private var _exception: Option[String] = None
+
+ def isActive: Boolean = synchronized { _isActive }
+
+ def exception: Option[String] = synchronized { _exception }
+
+ def queryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
+ _isActive = false
+ _exception = event.exception
+ }
+
+ def updateProcess(
+ newProgress: StreamingQueryProgress, retentionNum: Int): Unit = progressBuffer.synchronized {
+ progressBuffer += newProgress
+ while (progressBuffer.length >= retentionNum) {
+ progressBuffer.dequeue()
+ }
+ }
+
+ def recentProgress: Array[StreamingQueryProgress] = progressBuffer.synchronized {
+ progressBuffer.toArray
+ }
+
+ def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+ progressBuffer.lastOption.orNull
+ }
+}
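Editor's note: a sketch (not part of the patch) of how the UI pages consume this listener. Both classes are package-private, so this assumes code in `org.apache.spark.sql.streaming.ui`; the helper name is hypothetical.

```scala
// Split the stored queries the same way the page does: active vs. completed.
def summarize(listener: StreamingQueryStatusListener): String = {
  val (active, inactive) = listener.allQueryStatus.partition(_.isActive)
  s"active=${active.size}, completed=${inactive.size}"
}
```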
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
new file mode 100644
index 0000000000000..f909cfd97514e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming.ui
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+private[sql] class StreamingQueryTab(
+ val statusListener: StreamingQueryStatusListener,
+ sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
+
+ override val name = "Structured Streaming"
+
+ val parent = sparkUI
+
+ attachPage(new StreamingQueryPage(this))
+ attachPage(new StreamingQueryStatisticsPage(this))
+ parent.attachTab(this)
+
+ parent.addStaticHandler(StreamingQueryTab.STATIC_RESOURCE_DIR, "/static/sql")
+}
+
+object StreamingQueryTab {
+ private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
new file mode 100644
index 0000000000000..57b9dec81f28a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/UIUtils.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+private[ui] object UIUtils {
+
+ /**
+ * Check whether `number` is valid, if not return 0.0d
+ */
+ def withNumberInvalid(number: => Double): Double = {
+ if (number.isNaN || number.isInfinite) {
+ 0.0d
+ } else {
+ number
+ }
+ }
+
+ /**
+   * Execute a block of code when the streaming query has at least one completed batch;
+   * otherwise return the `default` value.
+ */
+ def withNoProgress[T](query: StreamingQueryUIData, body: => T, default: T): T = {
+ if (query.lastProgress != null) {
+ body
+ } else {
+ default
+ }
+ }
+
+ def getQueryName(query: StreamingQueryUIData): String = {
+ if (query.name == null || query.name.isEmpty) {
+ ""
+ } else {
+ query.name
+ }
+ }
+
+ def getQueryStatus(query: StreamingQueryUIData): String = {
+ if (query.isActive) {
+ "RUNNING"
+ } else {
+ query.exception.map(_ => "FAILED").getOrElse("FINISHED")
+ }
+ }
+}
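Editor's note: a sketch (not part of the patch) of `withNumberInvalid` guarding a derived rate before it is plotted. `safeRate` is a hypothetical helper in the same `org.apache.spark.sql.streaming.ui` package.

```scala
// rows / seconds can be NaN (0/0) or Infinity (x/0); both are mapped to 0.0.
def safeRate(rows: Double, seconds: Double): Double =
  UIUtils.withNumberInvalid(rows / seconds)
```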
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2f66dd3255b11..9d0f829ac9684 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.Waiters.Waiter
@@ -34,6 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.streaming.ui.StreamingQueryStatusListener
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.util.JsonProtocol
@@ -47,7 +47,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
after {
spark.streams.active.foreach(_.stop())
assert(spark.streams.active.isEmpty)
- assert(spark.streams.listListeners().isEmpty)
+    // Skip checking the default `StreamingQueryStatusListener`, which backs the streaming UI.
+ assert(spark.streams.listListeners()
+ .filterNot(_.isInstanceOf[StreamingQueryStatusListener]).isEmpty)
// Make sure we don't leak any events to the next test
spark.sparkContext.listenerBus.waitUntilEmpty()
}
@@ -252,8 +254,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(newEvent.name === event.name)
}
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name"))
- testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null))
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, "name", 1L))
+ testSerialization(new QueryStartedEvent(UUID.randomUUID, UUID.randomUUID, null, 1L))
}
test("QueryProgressEvent serialization") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index b6a6be2bb0312..6f00b528cb8bd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -312,6 +312,7 @@ object StreamingQueryStatusAndProgressSuite {
name = "myName",
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
+ batchDuration = 0L,
durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
eventTime = new java.util.HashMap(Map(
"max" -> "2016-12-05T20:54:20.827Z",
@@ -346,6 +347,7 @@ object StreamingQueryStatusAndProgressSuite {
name = null, // should not be present in the json
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
+ batchDuration = 0L,
durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).asJava),
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4121f499bd69c..77f5c856ff0f4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -466,7 +466,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
val streamingTriggerDF = spark.createDataset(1 to 10).toDF
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
- val progress = getFirstProgress(streamingInputDF.join(streamingInputDF, "value"))
+ val progress = getStreamingQuery(streamingInputDF.join(streamingInputDF, "value"))
+ .recentProgress.head
assert(progress.numInputRows === 20) // data is read multiple times in self-joins
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 20)
@@ -479,7 +480,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
// Trigger input has 10 rows, static input has 2 rows,
// therefore after the first trigger, the calculated input rows should be 10
- val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+ val progress = getStreamingQuery(streamingInputDF.join(staticInputDF, "value"))
+ .recentProgress.head
assert(progress.numInputRows === 10)
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 10)
@@ -492,7 +494,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
// After the first trigger, the calculated input rows should be 10
- val progress = getFirstProgress(streamingInputDF)
+ val progress = getStreamingQuery(streamingInputDF).recentProgress.head
assert(progress.numInputRows === 10)
assert(progress.sources.size === 1)
assert(progress.sources(0).numInputRows === 10)
@@ -1120,12 +1122,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
StreamingExecutionRelation(source, spark)
}
- /** Returns the query progress at the end of the first trigger of streaming DF */
- private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
+  /** Returns the query at the end of the first trigger of the streaming DF */
+ private def getStreamingQuery(streamingDF: DataFrame): StreamingQuery = {
try {
val q = streamingDF.writeStream.format("memory").queryName("test").start()
q.processAllAvailable()
- q.recentProgress.head
+ q
} finally {
spark.streams.active.map(_.stop())
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
new file mode 100644
index 0000000000000..de43e470e8e13
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.{Locale, UUID}
+import javax.servlet.http.HttpServletRequest
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+import scala.xml.Node
+
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.sql.test.SharedSparkSession
+
+class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
+
+ test("correctly display streaming query page") {
+ val id = UUID.randomUUID()
+ val request = mock(classOf[HttpServletRequest])
+ val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+ val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ when(tab.statusListener).thenReturn(statusListener)
+
+ val streamQuery = createStreamQueryUIData(id)
+ when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ var html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (1)"))
+ assert(html.contains("completed streaming queries (0)"))
+
+ when(streamQuery.isActive).thenReturn(false)
+ when(streamQuery.exception).thenReturn(None)
+ html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (0)"))
+ assert(html.contains("completed streaming queries (1)"))
+ assert(html.contains("finished"))
+
+ when(streamQuery.isActive).thenReturn(false)
+ when(streamQuery.exception).thenReturn(Option("exception in query"))
+ html = renderStreamingQueryPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("active streaming queries (0)"))
+ assert(html.contains("completed streaming queries (1)"))
+ assert(html.contains("failed"))
+ assert(html.contains("exception in query"))
+ }
+
+ test("correctly display streaming query statistics page") {
+ val id = UUID.randomUUID()
+ val request = mock(classOf[HttpServletRequest])
+ val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
+ val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
+ when(request.getParameter("id")).thenReturn(id.toString)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+ when(tab.statusListener).thenReturn(statusListener)
+
+ val streamQuery = createStreamQueryUIData(id)
+ when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))
+ val html = renderStreamingQueryStatisticsPage(request, tab)
+ .toString().toLowerCase(Locale.ROOT)
+
+ assert(html.contains("name: query<"))
+ assert(html.contains("""{"x": 1001898000100, "y": 10.0}"""))
+ assert(html.contains("""{"x": 1001898000100, "y": 12.0}"""))
+ assert(html.contains("(3 completed batches)"))
+ }
+
+ private def createStreamQueryUIData(id: UUID): StreamingQueryUIData = {
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+ when(progress.inputRowsPerSecond).thenReturn(10.0)
+ when(progress.processedRowsPerSecond).thenReturn(12.0)
+ when(progress.batchId).thenReturn(2)
+ when(progress.prettyJson).thenReturn("""{"a":1}""")
+
+ val streamQuery = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ when(streamQuery.isActive).thenReturn(true)
+ when(streamQuery.name).thenReturn("query")
+ when(streamQuery.id).thenReturn(id)
+ when(streamQuery.runId).thenReturn(id)
+ when(streamQuery.submissionTime).thenReturn(1L)
+ when(streamQuery.lastProgress).thenReturn(progress)
+ when(streamQuery.recentProgress).thenReturn(Array(progress))
+ when(streamQuery.exception).thenReturn(None)
+
+ streamQuery
+ }
+
+ /**
+   * Render the streaming query page with the given request and tab, and return the HTML.
+ */
+ private def renderStreamingQueryPage(
+ request: HttpServletRequest,
+ tab: StreamingQueryTab): Seq[Node] = {
+ val page = new StreamingQueryPage(tab)
+ page.render(request)
+ }
+
+ private def renderStreamingQueryStatisticsPage(
+ request: HttpServletRequest,
+ tab: StreamingQueryTab): Seq[Node] = {
+ val page = new StreamingQueryStatisticsPage(tab)
+ page.render(request)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
new file mode 100644
index 0000000000000..bd74ed340b408
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import java.util.UUID
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+
+import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress, StreamTest}
+import org.apache.spark.sql.streaming
+
+class StreamingQueryStatusListenerSuite extends StreamTest {
+
+ test("onQueryStarted, onQueryProgress, onQueryTerminated") {
+ val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+    // handle query started event
+ val id = UUID.randomUUID()
+ val runId = UUID.randomUUID()
+ val startEvent = new StreamingQueryListener.QueryStartedEvent(id, runId, "test", 1L)
+ listener.onQueryStarted(startEvent)
+
+ // result checking
+ assert(listener.activeQueryStatus.size() == 1)
+ assert(listener.activeQueryStatus.get(runId).name == "test")
+
+ // handle query progress event
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(progress.id).thenReturn(id)
+ when(progress.runId).thenReturn(runId)
+ when(progress.timestamp).thenReturn("2001-10-01T01:00:00.100Z")
+ when(progress.inputRowsPerSecond).thenReturn(10.0)
+ when(progress.processedRowsPerSecond).thenReturn(12.0)
+ when(progress.batchId).thenReturn(2)
+ when(progress.prettyJson).thenReturn("""{"a":1}""")
+ val processEvent = new streaming.StreamingQueryListener.QueryProgressEvent(progress)
+ listener.onQueryProgress(processEvent)
+
+ // result checking
+ val activeQuery = listener.activeQueryStatus.get(runId)
+ assert(activeQuery.isActive)
+ assert(activeQuery.recentProgress.length == 1)
+ assert(activeQuery.lastProgress.id == id)
+ assert(activeQuery.lastProgress.runId == runId)
+ assert(activeQuery.lastProgress.timestamp == "2001-10-01T01:00:00.100Z")
+ assert(activeQuery.lastProgress.inputRowsPerSecond == 10.0)
+ assert(activeQuery.lastProgress.processedRowsPerSecond == 12.0)
+ assert(activeQuery.lastProgress.batchId == 2)
+ assert(activeQuery.lastProgress.prettyJson == """{"a":1}""")
+
+ // handle terminate event
+ val terminateEvent = new StreamingQueryListener.QueryTerminatedEvent(id, runId, None)
+ listener.onQueryTerminated(terminateEvent)
+
+ assert(!listener.inactiveQueryStatus.head.isActive)
+ assert(listener.inactiveQueryStatus.head.runId == runId)
+ assert(listener.inactiveQueryStatus.head.id == id)
+ }
+
+ test("same query start multiple times") {
+ val listener = new StreamingQueryStatusListener(spark.sqlContext.conf)
+
+ // handle first time start
+ val id = UUID.randomUUID()
+ val runId0 = UUID.randomUUID()
+ val startEvent0 = new StreamingQueryListener.QueryStartedEvent(id, runId0, "test", 1L)
+ listener.onQueryStarted(startEvent0)
+
+ // handle terminate event
+ val terminateEvent0 = new StreamingQueryListener.QueryTerminatedEvent(id, runId0, None)
+ listener.onQueryTerminated(terminateEvent0)
+
+ // handle second time start
+ val runId1 = UUID.randomUUID()
+ val startEvent1 = new StreamingQueryListener.QueryStartedEvent(id, runId1, "test", 1L)
+ listener.onQueryStarted(startEvent1)
+
+ // result checking
+ assert(listener.activeQueryStatus.size() == 1)
+ assert(listener.inactiveQueryStatus.length == 1)
+ assert(listener.activeQueryStatus.containsKey(runId1))
+ assert(listener.activeQueryStatus.get(runId1).id == id)
+ assert(listener.inactiveQueryStatus.head.runId == runId0)
+ assert(listener.inactiveQueryStatus.head.id == id)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
new file mode 100644
index 0000000000000..46f2eadc05835
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UIUtilsSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming.ui
+
+import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.Matchers
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+
+class UIUtilsSuite extends SparkFunSuite with Matchers {
+ test("streaming query started with no batch completed") {
+ val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ when(query.lastProgress).thenReturn(null)
+
+ assert(0 == UIUtils.withNoProgress(query, 1, 0))
+ }
+
+ test("streaming query started with at least one batch completed") {
+ val query = mock(classOf[StreamingQueryUIData], RETURNS_SMART_NULLS)
+ val progress = mock(classOf[StreamingQueryProgress], RETURNS_SMART_NULLS)
+ when(query.lastProgress).thenReturn(progress)
+
+ assert(1 == UIUtils.withNoProgress(query, 1, 0))
+ }
+}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index adfda0c56585f..890a668275b81 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -375,21 +375,7 @@ private[ui] class SqlStatsPagedTable(
} else {
errorMessage
})
- val details = if (isMultiline) {
- // scalastyle:off
-        <span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
-              class="expand-details">
-          + details
-        </span> ++
-        <div class="stacktrace-details collapsed">
-          <pre>{errorMessage}</pre>
-        </div>