@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {

if (hasExecuted) {
// Reset noDataEventTimestamp if we processed any data
- lastNoExecutionProgressEventTime = Long.MinValue
+ lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
Contributor (Author):
When we reset `lastNoExecutionProgressEventTime = Long.MinValue`, the condition `now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime` becomes always true when there is no new data, so the progress reporter reports an empty progress on every trigger.

updateProgress(newProgress)
} else {
val now = triggerClock.getTimeMillis()
@@ -259,9 +259,10 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
}

test("test no-data flag") {
- val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key
+ val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
+ val noDataBatchEnableKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key

- def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") {
+ def testWithFlag(flag: Boolean): Unit = withClue(s"with $noDataBatchEnableKey = $flag") {
val inputData = MemoryStream[Int]
val result = inputData.toDS()
.withColumn("eventTime", $"value".cast("timestamp"))
@@ -270,7 +271,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
.select($"eventTime".cast("long").as[Long])

testStream(result, Append)(
- StartStream(additionalConfs = Map(flagKey -> flag.toString)),
+ StartStream(additionalConfs = Map(
+   noDataBatchEnableKey -> flag.toString,
+   // Set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` to a small value so that
+   // an empty progress is reported when no data arrives.
+   noDataProgressIntervalKey -> "1")
Member:
This way of fixing the failed UTs seems a little flaky. Since your approach suppresses empty progress updates in general, here you have to set STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL to a value small enough to still generate an empty one as expected. I think we can do better in either of two ways:

  • Add comments explaining how small STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL needs to be; it seems anything less than 10 seconds is OK?
  • Delete the empty-progress check, with an explanation in the comments.

Contributor @HeartSaVioR (May 31, 2020):

Let's either use the manual clock or fix the UT so that it doesn't depend on STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.

For the latter, we can set noDataProgressIntervalKey to 1000000 and remove the last assertion, which isn't actually testing the behavior of deduplication. Even better, you can keep the last assertion only when flag == true, which verifies that state cleanup in an empty input batch doesn't produce new outputs.

Contributor:

SPARK-31928 is filed to address the flakiness of this test on the master branch. That said, if we can make sure this test is no longer flaky, that would be nice.

+ ),
AddData(inputData, 10, 11, 12, 13, 14, 15),
CheckAnswer(10, 11, 12, 13, 14, 15),
assertNumStateRows(total = 6, updated = 6),
@@ -389,22 +389,22 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
// in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}

test("listener propagates observable metrics") {
@@ -433,9 +433,15 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}

try {
+ val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
spark.streams.addListener(listener)
testStream(df, OutputMode.Append)(
- StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
+ StartStream(
+   Trigger.ProcessingTime(100),
+   triggerClock = clock,
+   // Set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` to a small value so that
+   // an empty progress is reported when no data arrives.
+   Map(noDataProgressIntervalKey -> "1")),
// Batch 1
AddData(inputData, 1, 2),
AdvanceManualClock(100),
@@ -464,7 +470,59 @@
}
}

- private def testReplayListenerBusWithBorkenEventJsons(
test("SPARK-31593: remove unnecessary streaming query progress update") {
withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") {
@volatile var numProgressEvent = 0
val listener = new StreamingQueryListener {
override def onQueryStarted(event: QueryStartedEvent): Unit = {}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
numProgressEvent += 1
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
spark.streams.addListener(listener)
try {
val input = new MemoryStream[Int](0, sqlContext)
val clock = new StreamManualClock()
val result = input.toDF().select("value")
testStream(result)(
StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
AddData(input, 10),
AssertOnQuery { _ =>
eventually(Timeout(streamingTimeout)) {
assert(numProgressEvent == 1)
}
true
},
AdvanceManualClock(10),
AssertOnQuery { _ =>
eventually(Timeout(streamingTimeout)) {
assert(numProgressEvent == 2)
}
true
},
AdvanceManualClock(90),
AssertOnQuery { _ =>
eventually(Timeout(streamingTimeout)) {
assert(numProgressEvent == 2)
}
true
},
AdvanceManualClock(10),
AssertOnQuery { _ =>
eventually(Timeout(streamingTimeout)) {
assert(numProgressEvent == 3)
}
true
}
)
} finally {
spark.streams.removeListener(listener)
}
}
}

+ private def testReplayListenerBusWithBrokenEventJsons(
fileName: String,
expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")