-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-31278][SS] Fix StreamingQuery output rows metric #28040
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
7def4de
b57aa74
4655611
6e1f90f
62044dc
5fbbf41
d7792fd
d501127
68848d9
1cf1a3c
2c9ed55
3822aed
786d921
9bd4445
68bf147
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging { | |
| private val noDataProgressEventInterval = | ||
| sparkSession.sessionState.conf.streamingNoDataProgressEventInterval | ||
|
|
||
| // The timestamp we report an event that has no input data | ||
| private var lastNoDataProgressEventTime = Long.MinValue | ||
| // The timestamp we report an event | ||
| private var lastProgressEventTime = Long.MinValue | ||
|
|
||
| private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601 | ||
| timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC")) | ||
|
|
@@ -142,8 +142,15 @@ trait ProgressReporter extends Logging { | |
| logInfo(s"Streaming query made progress: $newProgress") | ||
| } | ||
|
|
||
| /** Finalizes the query progress and adds it to list of recent status updates. */ | ||
| protected def finishTrigger(hasNewData: Boolean): Unit = { | ||
| /** | ||
| * Finalizes the query progress and adds it to list of recent status updates. | ||
| * | ||
| * @param hasNewData Whether the sources of this stream had new data for this trigger. | ||
| * @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that | ||
| * perform stateful aggregations with timeouts can still run batches even | ||
| * though the sources don't have any new data. | ||
| */ | ||
| protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = { | ||
| assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null) | ||
| currentTriggerEndTimestamp = triggerClock.getTimeMillis() | ||
|
|
||
|
|
@@ -170,9 +177,8 @@ trait ProgressReporter extends Logging { | |
| ) | ||
| } | ||
|
|
||
| val sinkProgress = SinkProgress( | ||
| sink.toString, | ||
| sinkCommitProgress.map(_.numOutputRows)) | ||
| val sinkOutput = if (hasExecuted) sinkCommitProgress.map(_.numOutputRows) else Some(0L) | ||
brkyvz marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| val sinkProgress = SinkProgress(sink.toString, sinkOutput) | ||
| val observedMetrics = extractObservedMetrics(hasNewData, lastExecution) | ||
|
|
||
| val newProgress = new StreamingQueryProgress( | ||
|
|
@@ -189,14 +195,14 @@ trait ProgressReporter extends Logging { | |
| sink = sinkProgress, | ||
| observedMetrics = new java.util.HashMap(observedMetrics.asJava)) | ||
|
|
||
| if (hasNewData) { | ||
| // Reset noDataEventTimestamp if we processed any data | ||
| lastNoDataProgressEventTime = Long.MinValue | ||
| if (hasExecuted) { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was also incorrect for no-data micro-batches
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice finding. We haven't recognized the bug because lastNoDataProgressEventTime is set to Long.MinValue which makes next no new data micro batch to update the progress immediately, which hides the bug. (If that's intentional, well, then it's too tricky and we should have commented here.) Maybe we should also rename lastNoDataProgressEventTime as well as the fix changes the semantic? And we may want to revisit that our intention is updating progress immediately whenever the batch has not run after any batch run.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oh, that's why I'm facing issues... I understand better now |
||
| // Reset lastProgressEventTime if we processed any data | ||
| lastProgressEventTime = triggerClock.getTimeMillis() | ||
| updateProgress(newProgress) | ||
brkyvz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| val now = triggerClock.getTimeMillis() | ||
| if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) { | ||
| lastNoDataProgressEventTime = now | ||
| if (now - noDataProgressEventInterval >= lastProgressEventTime) { | ||
| lastProgressEventTime = now | ||
| updateProgress(newProgress) | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -203,46 +203,53 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions { | |
| } | ||
|
|
||
| def stateOperatorProgresses: Seq[StateOperatorProgress] = { | ||
| val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]() | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I didn't find this code readable, so I removed the hack here |
||
| var progress = query.recentProgress.last | ||
|
|
||
| operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) } | ||
| if (progress.numInputRows == 0) { | ||
| // empty batch, merge metrics from previous batch as well | ||
| progress = query.recentProgress.takeRight(2).head | ||
| operatorProgress.zipWithIndex.foreach { case (sop, index) => | ||
| // "numRowsUpdated" should be merged, as it could be updated in both batches. | ||
| // (for now it is only updated from previous batch, but things can be changed.) | ||
| // other metrics represent current status of state so picking up the latest values. | ||
| val newOperatorProgress = sop.copy( | ||
| sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated) | ||
| operatorProgress(index) = newOperatorProgress | ||
| } | ||
| } | ||
|
|
||
| operatorProgress | ||
| query.recentProgress.last.stateOperators | ||
| } | ||
| } | ||
|
|
||
| val clock = new StreamManualClock() | ||
|
|
||
| testStream(aggWithWatermark)( | ||
| AddData(inputData, 15), | ||
| CheckAnswer(), // watermark = 5 | ||
| StartStream(Trigger.ProcessingTime("interval 1 second"), clock), | ||
| AdvanceManualClock(1000L), // triggers first batch | ||
| CheckAnswer(), // watermark = 0 | ||
| AssertOnQuery { _.stateNodes.size === 1 }, | ||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, | ||
| AddData(inputData, 10, 12, 14), | ||
| AdvanceManualClock(1000L), // watermark = 5, runs no-data microbatch | ||
|
||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 }, | ||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, | ||
| AdvanceManualClock(1000L), // runs with new data from above | ||
| CheckAnswer(), // watermark = 5 | ||
| AssertOnQuery { _.stateNodes.size === 1 }, | ||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }, | ||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, | ||
| AddData(inputData, 25), | ||
| CheckAnswer((10, 3)), // watermark = 15 | ||
| AdvanceManualClock(1000L), // actually runs batch with data | ||
| CheckAnswer(), // watermark = 5, will update to 15 next batch | ||
| AssertOnQuery { _.stateNodes.size === 1 }, | ||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 }, | ||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 }, | ||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, | ||
| AdvanceManualClock(1000L), // runs batch with no new data and watermark progresses | ||
|
||
| CheckAnswer(), // watermark = 15, but nothing yet | ||
|
||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 }, | ||
brkyvz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 } | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 }, | ||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 }, | ||
| AdvanceManualClock(1000L), // trigger no-data microbatch, with new watermark | ||
| CheckAnswer((10, 3)), // watermark = 15 | ||
| AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 }, | ||
| AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }, | ||
| AssertOnQuery { _.lastProgress.sink.numOutputRows == 1 } | ||
| ) | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.