Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,12 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val STREAMING_UI_ENABLED =
buildConf("spark.sql.streaming.ui.enabled")
.doc("Whether to run the structured streaming UI for the Spark application.")
.booleanConf
.createWithDefault(true)

val STREAMING_UI_INACTIVE_QUERY_RETENTION =
buildConf("spark.sql.streaming.ui.numInactiveQueries")
.doc("The number of inactive queries to retain for structured streaming ui.")
Expand Down Expand Up @@ -2255,6 +2261,8 @@ class SQLConf extends Serializable with Logging {

def isUnsupportedOperationCheckEnabled: Boolean = getConf(UNSUPPORTED_OPERATION_CHECK_ENABLED)

def isStreamingUIEnabled: Boolean = getConf(STREAMING_UI_ENABLED)

def streamingUIInactiveQueryRetention: Int = getConf(STREAMING_UI_INACTIVE_QUERY_RETENTION)

def streamingFileCommitProtocolClass: String = getConf(STREAMING_FILE_COMMIT_PROTOCOL_CLASS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,8 @@ abstract class StreamExecution(
}

// `postEvent` does not throw non fatal exception.
val submitTime = triggerClock.getTimeMillis()
postEvent(new QueryStartedEvent(id, runId, name, submitTime))
val submissionTime = triggerClock.getTimeMillis()
postEvent(new QueryStartedEvent(id, runId, name, submissionTime))

// Unblock starting thread
startLatch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.execution.CacheManager
Expand Down Expand Up @@ -145,14 +144,16 @@ private[sql] class SharedState(
* A [[StreamingQueryListener]] for structured streaming ui, it contains all streaming query ui
* data to show.
*/
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] =
if (conf.get(UI_ENABLED)) {
val statusListener = new StreamingQueryStatusListener(SQLConf.get)
lazy val streamingQueryStatusListener: Option[StreamingQueryStatusListener] = {
val sqlConf = SQLConf.get
if (sqlConf.isStreamingUIEnabled) {
val statusListener = new StreamingQueryStatusListener(sqlConf)
sparkContext.ui.foreach(new StreamingQueryTab(statusListener, _))
Some(statusListener)
} else {
None
}
}

/**
* A catalog that interacts with external systems.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ object StreamingQueryListener {
* @param id A unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
* @param submitTime The timestamp to start a query.
* @param submissionTime The timestamp to start a query.
* @since 2.1.0
*/
@Evolving
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
val name: String,
val submitTime: Long) extends Event
val submissionTime: Long) extends Event

/**
* Event representing any progress updates in a query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}

class StreamingQueryPage(parent: StreamingQueryTab)
private[ui] class StreamingQueryPage(parent: StreamingQueryTab)
extends WebUIPage("") with Logging {
val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
df.setTimeZone(getTimeZone("UTC"))
Expand Down Expand Up @@ -61,11 +61,11 @@ class StreamingQueryPage(parent: StreamingQueryTab)
val name = UIUtils.getQueryName(query)
val status = UIUtils.getQueryStatus(query)
val duration = if (queryActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime)
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
} else {
withNoProgress(query, {
val endTimeMs = query.lastProgress.timestamp
SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submitTime)
SparkUIUtils.formatDurationVerbose(df.parse(endTimeMs).getTime - query.submissionTime)
}, "-")
}

Expand All @@ -74,7 +74,7 @@ class StreamingQueryPage(parent: StreamingQueryTab)
<td> {status} </td>
<td> {query.id} </td>
<td> <a href={statisticsLink}> {query.runId} </a> </td>
<td> {SparkUIUtils.formatDate(query.submitTime)} </td>
<td> {SparkUIUtils.formatDate(query.submissionTime)} </td>
<td> {duration} </td>
<td> {withNoProgress(query, {
(query.recentProgress.map(p => withNumberInvalid(p.inputRowsPerSecond)).sum /
Expand All @@ -94,8 +94,8 @@ class StreamingQueryPage(parent: StreamingQueryTab)
.partition(_.isActive)
val activeQueryTables = if (activeQueries.nonEmpty) {
val headerRow = Seq(
"Query Name", "Status", "Id", "Run ID", "Submit Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Last Batch ID")
"Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = true),
activeQueries, true, None, Seq(null), false))
Expand All @@ -105,8 +105,8 @@ class StreamingQueryPage(parent: StreamingQueryTab)

val inactiveQueryTables = if (inactiveQueries.nonEmpty) {
val headerRow = Seq(
"Query Name", "Status", "Id", "Run ID", "Submit Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Last Batch ID", "Error")
"Name", "Status", "Id", "Run ID", "Submitted Time", "Duration", "Avg Input /sec",
"Avg Process /sec", "Lastest Batch", "Error")

Some(SparkUIUtils.listingTable(headerRow, generateDataRow(request, queryActive = false),
inactiveQueries, true, None, Seq(null), false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}

class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
extends WebUIPage("statistics") with Logging {
val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
df.setTimeZone(getTimeZone("UTC"))
Expand Down Expand Up @@ -82,7 +82,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab)

def generateBasicInfo(query: StreamingQueryUIData): Seq[Node] = {
val duration = if (query.isActive) {
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submitTime)
SparkUIUtils.formatDurationVerbose(System.currentTimeMillis() - query.submissionTime)
} else {
withNoProgress(query, {
val end = query.lastProgress.timestamp
Expand All @@ -100,7 +100,7 @@ class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
</strong>
since
<strong>
{SparkUIUtils.formatDate(query.submitTime)}
{SparkUIUtils.formatDate(query.submissionTime)}
</strong>
(<strong>{numBatches}</strong> completed batches)
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryPro
* UI data for both active and inactive query.
* TODO: Add support for history server.
*/
class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {
private[sql] class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListener {

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
Expand All @@ -50,7 +50,7 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListe

override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
activeQueryStatus.putIfAbsent(event.runId,
new StreamingQueryUIData(event.name, event.id, event.runId, event.submitTime))
new StreamingQueryUIData(event.name, event.id, event.runId, event.submissionTime))
}

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
Expand All @@ -62,20 +62,19 @@ class StreamingQueryStatusListener(sqlConf: SQLConf) extends StreamingQueryListe
queryStatus.updateProcess(event.progress, streamingProgressRetention)
}

