diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a36ae7bd35fea..70799817bffc5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1127,6 +1127,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val STREAMING_UI_ENABLED = + buildConf("spark.sql.streaming.ui.enabled") + .doc("Whether to run the structured streaming UI for the Spark application.") + .booleanConf + .createWithDefault(true) + val STREAMING_UI_INACTIVE_QUERY_RETENTION = buildConf("spark.sql.streaming.ui.numInactiveQueries") .doc("The number of inactive queries to retain for structured streaming ui.") @@ -2255,6 +2261,8 @@ class SQLConf extends Serializable with Logging { def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED) + def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED) + def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION) def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 8b52da191ac34..ed908a8bad483 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -307,8 +307,8 @@ abstract class StreamExecution( } // `postEvent` does not throw non fatal exception. 
- val submitTime = triggerClock.getTimeMillis() - postEvent(new QueryStartedEvent(id, runId, name, submitTime)) + val submissionTime = triggerClock.getTimeMillis() + postEvent(new QueryStartedEvent(id, runId, name, submissionTime)) // Unblock starting thread startLatch.countDown() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 71a626540bd45..fefd72dcf1752 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager @@ -145,14 +144,16 @@ private[sql] class SharedState( * A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui * data to show. */ - lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = - if (conf.get(UI_ENABLED)) { - val statusListener = new StreamingQueryStatusListener(SQLConf.get) + lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = { + val sqlConf = SQLConf.get + if (sqlConf.isStreamingUIEnabled) { + val statusListener = new StreamingQueryStatusListener(sqlConf) sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _)) Some(statusListener) } else { None } + } /** * A catalog that interacts with external systems. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala index 710aaa0f852c2..dd842cd1a3e99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -82,7 +82,7 @@ object StreamingQueryListener { * @param id A unique query id that persists across restarts. See `StreamingQuery.id()`. * @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`. * @param name User-specified name of the query, null if not specified. - * @param submitTime The timestamp to start a query. + * @param submissionTime The timestamp to start a query. * @since 2.1.0 */ @Evolving @@ -90,7 +90,7 @@ object StreamingQueryListener { val id: UUID, val runId: UUID, val name: String, - val submitTime: Long) extends Event + val submissionTime: Long) extends Event /** * Event representing any progress updates in a query. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala index 0fe8190612e4e..650f64fe1688c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPage.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.streaming.ui.UIUtils._ import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage} -class StreamingQueryPage(parent: StreamingQueryTab) +private[ui] class StreamingQueryPage(parent: StreamingQueryTab) extends WebUIPage("") with Logging { val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") df.setTimeZone(getTimeZone("UTC")) @@ -61,11 +61,11 @@ class StreamingQueryPage(parent: StreamingQueryTab) val name = UIUtils.getQueryName(query) val status = UIUtils.getQueryStatus(query) val duration = if (queryActive) { - SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime) + SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime) } else { withNoProgress(query, { val endTimeMs = query.lastProgress.timestamp - SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submitTime) + SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime) }, "-") } @@ -74,7 +74,7 @@ class StreamingQueryPage(parent: StreamingQueryTab) {status} {query.id} {query.runId} - {SparkUIUtils.formatDate(query.submitTime)} + {SparkUIUtils.formatDate(query.submissionTime)} {duration} {withNoProgress(query, { (query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum / @@ -94,8 +94,8 @@ class StreamingQueryPage(parent: StreamingQueryTab) .partition(_.isActive) val activeQueryTables = if (activeQueries.nonEmpty) { val headerRow = Seq( - "Query Name", "Status", "Id", "Run ID", 
"Submit Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Last Batch ID") + "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec", + "Avg Process /sec", "Latest Batch") Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true), activeQueries, true, None, Seq(null), false)) @@ -105,8 +105,8 @@ val inactiveQueryTables = if (inactiveQueries.nonEmpty) { val headerRow = Seq( - "Query Name", "Status", "Id", "Run ID", "Submit Time", "Duration", "Avg Input /sec", - "Avg Process /sec", "Last Batch ID", "Error") + "Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec", + "Avg Process /sec", "Latest Batch", "Error") Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false), inactiveQueries, true, None, Seq(null), false)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index 453bdc6bfc99d..56672ce328bff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone import org.apache.spark.sql.streaming.ui.UIUtils._ import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage} -class StreamingQueryStatisticsPage(parent: StreamingQueryTab) +private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) extends WebUIPage("statistics") with Logging { val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") df.setTimeZone(getTimeZone("UTC")) @@ -82,7 +82,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab) def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = { val 
duration = if (query.isActive) { - SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime) + SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime) } else { withNoProgress(query, { val end = query.lastProgress.timestamp @@ -100,7 +100,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab) since - {SparkUIUtils.formatDate(query.submitTime)} + {SparkUIUtils.formatDate(query.submissionTime)} ({numBatches} completed batches) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index 8a0a7b247ec1f..db085dbe87ec4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro * UI data for both active and inactive query. * TODO: Add support for history server. 
*/ -class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener { +private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener { private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) @@ -50,7 +50,7 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListe override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { activeQueryStatus.putIfAbsent(event.runId, - new StreamingQueryUIData(event.name, event.id, event.runId, event.submitTime)) + new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime)) } override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { @@ -62,20 +62,19 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListe queryStatus.updateProcess(event.progress, streamingProgressRetention) } - override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = { + override def onQueryTerminated( + event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized { val queryStatus = activeQueryStatus.remove(event.runId) if (queryStatus != null) { queryStatus.queryTerminated(event) - inactiveQueryStatus.synchronized { - inactiveQueryStatus += queryStatus - while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) { - inactiveQueryStatus.dequeue() - } + inactiveQueryStatus += queryStatus + while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) { + inactiveQueryStatus.dequeue() } } } - def allQueryStatus: Seq[StreamingQueryUIData] = inactiveQueryStatus.synchronized { + def allQueryStatus: Seq[StreamingQueryUIData] = synchronized { activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus } } @@ -88,7 +87,7 @@ private[ui] class StreamingQueryUIData( val name: String, val id: UUID, val runId: UUID, - val submitTime: 
Long) { + val submissionTime: Long) { /** Holds the most recent query progress updates. */ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala index 5f6c7236959c2..f909cfd97514e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryTab.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming.ui import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, SparkUITab} -class StreamingQueryTab(val statusListener: StreamingQueryStatusListener, sparkUI: SparkUI) - extends SparkUITab(sparkUI, "StreamingQuery") with Logging { +private[sql] class StreamingQueryTab( + val statusListener: StreamingQueryStatusListener, + sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging { override val name = "Structured Streaming" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala index 0daa69dd16ead..de43e470e8e13 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala @@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter { when(streamQuery.name).thenReturn("query") when(streamQuery.id).thenReturn(id) when(streamQuery.runId).thenReturn(id) - when(streamQuery.submitTime).thenReturn(1L) + when(streamQuery.submissionTime).thenReturn(1L) when(streamQuery.lastProgress).thenReturn(progress) when(streamQuery.recentProgress).thenReturn(Array(progress)) when(streamQuery.exception).thenReturn(None)