@@ -314,7 +314,7 @@ class KafkaSinkMicroBatchStreamingSuite extends KafkaSinkStreamingSuiteBase {
try {
input.addData("1", "2", "3")
verifyResult(writer) {
assert(writer.lastProgress.sink.numOutputRows == 3L)
assert(writer.recentProgress.exists(_.sink.numOutputRows == 3L))
}
} finally {
writer.stop()
@@ -226,7 +226,8 @@ class MicroBatchExecution(
}
}

finishTrigger(currentBatchHasNewData) // Must be outside reportTimeTaken so it is recorded
// Must be outside reportTimeTaken so it is recorded
finishTrigger(currentBatchHasNewData, isCurrentBatchConstructed)

// Signal waiting threads. Note this must be after finishTrigger() to ensure all
// activities (progress generation, etc.) have completed before signaling.
@@ -85,8 +85,8 @@ trait ProgressReporter extends Logging {
private val noDataProgressEventInterval =
sparkSession.sessionState.conf.streamingNoDataProgressEventInterval

// The timestamp we report an event that has no input data
private var lastNoDataProgressEventTime = Long.MinValue
// The timestamp we report an event that has not executed anything
private var lastNoExecutionProgressEventTime = Long.MinValue

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
@@ -142,8 +142,15 @@
logInfo(s"Streaming query made progress: $newProgress")
}

/** Finalizes the query progress and adds it to list of recent status updates. */
protected def finishTrigger(hasNewData: Boolean): Unit = {
/**
* Finalizes the query progress and adds it to list of recent status updates.
*
* @param hasNewData Whether the sources of this stream had new data for this trigger.
* @param hasExecuted Whether any batch was executed during this trigger. Streaming queries that
* perform stateful aggregations with timeouts can still run batches even
* though the sources don't have any new data.
*/
protected def finishTrigger(hasNewData: Boolean, hasExecuted: Boolean): Unit = {
assert(currentTriggerStartOffsets != null && currentTriggerEndOffsets != null)
currentTriggerEndTimestamp = triggerClock.getTimeMillis()

@@ -170,9 +177,12 @@
)
}

val sinkProgress = SinkProgress(
sink.toString,
sinkCommitProgress.map(_.numOutputRows))
val sinkOutput = if (hasExecuted) {
sinkCommitProgress.map(_.numOutputRows)
} else {
sinkCommitProgress.map(_ => 0L)
}
val sinkProgress = SinkProgress(sink.toString, sinkOutput)
val observedMetrics = extractObservedMetrics(hasNewData, lastExecution)

val newProgress = new StreamingQueryProgress(
@@ -189,14 +199,14 @@
sink = sinkProgress,
observedMetrics = new java.util.HashMap(observedMetrics.asJava))

if (hasNewData) {
if (hasExecuted) {
Contributor Author:

This was also incorrect for no-new-data micro-batches.

Contributor @HeartSaVioR commented on Mar 27, 2020:

Nice finding. We hadn't noticed the bug because lastNoDataProgressEventTime is set to Long.MinValue, which makes the next no-new-data micro-batch update the progress immediately and thereby hides the bug. (If that's intentional, it's too subtle and we should have left a comment here.)

Maybe we should also rename lastNoDataProgressEventTime, since the fix changes its semantics?

And we may want to revisit whether our intention really is to update progress immediately whenever a batch has not run after some batch has run.
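For context, here is a minimal, self-contained sketch of the throttling check being discussed. It is not the actual ProgressReporter code: the object name, the hard-coded interval, and the main method are illustrative only, and lastEventTime plays the role of lastNoDataProgressEventTime.

    // Simplified model of the no-data progress throttling in ProgressReporter.
    // Because lastEventTime is reset to Long.MinValue, `now - interval >= lastEventTime`
    // holds for any realistic clock value, so the very next trigger that does not run a
    // batch reports progress immediately; this is what masked the hasNewData bug.
    object ProgressThrottleSketch {
      val noDataProgressEventInterval: Long = 10000L // assumed 10s; configurable in Spark
      var lastEventTime: Long = Long.MinValue        // stands in for lastNoDataProgressEventTime

      def shouldReport(now: Long): Boolean =
        now - noDataProgressEventInterval >= lastEventTime

      def main(args: Array[String]): Unit = {
        println(shouldReport(now = 1000L))  // true: Long.MinValue always passes the check
        lastEventTime = 1000L
        println(shouldReport(now = 5000L))  // false: still within the interval
        println(shouldReport(now = 20000L)) // true: the interval has elapsed
      }
    }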

Contributor Author:

Oh, that's why I was facing issues... I understand better now.

// Reset noDataEventTimestamp if we processed any data
lastNoDataProgressEventTime = Long.MinValue
lastNoExecutionProgressEventTime = Long.MinValue
updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
lastNoDataProgressEventTime = now
if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
lastNoExecutionProgressEventTime = now
updateProgress(newProgress)
}
}
@@ -202,47 +202,55 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with Assertions {
}
}

def stateOperatorProgresses: Seq[StateOperatorProgress] = {
val operatorProgress = mutable.ArrayBuffer[StateOperatorProgress]()
Contributor Author:

I didn't find this code readable, so I removed the hack here.

var progress = query.recentProgress.last

operatorProgress ++= progress.stateOperators.map { op => op.copy(op.numRowsUpdated) }
if (progress.numInputRows == 0) {
// empty batch, merge metrics from previous batch as well
progress = query.recentProgress.takeRight(2).head
operatorProgress.zipWithIndex.foreach { case (sop, index) =>
// "numRowsUpdated" should be merged, as it could be updated in both batches.
// (for now it is only updated from previous batch, but things can be changed.)
// other metrics represent current status of state so picking up the latest values.
val newOperatorProgress = sop.copy(
sop.numRowsUpdated + progress.stateOperators(index).numRowsUpdated)
operatorProgress(index) = newOperatorProgress
}
}
// Pick the latest progress that actually ran a batch
def lastExecutedBatch: StreamingQueryProgress = {
query.recentProgress.filter(_.durationMs.containsKey("addBatch")).last
}

operatorProgress
def stateOperatorProgresses: Seq[StateOperatorProgress] = {
lastExecutedBatch.stateOperators
}
}

val clock = new StreamManualClock()

testStream(aggWithWatermark)(
StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
Contributor Author:

@tdas This test exercises append-mode output for no-data micro-batches.

AddData(inputData, 15),
CheckAnswer(), // watermark = 5
AdvanceManualClock(1000L), // triggers first batch
CheckAnswer(), // watermark = 0
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AddData(inputData, 10, 12, 14),
AdvanceManualClock(1000L), // watermark = 0, runs with the just added data
Contributor:

Let's just remove the watermark from this comment, as you've done for the later AdvanceManualClock steps.

CheckAnswer(), // watermark = 5
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AddData(inputData, 25),
CheckAnswer((10, 3)), // watermark = 15
AdvanceManualClock(1000L), // actually runs batch with data
CheckAnswer(), // watermark = 5, will update to 15 next batch
AssertOnQuery { _.stateNodes.size === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AdvanceManualClock(1000L), // runs batch with no new data and watermark progresses
Contributor @HeartSaVioR commented on Mar 29, 2020:

This is also hard to understand (it requires two trigger intervals instead of one, ideally zero, to run the no-data micro-batch), but yes, this is OFF-TOPIC.

Contributor:

This is not off-topic, actually, because (i) not understanding this correctly can lead to flaky tests, and (ii) I was afraid the fixes made in this PR had changed the semantic behavior of no-data batches. But that is not the case; I tested it in this unit test myself. I think all the confusion starts from the fact that you don't need to advance the manual clock after StartStream to trigger the first batch, so the first AdvanceManualClock is not really necessary. What it actually does is advance the clock, thereby allowing the 2nd batch to be automatically triggered as soon as the first batch finishes. This is what leads to the confusion about why the second batch is not picking up the new data: the next batch has already been unblocked (i.e., before AddData(10, 12, 14)) by the first AdvanceManualClock. This weird asynchrony despite using the manual clock makes the test incomprehensible and is also a perfect recipe for flakiness.
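To make that interleaving concrete, here is a small annotated sketch of the original action sequence. It is not a runnable proposal (see the full proposed test below); the timing comments simply restate the analysis above.

    testStream(aggWithWatermark)(
      StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
      AddData(inputData, 15),
      CheckAnswer(),              // batch 0 runs without any clock advance
      AdvanceManualClock(1000L),  // unblocks the *next* trigger: batch 1 can start as soon as
                                  // batch 0 finishes, i.e. before the AddData below, so it
                                  // runs as a no-data batch
      AddData(inputData, 10, 12, 14),
      AdvanceManualClock(1000L),  // only now does a batch pick up 10, 12, 14
      CheckAnswer()
    )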

Contributor @tdas commented on Apr 7, 2020:

@brkyvz here is my proposed test. @HeartSaVioR please take a look and see whether this is more understandable.

    testStream(aggWithWatermark)(
      // batchId 0
      AddData(inputData, 15),
      StartStream(Trigger.ProcessingTime("interval 1 second"), clock),
      CheckAnswer(), // watermark = 0
      AssertOnQuery { _.stateNodes.size === 1 },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 1 without data
      AdvanceManualClock(1000L), // watermark = 5
      Execute { q =>             // wait for the no data batch to complete
        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 1) }
      },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 1 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 2 with data
      AddData(inputData, 10, 12, 14),
      AdvanceManualClock(1000L), // watermark = 5
      CheckAnswer(),
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 3 with data
      AddData(inputData, 25),
      AdvanceManualClock(1000L), // watermark = 5
      CheckAnswer(),
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },

      // batchId 4 without data
      AdvanceManualClock(1000L), // watermark = 15
      Execute { q =>             // wait for the no data batch to complete
        eventually(timeout(streamingTimeout)) { assert(q.lastProgress.batchId === 4) }
      },
      AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 0 },
      AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
      AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
    )
  }

Contributor @HeartSaVioR commented on Apr 7, 2020:

@tdas

> I think all the confusion starts from the fact that you don't need to advance the manual clock after StartStream to trigger the first batch.
> Rather, what it is doing is advancing the clock, thus allowing the 2nd batch to be automatically triggered as soon as the first batch finishes.
> This weird asynchrony despite using the manual clock makes the test incomprehensible and is also a perfect recipe for flakiness.

Ah, nice finding. Great analysis. That's what I had missed (and it's very confusing behavior, TBH). The proposal looks great and provides a better understanding. I have comments on the new proposal, but since it has been reflected in the PR, I'll comment directly on the PR.

CheckAnswer(), // watermark = 15, but nothing yet
Contributor:

I feel like this will be flaky. CheckAnswer() works reliably only when there is new data to process, because it waits for the new data's offset to be reported as processed. Here there is no new data in the no-data batch, so it's possible this CheckAnswer won't wait for the no-data batch to finish before the final progress checks start.

Instead, it's probably more reliable to use eventually and check that lastProgress has the expected batchId.
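A minimal sketch of that suggestion, following the Execute/eventually pattern already used in the proposed test above; expectedBatchId is a placeholder for whichever batch the preceding actions should have produced.

    // Sketch only: wait for the no-data batch to complete by polling lastProgress,
    // instead of relying on CheckAnswer(), which has no new offset to wait on here.
    Execute { q =>
      eventually(timeout(streamingTimeout)) {
        assert(q.lastProgress.batchId === expectedBatchId) // placeholder batch id
      }
    }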

AssertOnQuery { _.lastProgress.sink.numOutputRows == 0 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsUpdated === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 }
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 3 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 0 },
AdvanceManualClock(1000L), // trigger no-data microbatch, with new watermark
CheckAnswer((10, 3)), // watermark = 15
AssertOnQuery { _.stateNodes.head.metrics("numOutputRows").value === 1 },
AssertOnQuery { _.stateOperatorProgresses.head.numRowsTotal === 2 },
AssertOnQuery { _.lastExecutedBatch.sink.numOutputRows == 1 }
)
}

@@ -280,7 +280,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
{ // State should have been cleaned if flag is set, otherwise should not have been cleaned
if (flag) assertNumStateRows(total = 1, updated = 1)
else assertNumStateRows(total = 7, updated = 1)
}
},
AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
)
}

@@ -241,6 +241,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
assert(nextProgress.numInputRows === 0)
assert(nextProgress.stateOperators.head.numRowsTotal === 2)
assert(nextProgress.stateOperators.head.numRowsUpdated === 0)
assert(nextProgress.sink.numOutputRows === 0)
}
} finally {
query.stop()