diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 58c265d0a850..3cc99d6690fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -46,7 +46,7 @@ class MicroBatchExecution( deleteCheckpointOnStop: Boolean) extends StreamExecution( sparkSession, name, checkpointRoot, analyzedPlan, sink, - trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + trigger, triggerClock, outputMode, deleteCheckpointOnStop) with MicroBatchProgressReporter { @volatile protected var sources: Seq[SparkDataStream] = Seq.empty diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchProgressReporter.scala new file mode 100644 index 000000000000..ad9b81871299 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchProgressReporter.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation} +import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, SparkDataStream} +import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StreamingQueryProgress} + +trait MicroBatchProgressReporter extends ProgressReporter { + + private val currentDurationsMs = new mutable.HashMap[String, Long]() + private val recordDurationMs = new mutable.HashMap[String, Long]() + + private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _ + private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _ + + // TODO: Restore this from the checkpoint when possible. + private var lastTriggerStartTimestamp = -1L + + // Local timestamps and counters. + private var currentTriggerStartTimestamp = -1L + private var currentTriggerEndTimestamp = -1L + + private val noDataProgressEventInterval = + sparkSession.sessionState.conf.streamingNoDataProgressEventInterval + + // The timestamp we report an event that has no input data + private var lastNoDataProgressEventTime = Long.MinValue + + /** Begins recording statistics about query progress for a given trigger. 
*/ + override protected def startTrigger(): Unit = { + logDebug("Starting Trigger Calculation") + lastTriggerStartTimestamp = currentTriggerStartTimestamp + currentTriggerStartTimestamp = triggerClock.getTimeMillis() + currentTriggerStartOffsets = null + currentTriggerEndOffsets = null + currentDurationsMs.clear() + } + + /** Finalizes the query progress and adds it to list of recent status updates. */ + protected def finishTrigger(hasNewData: Boolean): Unit = { + assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null) + currentTriggerEndTimestamp = triggerClock.getTimeMillis() + + val executionStats = extractExecutionStats(hasNewData) + val processingTimeSec = + (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND + + val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { + (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND + } else { + Double.NaN + } + logDebug(s"Execution stats: $executionStats") + + val sourceProgress = sources.distinct.map { source => + val numRecords = executionStats.inputRows.getOrElse(source, 0L) + new SourceProgress( + description = source.toString, + startOffset = currentTriggerStartOffsets.get(source).orNull, + endOffset = currentTriggerEndOffsets.get(source).orNull, + numInputRows = numRecords, + inputRowsPerSecond = numRecords / inputTimeSec, + processedRowsPerSecond = numRecords / processingTimeSec + ) + } + + val sinkProgress = SinkProgress( + sink.toString, + sinkCommitProgress.map(_.numOutputRows)) + + val newProgress = new StreamingQueryProgress( + id = id, + runId = runId, + name = name, + timestamp = formatTimestamp(currentTriggerStartTimestamp), + batchId = currentBatchId, + durationMs = new java.util.HashMap( + (currentDurationsMs ++ recordDurationMs).toMap.mapValues(long2Long).asJava), + eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), + stateOperators = executionStats.stateOperators.toArray, + sources = sourceProgress.toArray, + sink = sinkProgress) + + if (hasNewData) { + // Reset noDataEventTimestamp if we processed any data + lastNoDataProgressEventTime = Long.MinValue + updateProgress(newProgress) + } else { + val now = triggerClock.getTimeMillis() + if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { + lastNoDataProgressEventTime = now + updateProgress(newProgress) + } + } + + currentStatus = currentStatus.copy(isTriggerActive = false) + } + + /** Extract number of input sources for each streaming source in plan */ + protected def extractSourceToNumInputRows(t: Option[Any] = None) + : Map[SparkDataStream, Long] = { + + def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = { + tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source + } + + val onlyDataSourceV2Sources = { + // Check whether the streaming query's logical plan has only V2 micro-batch data sources + val allStreamingLeaves = logicalPlan.collect { + case s: StreamingDataSourceV2Relation => + s.stream.isInstanceOf[MicroBatchStream] || s.stream.isInstanceOf[ContinuousStream] + case _: StreamingExecutionRelation => false + } + allStreamingLeaves.forall(_ == true) + } + + if (onlyDataSourceV2Sources) { + // It's possible that multiple DataSourceV2ScanExec instances may refer to the same source + // (can happen with self-unions or self-joins). This means the source is scanned multiple + // times in the query, we should count the numRows for each scan. 
+ val sourceToInputRowsTuples = lastExecution.executedPlan.collect { + case s: MicroBatchScanExec => + val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) + val source = s.stream.asInstanceOf[SparkDataStream] + source -> numRows + } + logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t")) + sumRows(sourceToInputRowsTuples) + } else { + + // Since V1 source do not generate execution plan leaves that directly link with source that + // generated it, we can only do a best-effort association between execution plan leaves to the + // sources. This is known to fail in a few cases, see SPARK-24050. + // + // We want to associate execution plan leaves to sources that generate them, so that we match + // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. + // Consider the translation from the streaming logical plan to the final executed plan. + // + // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan + // + // 1. We keep track of streaming sources associated with each leaf in trigger's logical plan + // - Each logical plan leaf will be associated with a single streaming source. + // - There can be multiple logical plan leaves associated with a streaming source. + // - There can be leaves not associated with any streaming source, because they were + // generated from a batch source (e.g. stream-batch joins) + // + // 2. Assuming that the executed plan has same number of leaves in the same order as that of + // the trigger logical plan, we associate executed plan leaves with corresponding + // streaming sources. + // + // 3. For each source, we sum the metrics of the associated execution plan leaves. + // + val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => + logicalPlan.collectLeaves().map { leaf => leaf -> source } + } + val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming + val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() + if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { + val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { + case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source } + } + val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) => + val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L) + source -> numRows + } + sumRows(sourceToInputRowsTuples) + } else { + if (!metricWarningLogged) { + def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}" + + logWarning( + "Could not report metrics as number leaves in trigger logical plan did not match that" + + s" of the execution plan:\n" + + s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" + + s"execution plan leaves: ${toString(allExecPlanLeaves)}\n") + metricWarningLogged = true + } + Map.empty + } + } + } + + /** + * Record the offsets range this trigger will process. Call this before updating + * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded. + */ + override protected def recordTriggerOffsets( + from: StreamProgress, + to: StreamProgress, + epochId: Long = currentBatchId): Unit = { + currentTriggerStartOffsets = from.mapValues(_.json) + currentTriggerEndOffsets = to.mapValues(_.json) + } + + /** Records the duration of running `body` for the next query progress update. 
*/ + override protected def reportTimeTaken[T]( + triggerDetailKey: String, + epochId: Long = currentBatchId)(body: => T): T = { + addOrUpdateTime(triggerDetailKey, currentDurationsMs)(body) + } + + /** + * Records the duration of running `body` which will not be cleared when start a new trigger. + */ + override protected def recordTimeTaken[T](triggerDetailKey: String)(body: => T): T = { + addOrUpdateTime(triggerDetailKey, recordDurationMs)(body) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 6cb75083d0c0..b3db5205ca44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.streaming import java.text.SimpleDateFormat import java.util.{Date, UUID} -import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.internal.Logging @@ -28,9 +27,9 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan} import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.{MicroBatchScanExec, StreamingDataSourceV2Relation, StreamWriterCommitProgress} +import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress import org.apache.spark.sql.sources.v2.Table -import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, SparkDataStream} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream import org.apache.spark.sql.streaming._ import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent import org.apache.spark.util.Clock @@ -65,29 +64,15 @@ trait ProgressReporter extends Logging { protected def sparkSession: SparkSession protected def postEvent(event: StreamingQueryListener.Event): Unit - // Local timestamps and counters. - private var currentTriggerStartTimestamp = -1L - private var currentTriggerEndTimestamp = -1L - private var currentTriggerStartOffsets: Map[SparkDataStream, String] = _ - private var currentTriggerEndOffsets: Map[SparkDataStream, String] = _ - // TODO: Restore this from the checkpoint when possible. - private var lastTriggerStartTimestamp = -1L - - private val currentDurationsMs = new mutable.HashMap[String, Long]() - /** Flag that signals whether any error with input metrics have already been logged */ - private var metricWarningLogged: Boolean = false + protected var metricWarningLogged: Boolean = false /** Holds the most recent query progress updates. Accesses must lock on the queue itself. 
*/ private val progressBuffer = new mutable.Queue[StreamingQueryProgress]() - private val noDataProgressEventInterval = - sparkSession.sessionState.conf.streamingNoDataProgressEventInterval - - // The timestamp we report an event that has no input data - private var lastNoDataProgressEventTime = Long.MinValue + protected val numProgressRetention = sparkSession.sqlContext.conf.streamingProgressRetention - private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 + protected val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 timestampFormat.setTimeZone(getTimeZone("UTC")) @volatile @@ -98,6 +83,14 @@ trait ProgressReporter extends Logging { isTriggerActive = false) } + protected def startTrigger(): Unit + + protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress, epochId: Long): Unit + + protected def reportTimeTaken[T](triggerDetailKey: String, epochId: Long)(body: => T): T + + protected def recordTimeTaken[T](triggerDetailKey: String)(body: => T): T + /** Returns the current status of the query. */ def status: StreamingQueryStatus = currentStatus @@ -111,29 +104,10 @@ trait ProgressReporter extends Logging { progressBuffer.lastOption.orNull } - /** Begins recording statistics about query progress for a given trigger. */ - protected def startTrigger(): Unit = { - logDebug("Starting Trigger Calculation") - lastTriggerStartTimestamp = currentTriggerStartTimestamp - currentTriggerStartTimestamp = triggerClock.getTimeMillis() - currentTriggerStartOffsets = null - currentTriggerEndOffsets = null - currentDurationsMs.clear() - } - - /** - * Record the offsets range this trigger will process. Call this before updating - * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded. - */ - protected def recordTriggerOffsets(from: StreamProgress, to: StreamProgress): Unit = { - currentTriggerStartOffsets = from.mapValues(_.json) - currentTriggerEndOffsets = to.mapValues(_.json) - } - - private def updateProgress(newProgress: StreamingQueryProgress): Unit = { + protected def updateProgress(newProgress: StreamingQueryProgress): Unit = { progressBuffer.synchronized { progressBuffer += newProgress - while (progressBuffer.length >= sparkSession.sqlContext.conf.streamingProgressRetention) { + while (progressBuffer.length >= numProgressRetention) { progressBuffer.dequeue() } } @@ -141,79 +115,33 @@ trait ProgressReporter extends Logging { logInfo(s"Streaming query made progress: $newProgress") } - /** Finalizes the query progress and adds it to list of recent status updates. 
*/ - protected def finishTrigger(hasNewData: Boolean): Unit = { - assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null) - currentTriggerEndTimestamp = triggerClock.getTimeMillis() - - val executionStats = extractExecutionStats(hasNewData) - val processingTimeSec = - (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND - - val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { - (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND - } else { - Double.NaN - } - logDebug(s"Execution stats: $executionStats") - - val sourceProgress = sources.distinct.map { source => - val numRecords = executionStats.inputRows.getOrElse(source, 0L) - new SourceProgress( - description = source.toString, - startOffset = currentTriggerStartOffsets.get(source).orNull, - endOffset = currentTriggerEndOffsets.get(source).orNull, - numInputRows = numRecords, - inputRowsPerSecond = numRecords / inputTimeSec, - processedRowsPerSecond = numRecords / processingTimeSec - ) - } - - val sinkProgress = SinkProgress( - sink.toString, - sinkCommitProgress.map(_.numOutputRows)) - - val newProgress = new StreamingQueryProgress( - id = id, - runId = runId, - name = name, - timestamp = formatTimestamp(currentTriggerStartTimestamp), - batchId = currentBatchId, - durationMs = new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).asJava), - eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), - stateOperators = executionStats.stateOperators.toArray, - sources = sourceProgress.toArray, - sink = sinkProgress) - - if (hasNewData) { - // Reset noDataEventTimestamp if we processed any data - lastNoDataProgressEventTime = Long.MinValue - updateProgress(newProgress) - } else { - val now = triggerClock.getTimeMillis() - if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { - lastNoDataProgressEventTime = now - updateProgress(newProgress) - } - } + protected def formatTimestamp(millis: Long): String = { + timestampFormat.format(new Date(millis)) + } - currentStatus = currentStatus.copy(isTriggerActive = false) + /** Updates the message returned in `status`. */ + protected def updateStatusMessage(message: String): Unit = { + currentStatus = currentStatus.copy(message = message) } - /** Extract statistics about stateful operators from the executed query plan. */ - private def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = { - if (lastExecution == null) return Nil - // lastExecution could belong to one of the previous triggers if `!hasNewData`. - // Walking the plan again should be inexpensive. - lastExecution.executedPlan.collect { - case p if p.isInstanceOf[StateStoreWriter] => - val progress = p.asInstanceOf[StateStoreWriter].getProgress() - if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0) - } + protected def addOrUpdateTime[T]( + triggerDetailKey: String, + durationMs: mutable.HashMap[String, Long])(body: => T): T = { + val startTime = triggerClock.getTimeMillis() + val result = body + val endTime = triggerClock.getTimeMillis() + val timeTaken = math.max(endTime - startTime, 0) + + val previousTime = durationMs.getOrElse(triggerDetailKey, 0L) + durationMs.put(triggerDetailKey, previousTime + timeTaken) + logDebug(s"$triggerDetailKey took $timeTaken ms") + result } /** Extracts statistics from the most recent query execution. 
*/ - private def extractExecutionStats(hasNewData: Boolean): ExecutionStats = { + protected def extractExecutionStats( + hasNewData: Boolean, + extraInfos: Option[Any] = None): ExecutionStats = { val hasEventTime = logicalPlan.collect { case e: EventTimeWatermark => e }.nonEmpty val watermarkTimestamp = if (hasEventTime) Map("watermark" -> formatTimestamp(offsetSeqMetadata.batchWatermarkMs)) @@ -226,7 +154,7 @@ trait ProgressReporter extends Logging { return ExecutionStats(Map.empty, stateOperators, watermarkTimestamp) } - val numInputRows = extractSourceToNumInputRows() + val numInputRows = extractSourceToNumInputRows(extraInfos) val eventTimeStats = lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec if e.eventTimeStats.value.count > 0 => @@ -240,107 +168,18 @@ trait ProgressReporter extends Logging { ExecutionStats(numInputRows, stateOperators, eventTimeStats) } - /** Extract number of input sources for each streaming source in plan */ - private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = { - - def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = { - tuples.groupBy(_._1).mapValues(_.map(_._2).sum) // sum up rows for each source - } + protected def extractSourceToNumInputRows(extraInfos: Option[Any] = None) + : Map[SparkDataStream, Long] - val onlyDataSourceV2Sources = { - // Check whether the streaming query's logical plan has only V2 micro-batch data sources - val allStreamingLeaves = logicalPlan.collect { - case s: StreamingDataSourceV2Relation => s.stream.isInstanceOf[MicroBatchStream] - case _: StreamingExecutionRelation => false - } - allStreamingLeaves.forall(_ == true) - } - - if (onlyDataSourceV2Sources) { - // It's possible that multiple DataSourceV2ScanExec instances may refer to the same source - // (can happen with self-unions or self-joins). This means the source is scanned multiple - // times in the query, we should count the numRows for each scan. - val sourceToInputRowsTuples = lastExecution.executedPlan.collect { - case s: MicroBatchScanExec => - val numRows = s.metrics.get("numOutputRows").map(_.value).getOrElse(0L) - val source = s.stream - source -> numRows - } - logDebug("Source -> # input rows\n\t" + sourceToInputRowsTuples.mkString("\n\t")) - sumRows(sourceToInputRowsTuples) - } else { - - // Since V1 source do not generate execution plan leaves that directly link with source that - // generated it, we can only do a best-effort association between execution plan leaves to the - // sources. This is known to fail in a few cases, see SPARK-24050. - // - // We want to associate execution plan leaves to sources that generate them, so that we match - // the their metrics (e.g. numOutputRows) to the sources. To do this we do the following. - // Consider the translation from the streaming logical plan to the final executed plan. - // - // streaming logical plan (with sources) <==> trigger's logical plan <==> executed plan - // - // 1. We keep track of streaming sources associated with each leaf in trigger's logical plan - // - Each logical plan leaf will be associated with a single streaming source. - // - There can be multiple logical plan leaves associated with a streaming source. - // - There can be leaves not associated with any streaming source, because they were - // generated from a batch source (e.g. stream-batch joins) - // - // 2. 
Assuming that the executed plan has same number of leaves in the same order as that of - // the trigger logical plan, we associate executed plan leaves with corresponding - // streaming sources. - // - // 3. For each source, we sum the metrics of the associated execution plan leaves. - // - val logicalPlanLeafToSource = newData.flatMap { case (source, logicalPlan) => - logicalPlan.collectLeaves().map { leaf => leaf -> source } - } - val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // includes non-streaming - val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() - if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { - val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { - case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source } - } - val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) => - val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L) - source -> numRows - } - sumRows(sourceToInputRowsTuples) - } else { - if (!metricWarningLogged) { - def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), ${seq.mkString(", ")}" - - logWarning( - "Could not report metrics as number leaves in trigger logical plan did not match that" + - s" of the execution plan:\n" + - s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" + - s"execution plan leaves: ${toString(allExecPlanLeaves)}\n") - metricWarningLogged = true - } - Map.empty - } + /** Extract statistics about stateful operators from the executed query plan. */ + protected def extractStateOperatorMetrics(hasNewData: Boolean): Seq[StateOperatorProgress] = { + if (lastExecution == null) return Nil + // lastExecution could belong to one of the previous triggers if `!hasNewData`. + // Walking the plan again should be inexpensive. + lastExecution.executedPlan.collect { + case p if p.isInstanceOf[StateStoreWriter] => + val progress = p.asInstanceOf[StateStoreWriter].getProgress() + if (hasNewData) progress else progress.copy(newNumRowsUpdated = 0) } } - - /** Records the duration of running `body` for the next query progress update. */ - protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = { - val startTime = triggerClock.getTimeMillis() - val result = body - val endTime = triggerClock.getTimeMillis() - val timeTaken = math.max(endTime - startTime, 0) - - val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L) - currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken) - logDebug(s"$triggerDetailKey took $timeTaken ms") - result - } - - private def formatTimestamp(millis: Long): String = { - timestampFormat.format(new Date(millis)) - } - - /** Updates the message returned in `status`. 
*/ - protected def updateStatusMessage(message: String): Unit = { - currentStatus = currentStatus.copy(message = message) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 82708a331b0e..e13e7b567cac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -25,6 +25,7 @@ import java.util.function.UnaryOperator import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.SparkEnv +import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -50,13 +51,15 @@ class ContinuousExecution( deleteCheckpointOnStop: Boolean) extends StreamExecution( sparkSession, name, checkpointRoot, analyzedPlan, sink, - trigger, triggerClock, outputMode, deleteCheckpointOnStop) { + trigger, triggerClock, outputMode, deleteCheckpointOnStop) with ContinuousProgressReporter { @volatile protected var sources: Seq[ContinuousStream] = Seq() // For use only in test harnesses. private[sql] var currentEpochCoordinatorId: String = _ + protected var epochEndpoint: RpcEndpointRef = _ + // Throwable that caused the execution to fail private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null) @@ -179,7 +182,7 @@ class ContinuousExecution( "CurrentTimestamp and CurrentDate not yet supported for continuous processing") } - reportTimeTaken("queryPlanning") { + recordTimeTaken("queryPlanning") { lastExecution = new IncrementalExecution( sparkSessionForQuery, withNewSources, @@ -212,7 +215,7 @@ class ContinuousExecution( trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString) // Use the parent Spark session for the endpoint since it's where this query ID is registered. - val epochEndpoint = EpochCoordinatorRef.create( + epochEndpoint = EpochCoordinatorRef.create( logicalPlan.write, stream, this, @@ -252,7 +255,7 @@ class ContinuousExecution( epochUpdateThread.start() updateStatusMessage("Running") - reportTimeTaken("runContinuous") { + recordTimeTaken("runContinuous") { SQLExecution.withNewExecutionId(sparkSessionForQuery, lastExecution) { lastExecution.executedPlan.execute() } @@ -326,36 +329,39 @@ class ContinuousExecution( * Mark the specified epoch as committed. All readers must have reported end offsets for the epoch * before this is called. 
*/ - def commit(epoch: Long): Unit = { + def commit(epoch: Long, epochStats: EpochStats): Unit = { updateStatusMessage(s"Committing epoch $epoch") assert(sources.length == 1, "only one continuous source supported currently") assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit") - synchronized { - // Record offsets before updating `committedOffsets` - recordTriggerOffsets(from = committedOffsets, to = availableOffsets) - if (queryExecutionThread.isAlive) { - commitLog.add(epoch, CommitMetadata()) - val offset = - sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json) - committedOffsets ++= Seq(sources(0) -> offset) - sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset]) - } else { - return + reportTimeTaken("walCommit", epoch) { + synchronized { + // Record offsets before updating `committedOffsets` + recordTriggerOffsets(from = committedOffsets, to = availableOffsets, epoch) + if (queryExecutionThread.isAlive) { + commitLog.add(epoch, CommitMetadata()) + val offset = + sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json) + committedOffsets ++= Seq(sources(0) -> offset) + sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset]) + } else { + return + } } - } - // Since currentBatchId increases independently in cp mode, the current committed epoch may - // be far behind currentBatchId. It is not safe to discard the metadata with thresholdBatchId - // computed based on currentBatchId. As minLogEntriesToMaintain is used to keep the minimum - // number of batches that must be retained and made recoverable, so we should keep the - // specified number of metadata that have been committed. - if (minLogEntriesToMaintain <= epoch) { - offsetLog.purge(epoch + 1 - minLogEntriesToMaintain) - commitLog.purge(epoch + 1 - minLogEntriesToMaintain) + // Since currentBatchId increases independently in cp mode, the current committed epoch may + // be far behind currentBatchId. It is not safe to discard the metadata with thresholdBatchId + // computed based on currentBatchId. As minLogEntriesToMaintain is used to keep the minimum + // number of batches that must be retained and made recoverable, so we should keep the + // specified number of metadata that have been committed. + if (minLogEntriesToMaintain <= epoch) { + offsetLog.purge(epoch + 1 - minLogEntriesToMaintain) + commitLog.purge(epoch + 1 - minLogEntriesToMaintain) + } } + finishTrigger(true, epoch, epochStats) awaitProgressLock.lock() try { awaitProgressLockCondition.signalAll() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousProgressReporter.scala new file mode 100644 index 000000000000..0b37d43dfa8f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousProgressReporter.scala @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.util.DateTimeUtils._ +import org.apache.spark.sql.execution.streaming.{ ProgressReporter, StreamProgress} +import org.apache.spark.sql.sources.v2.reader.streaming.SparkDataStream +import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StreamingQueryProgress} + +trait ContinuousProgressReporter extends ProgressReporter { + + protected def epochEndpoint: RpcEndpointRef + + private var earliestEpochId: Long = -1 + + private val currentDurationsMs = + new mutable.HashMap[Long, (Long, mutable.HashMap[String, Long])]() + private val recordDurationMs = new mutable.HashMap[String, Long]() + + private val currentTriggerStartOffsets: mutable.HashMap[Long, Map[SparkDataStream, String]] = + new mutable.HashMap[Long, Map[SparkDataStream, String]]() + private val currentTriggerEndOffsets: mutable.HashMap[Long, Map[SparkDataStream, String]] = + new mutable.HashMap[Long, Map[SparkDataStream, String]]() + + // TODO: Restore this from the checkpoint when possible. + private var lastTriggerStartTimestamp = -1L + + // Local timestamps and counters. + private var currentTriggerStartTimestamp = -1L + + private val noDataProgressEventInterval = + sparkSession.sessionState.conf.streamingNoDataProgressEventInterval + + // The timestamp we report an event that has no input data + private var lastNoDataProgressEventTime = Long.MinValue + + /** Begins recording statistics about query progress for a given trigger. */ + override protected def startTrigger(): Unit = { + logDebug("Starting Trigger Calculation") + if (earliestEpochId == -1) { + earliestEpochId = currentBatchId + } + checkQueueBoundaries() + lastTriggerStartTimestamp = currentTriggerStartTimestamp + currentTriggerStartTimestamp = triggerClock.getTimeMillis() + currentDurationsMs.put(currentBatchId, + (currentTriggerStartTimestamp, new mutable.HashMap[String, Long]())) + } + + /** Finalizes the query progress and adds it to list of recent status updates. 
*/ + protected def finishTrigger(hasNewData: Boolean, epochId: Long, epochStats: EpochStats): Unit = { + assert(currentTriggerStartOffsets.get(epochId).isDefined + && currentTriggerEndOffsets.get(epochId).isDefined + && currentDurationsMs.get(epochId).isDefined) + val currentTriggerStartTimestamp = currentDurationsMs(epochId)._1 + val currentTriggerEndTimestamp = triggerClock.getTimeMillis() + + val executionStats = extractExecutionStats(hasNewData, Some(epochStats)) + val processingTimeSec = + (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND + + val inputTimeSec = if (lastTriggerStartTimestamp >= 0) { + (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / MILLIS_PER_SECOND + } else { + Double.NaN + } + logDebug(s"Execution stats: $executionStats") + + val sourceProgress = sources.distinct.map { source => + val numRecords = executionStats.inputRows.getOrElse(source, 0L) + new SourceProgress( + description = source.toString, + startOffset = currentTriggerStartOffsets(epochId).get(source).orNull, + endOffset = currentTriggerEndOffsets(epochId).get(source).orNull, + numInputRows = numRecords, + inputRowsPerSecond = numRecords / inputTimeSec, + processedRowsPerSecond = numRecords / processingTimeSec + ) + } + + val sinkProgress = SinkProgress( + sink.toString, + sinkCommitProgress.map(_.numOutputRows)) + + val newProgress = new StreamingQueryProgress( + id = id, + runId = runId, + name = name, + timestamp = formatTimestamp(currentTriggerStartTimestamp), + batchId = epochId, + durationMs = new java.util.HashMap( + (currentDurationsMs(epochId)._2 ++ recordDurationMs).toMap.mapValues(long2Long).asJava), + eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), + stateOperators = executionStats.stateOperators.toArray, + sources = sourceProgress.toArray, + sink = sinkProgress) + + if (hasNewData) { + // Reset noDataEventTimestamp if we processed any data + lastNoDataProgressEventTime = Long.MinValue + updateProgress(newProgress) + } else { + val now = triggerClock.getTimeMillis() + if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { + lastNoDataProgressEventTime = now + updateProgress(newProgress) + } + } + + currentStatus = currentStatus.copy(isTriggerActive = false) + } + + protected def extractSourceToNumInputRows(t: Option[Any] = None) + : Map[SparkDataStream, Long] = { + require(t.isDefined && t.get.isInstanceOf[EpochStats]) + Map(sources(0) -> t.get.asInstanceOf[EpochStats].inputRows) + } + + /** + * Record the offsets range this trigger will process. Call this before updating + * `committedOffsets` in `StreamExecution` to make sure that the correct range is recorded. + */ + override protected def recordTriggerOffsets( + from: StreamProgress, + to: StreamProgress, + epochId: Long): Unit = { + checkQueueBoundaries() + currentTriggerStartOffsets.put(epochId, from.mapValues(_.json)) + currentTriggerEndOffsets.put(epochId, to.mapValues(_.json)) + } + + /** Records the duration of running `body` for the next query progress update. */ + def reportTimeTaken[T](triggerDetailKey: String, epochId: Long)(body: => T): T = { + checkQueueBoundaries() + val durations = currentDurationsMs.getOrElseUpdate(epochId, + (triggerClock.getTimeMillis(), new mutable.HashMap[String, Long]())) + addOrUpdateTime(triggerDetailKey, durations._2)(body) + } + + /** + * Records the duration of running `body` for once time, which will not be cleared + * when start a new trigger. 
+ */ + protected def recordTimeTaken[T](triggerDetailKey: String)(body: => T): T = { + addOrUpdateTime(triggerDetailKey, recordDurationMs)(body) + } + + private def checkQueueBoundaries(): Unit = { + if (currentDurationsMs.size > numProgressRetention) { + currentDurationsMs.remove(earliestEpochId) + } + + if (currentTriggerStartOffsets.size > numProgressRetention) { + currentTriggerStartOffsets.remove(earliestEpochId) + } + + if (currentTriggerEndOffsets.size > numProgressRetention) { + currentTriggerEndOffsets.remove(earliestEpochId) + } + + earliestEpochId += 1 + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala index 65c5fc63c2f4..537530bf1f46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousQueuedDataReader.scala @@ -70,6 +70,8 @@ class ContinuousQueuedDataReader( dataReaderThread.setDaemon(true) dataReaderThread.start() + private var numRowsInEpoch = 0L + context.addTaskCompletionListener[Unit](_ => { this.close() }) @@ -113,9 +115,11 @@ class ContinuousQueuedDataReader( currentEntry match { case EpochMarker => epochCoordEndpoint.send(ReportPartitionOffset( - partitionIndex, EpochTracker.getCurrentEpoch.get, currentOffset)) + partitionIndex, EpochTracker.getCurrentEpoch.get, currentOffset, numRowsInEpoch)) + numRowsInEpoch = 0L null case ContinuousRow(row, offset) => + numRowsInEpoch += 1L currentOffset = offset row } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala index decf524f7167..cb03bda0fbe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala @@ -51,7 +51,7 @@ private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorM * has acknowledged these messages. */ private[sql] case class SetReaderPartitions(numPartitions: Int) extends EpochCoordinatorMessage -case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage +private[sql] case class SetWriterPartitions(numPartitions: Int) extends EpochCoordinatorMessage // Partition task messages /** @@ -71,8 +71,8 @@ private[sql] case class CommitPartitionEpoch( private[sql] case class ReportPartitionOffset( partitionId: Int, epoch: Long, - offset: PartitionOffset) extends EpochCoordinatorMessage - + offset: PartitionOffset, + numRows: Long) extends EpochCoordinatorMessage /** Helper object used to create reference to [[EpochCoordinator]]. */ private[sql] object EpochCoordinatorRef extends Logging { @@ -139,6 +139,8 @@ private[continuous] class EpochCoordinator( // (epoch, partition) -> offset private val partitionOffsets = mutable.Map[(Long, Int), PartitionOffset]() + private val partitionInputRows = + mutable.Map[(Long, Int), Long]() private var lastCommittedEpoch = startEpoch - 1 // Remembers epochs that have to wait for previous epochs to be committed first. 
@@ -181,6 +183,7 @@ private[continuous] class EpochCoordinator( } for (k <- partitionOffsets.keys.filter { case (e, _) => e < lastCommittedEpoch }) { partitionOffsets.remove(k) + partitionInputRows.remove(k) } } } @@ -202,7 +205,12 @@ private[continuous] class EpochCoordinator( // Sequencing is important here. We must commit to the writer before recording the commit // in the query, or we will end up dropping the commit if we restart in the middle. writeSupport.commit(epoch, messages.toArray) - query.commit(epoch) + query.commit(epoch, createEpochStats(epoch)) + } + + private def createEpochStats(epoch: Long): EpochStats = { + val inputRows = partitionInputRows.filter(_._1._1 == epoch).values.sum + EpochStats(epoch, inputRows, numReaderPartitions, numWriterPartitions) } override def receive: PartialFunction[Any, Unit] = { @@ -218,8 +226,9 @@ private[continuous] class EpochCoordinator( checkProcessingQueueBoundaries() } - case ReportPartitionOffset(partitionId, epoch, offset) => + case ReportPartitionOffset(partitionId, epoch, offset, numRows) => partitionOffsets.put((epoch, partitionId), offset) + partitionInputRows.put((epoch, partitionId), numRows) val thisEpochOffsets = partitionOffsets.collect { case ((e, _), o) if e == epoch => o } if (thisEpochOffsets.size == numReaderPartitions) { @@ -230,7 +239,7 @@ private[continuous] class EpochCoordinator( checkProcessingQueueBoundaries() } - private def checkProcessingQueueBoundaries() = { + private def checkProcessingQueueBoundaries(): Unit = { if (partitionOffsets.size > epochBacklogQueueSize) { query.stopInNewThread(new IllegalStateException("Size of the partition offset queue has " + "exceeded its maximum")) @@ -268,3 +277,9 @@ private[continuous] class EpochCoordinator( context.reply(()) } } + +private[sql] case class EpochStats( + epoch: Long, + inputRows: Long, + numReaderPartitions: Int, + numWriterPartitions: Int) extends Serializable diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala index e3498db4194e..28203c4128ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/EpochCoordinatorSuite.scala @@ -76,7 +76,7 @@ class EpochCoordinatorSuite // so that mocks would have been acted upon by the time verification happens makeSynchronousCall() - verifyCommit(1) + verifyCommit(1, EpochStats(1, 0, 2, 3)) } test("single epoch, all but one writer partition has committed") { @@ -90,7 +90,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyNoCommitFor(1) + verifyNoCommitFor(1, EpochStats(1, 0, 2, 3)) } test("single epoch, all but one reader partition has reported an offset") { @@ -104,7 +104,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyNoCommitFor(1) + verifyNoCommitFor(1, EpochStats(1, 0, 2, 3)) } test("consequent epochs, messages for epoch (k + 1) arrive after messages for epoch k") { @@ -123,7 +123,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyCommitsInOrderOf(List(1, 2)) + verifyCommitsInOrderOf(List(1, 2), 2, 2) } test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") { @@ -144,7 +144,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyCommitsInOrderOf(List(1, 2)) + verifyCommitsInOrderOf(List(1, 2), 2, 2) } test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") { @@ 
-165,7 +165,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyCommitsInOrderOf(List(1, 2, 3, 4)) + verifyCommitsInOrderOf(List(1, 2, 3, 4), 1, 1) } test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") { @@ -189,7 +189,7 @@ class EpochCoordinatorSuite makeSynchronousCall() - verifyCommitsInOrderOf(List(1, 2, 3, 4, 5)) + verifyCommitsInOrderOf(List(1, 2, 3, 4, 5), 1, 1) } test("several epochs, max epoch backlog reached by partitionOffsets") { @@ -205,7 +205,7 @@ class EpochCoordinatorSuite makeSynchronousCall() for (i <- 1 to epochBacklogQueueSize + 1) { - verifyNoCommitFor(i) + verifyNoCommitFor(i, EpochStats(i, 0, 1, 1)) } verifyStoppedWithException("Size of the partition offset queue has exceeded its maximum") } @@ -223,7 +223,7 @@ class EpochCoordinatorSuite makeSynchronousCall() for (i <- 1 to epochBacklogQueueSize + 1) { - verifyNoCommitFor(i) + verifyNoCommitFor(i, EpochStats(i, 0, 1, 1)) } verifyStoppedWithException("Size of the partition commit queue has exceeded its maximum") } @@ -247,7 +247,7 @@ class EpochCoordinatorSuite makeSynchronousCall() for (i <- 1 to epochBacklogQueueSize + 2) { - verifyNoCommitFor(i) + verifyNoCommitFor(i, EpochStats(i, 0, 2, 2)) } verifyStoppedWithException("Size of the epoch queue has exceeded its maximum") } @@ -267,25 +267,25 @@ class EpochCoordinatorSuite private def reportPartitionOffset(partitionId: Int, epoch: Long): Unit = { val dummyOffset: PartitionOffset = mock[PartitionOffset] - epochCoordinator.send(ReportPartitionOffset(partitionId, epoch, dummyOffset)) + epochCoordinator.send(ReportPartitionOffset(partitionId, epoch, dummyOffset, 0L)) } private def makeSynchronousCall(): Unit = { epochCoordinator.askSync[Long](GetCurrentEpoch) } - private def verifyCommit(epoch: Long): Unit = { + private def verifyCommit(epoch: Long, epochStats: EpochStats): Unit = { orderVerifier.verify(writeSupport).commit(eqTo(epoch), any()) - orderVerifier.verify(query).commit(epoch) + orderVerifier.verify(query).commit(epoch, epochStats) } - private def verifyNoCommitFor(epoch: Long): Unit = { + private def verifyNoCommitFor(epoch: Long, epochStats: EpochStats): Unit = { verify(writeSupport, never()).commit(eqTo(epoch), any()) - verify(query, never()).commit(epoch) + verify(query, never()).commit(epoch, epochStats) } - private def verifyCommitsInOrderOf(epochs: Seq[Long]): Unit = { - epochs.foreach(verifyCommit) + private def verifyCommitsInOrderOf(epochs: Seq[Long], numReader: Int, numWriter: Int): Unit = { + epochs.foreach(epoch => verifyCommit(epoch, EpochStats(epoch, 0, numReader, numWriter))) } private def verifyStoppedWithException(msg: String): Unit = {
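
Note on the new per-epoch row accounting introduced by this patch (a minimal standalone sketch, not the Spark classes themselves): each ContinuousQueuedDataReader counts rows between epoch markers and ships the count with ReportPartitionOffset; EpochCoordinator keys the counts by (epoch, partitionId) and, once every reader partition has reported and every writer partition has committed, folds them into an EpochStats that is passed to ContinuousExecution.commit and on to finishTrigger. The sketch below mirrors only that aggregation step, reusing the names from the patch in simplified form.

import scala.collection.mutable

// Simplified stand-in for the EpochStats case class added in EpochCoordinator.scala.
case class EpochStats(epoch: Long, inputRows: Long, numReaderPartitions: Int, numWriterPartitions: Int)

object EpochStatsSketch {
  // (epoch, partitionId) -> rows reported by that partition for that epoch,
  // mirroring partitionInputRows in EpochCoordinator.
  private val partitionInputRows = mutable.Map[(Long, Int), Long]()

  // Mirrors the numRows field that ReportPartitionOffset now carries.
  def reportPartitionOffset(epoch: Long, partitionId: Int, numRows: Long): Unit =
    partitionInputRows.put((epoch, partitionId), numRows)

  // Mirrors createEpochStats: sum the rows reported for the given epoch across partitions.
  def createEpochStats(epoch: Long, numReaders: Int, numWriters: Int): EpochStats = {
    val inputRows = partitionInputRows.filter(_._1._1 == epoch).values.sum
    EpochStats(epoch, inputRows, numReaders, numWriters)
  }

  def main(args: Array[String]): Unit = {
    reportPartitionOffset(epoch = 1L, partitionId = 0, numRows = 10L)
    reportPartitionOffset(epoch = 1L, partitionId = 1, numRows = 5L)
    reportPartitionOffset(epoch = 2L, partitionId = 0, numRows = 7L)
    // Only epoch-1 rows are summed: prints EpochStats(1,15,2,1).
    println(createEpochStats(epoch = 1L, numReaders = 2, numWriters = 1))
  }
}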