Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
try {
input.addData("1", "2", "3")
verifyResult(writer) {
assert(writer.lastProgress.sink.numOutputRows == 3L)
assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
}
} finally {
writer.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ class MicroBatchExecution(
}
}

finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
// Must be outside reportTimeTaken so it is recorded
finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)

// Signal waiting threads. Note this must be after finishTrigger() to ensure all
// activities (progress generation, etc.) have completed before signaling.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

// The timestamp we report an event that has no input data
private var lastNoDataProgressEventTime = Long.MinValue
// The timestamp we report an event that has not executed anything
private var lastNoExecutionProgressEventTime = Long.MinValue

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
Expand Down Expand Up @@ -142,8 +142,15 @@ trait ProgressReporter extends Logging {
logInfo(s"Streaming query made progress: $newProgress")
}

/** Finalizes the query progress and adds it to list of recent status updates. */
protected def finishTrigger(hasNewData: Boolean): Unit = {
/**
* Finalizes the query progress and adds it to list of recent status updates.
*
* @param hasNewData Whether the sources of this stream had new data for this trigger.
* @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
* perform stateful aggregations with timeouts can still run batches even
* though the sources don't have any new data.
*/
protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

Expand All @@ -170,9 +177,12 @@ trait ProgressReporter extends Logging {
)
}

val sinkProgress = SinkProgress(
sink.toString,
sinkCommitProgress.map(_.numOutputRows))
val sinkOutput = if (hasExecuted) {
sinkCommitProgress.map(_.numOutputRows)
} else {
sinkCommitProgress.map(_ => 0L)
}
val sinkProgress = SinkProgress(sink.toString, sinkOutput)
val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)

val newProgress = new StreamingQueryProgress(
Expand All @@ -189,14 +199,14 @@ trait ProgressReporter extends Logging {
sink = sinkProgress,
observedMetrics = new java.util.HashMap(observedMetrics.asJava))

if (hasNewData) {
if (hasExecuted) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was also incorrect for no new data micro batches

Copy link
Contributor

@HeartSaVioR HeartSaVioR Mar 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice finding. We haven't recognized the bug because lastNoDataProgressEventTime is set to Long.MinValue which makes next no new data micro batch to update the progress immediately, which hides the bug. (If that's intentional, well, then it's too tricky and we should have commented here.)

Maybe we should also rename lastNoDataProgressEventTime as well as the fix changes the semantic?

And we may want to revisit that our intention is updating progress immediately whenever the batch has not run after any batch run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, that's why I'm facing issues... I understand better now

// Reset noDataEventTimestamp if we processed any data
lastNoDataProgressEventTime = Long.MinValue
lastNoExecutionProgressEventTime = Long.MinValue
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
lastNoDataProgressEventTime = now
if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
lastNoExecutionProgressEventTime = now
updateProgress(newProgress)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,47 +202,68 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}

def stateOperatorProgresses: Seq[StateOperatorProgress] = {
val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find this code readable, therefore removed the hack here

var progress = query.recentProgress.last

operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) }
if (progress.numInputRows == 0) {
// empty batch, merge metrics from previous batch as well
progress = query.recentProgress.takeRight(2).head
operatorProgress.zipWithIndex.foreach { case (sop, index) =>
// "numRowsUpdated" should be merged, as it could be updated in both batches.
// (for now it is only updated from previous batch, but things can be changed.)
// other metrics represent current status of state so picking up the latest values.
val newOperatorProgress = sop.copy(
sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated)
operatorProgress(index) = newOperatorProgress
}
}
// Pick the latest progress that actually ran a batch
def lastExecutedBatch: StreamingQueryProgress = {
query.recentProgress.filter(_.durationMs.containsKey("addBatch")).last
}

operatorProgress
def stateOperatorProgresses: Seq[StateOperatorProgress] = {
lastExecutedBatch.stateOperators
}
}

val clock = new StreamManualClock()

testStream(aggWithWatermark)(
// batchId 0
AddData(inputData, 15),
CheckAnswer(), // watermark = 5
StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
CheckAnswer(), // watermark = 0
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

// batchId 1 without data
AdvanceManualClock(1000L), // watermark = 5
Execute { q => // wait for the no data batch to complete
Copy link
Contributor

@HeartSaVioR HeartSaVioR Apr 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Good to have) It might be good to have a new operation to consolidate waiting for "no data batch" and checking the answer (as they have same pattern except the desired batch ID).

Not mandatory to do it in this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it'd be nice to provide an inbuilt function for it if this pattern is used more over time

eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) }
},
CheckAnswer(),
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

// batchId 2 with data
AddData(inputData, 10, 12, 14),
CheckAnswer(), // watermark = 5
AssertOnQuery { _.stateNodes.size === 1 },
AdvanceManualClock(1000L), // watermark = 5
CheckAnswer(),
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

// batchId 3 with data
AddData(inputData, 25),
CheckAnswer((10, 3)), // watermark = 15
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
AdvanceManualClock(1000L), // watermark = 5
CheckAnswer(),
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

// batchId 4 without data
AdvanceManualClock(1000L), // watermark = 15
Execute { q => // wait for the no data batch to complete
eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) }
},
CheckAnswer((10, 3)),
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
{ // State should have been cleaned if flag is set, otherwise should not have been cleaned
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
}
},
AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
assert(nextProgress.numInputRows === 0)
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
assert(nextProgress.sink.numOutputRows === 0)
}
} finally {
query.stop()
Expand Down