override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
override def onQueryTerminated(
event: StreamingQueryListener.QueryTerminatedEvent): Unit = synchronized {
val queryStatus = activeQueryStatus.remove(event.runId)
if (queryStatus != null) {
queryStatus.queryTerminated(event)
inactiveQueryStatus.synchronized {
inactiveQueryStatus += queryStatus
while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
inactiveQueryStatus.dequeue()
}
inactiveQueryStatus += queryStatus
while (inactiveQueryStatus.length >= inactiveQueryStatusRetention) {
inactiveQueryStatus.dequeue()
}
}
}

def allQueryStatus: Seq[StreamingQueryUIData] = inactiveQueryStatus.synchronized {
def allQueryStatus: Seq[StreamingQueryUIData] = synchronized {
activeQueryStatus.values().asScala.toSeq ++ inactiveQueryStatus
}
}
Expand All @@ -88,7 +87,7 @@ private[ui] class StreamingQueryUIData(
val name: String,
val id: UUID,
val runId: UUID,
val submitTime: Long) {
val submissionTime: Long) {

/** Holds the most recent query progress updates. */
private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package org.apache.spark.sql.streaming.ui
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}

class StreamingQueryTab(val statusListener: StreamingQueryStatusListener, sparkUI: SparkUI)
extends SparkUITab(sparkUI, "StreamingQuery") with Logging {
private[sql] class StreamingQueryTab(
val statusListener: StreamingQueryStatusListener,
sparkUI: SparkUI) extends SparkUITab(sparkUI, "StreamingQuery") with Logging {

override val name = "Structured Streaming"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
when(streamQuery.name).thenReturn("query")
when(streamQuery.id).thenReturn(id)
when(streamQuery.runId).thenReturn(id)
when(streamQuery.submitTime).thenReturn(1L)
when(streamQuery.submissionTime).thenReturn(1L)
when(streamQuery.lastProgress).thenReturn(progress)
when(streamQuery.recentProgress).thenReturn(Array(progress))
when(streamQuery.exception).thenReturn(None)
Expand Down