diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 0dff1c2fe576..ea1f2ce3943b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -201,7 +201,7 @@ trait ProgressReporter extends Logging {
     if (hasExecuted) {
       // Reset noDataEventTimestamp if we processed any data
-      lastNoExecutionProgressEventTime = Long.MinValue
+      lastNoExecutionProgressEventTime = triggerClock.getTimeMillis()
       updateProgress(newProgress)
     } else {
       val now = triggerClock.getTimeMillis()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
index f63778aef5a7..51ddc7b49fcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala
@@ -281,7 +281,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
         if (flag) assertNumStateRows(total = 1, updated = 1)
         else assertNumStateRows(total = 7, updated = 1)
       },
-      AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L)
+      AssertOnQuery { q =>
+        eventually(timeout(streamingTimeout)) {
+          assert(q.lastProgress.sink.numOutputRows == 0L)
+        }
+        true
+      }
     )
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e585b8a885c9..6e08b88f538d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Structured Streaming in Spark 2.0.0. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
+    testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1)
   }
 
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
@@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Structured Streaming in Spark 2.0.1. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
+    testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1)
   }
 
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
     // query-event-logs-version-2.0.2.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
     // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
+    testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5)
   }
 
   test("listener propagates observable metrics") {
@@ -433,9 +433,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
 
     try {
+      val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key
       spark.streams.addListener(listener)
       testStream(df, OutputMode.Append)(
-        StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
+        StartStream(
+          Trigger.ProcessingTime(100),
+          triggerClock = clock,
+          Map(noDataProgressIntervalKey -> "100")),
         // Batch 1
         AddData(inputData, 1, 2),
         AdvanceManualClock(100),
@@ -464,7 +468,49 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def testReplayListenerBusWithBorkenEventJsons(
+  test("SPARK-31593: remove unnecessary streaming query progress update") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") {
+      @volatile var numProgressEvent = 0
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          numProgressEvent += 1
+        }
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+
+      def checkProgressEvent(count: Int): StreamAction = {
+        AssertOnQuery { _ =>
+          eventually(timeout(streamingTimeout)) {
+            assert(numProgressEvent == count)
+          }
+          true
+        }
+      }
+
+      try {
+        val input = new MemoryStream[Int](0, sqlContext)
+        val clock = new StreamManualClock()
+        val result = input.toDF().select("value")
+        testStream(result)(
+          StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+          AddData(input, 10),
+          checkProgressEvent(1),
+          AdvanceManualClock(10),
+          checkProgressEvent(2),
+          AdvanceManualClock(90),
+          checkProgressEvent(2),
+          AdvanceManualClock(10),
+          checkProgressEvent(3)
+        )
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
+  private def testReplayListenerBusWithBrokenEventJsons(
       fileName: String,
       expectedEventSize: Int): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
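
Note (not part of the patch): the sketch below illustrates the throttling rule finishTrigger() follows after the ProgressReporter change above. A trigger that processed data now records its own event time instead of resetting the marker to Long.MinValue, so an empty trigger that fires immediately afterwards no longer emits a redundant progress event, and empty triggers emit at most one event per STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL. ManualClock, Reporter, and eventsEmitted are hypothetical stand-ins, and the else branch is inferred from the context lines visible in the first hunk.

object NoDataProgressThrottleSketch {
  // Deterministic clock, standing in for triggerClock in ProgressReporter.
  final class ManualClock(private var now: Long = 0L) {
    def advance(ms: Long): Unit = now += ms
    def getTimeMillis(): Long = now
  }

  final class Reporter(clock: ManualClock, noDataProgressEventInterval: Long) {
    private var lastNoExecutionProgressEventTime = Long.MinValue
    var eventsEmitted = 0

    def finishTrigger(hasExecuted: Boolean): Unit = {
      if (hasExecuted) {
        // The fix: remember when this progress event fired. Before the change
        // this was reset to Long.MinValue, so the next empty trigger always
        // passed the interval check and emitted an extra event.
        lastNoExecutionProgressEventTime = clock.getTimeMillis()
        eventsEmitted += 1
      } else {
        val now = clock.getTimeMillis()
        // Empty triggers emit at most one progress event per interval.
        if (now - noDataProgressEventInterval >= lastNoExecutionProgressEventTime) {
          lastNoExecutionProgressEventTime = now
          eventsEmitted += 1
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()
    val reporter = new Reporter(clock, noDataProgressEventInterval = 100L)
    reporter.finishTrigger(hasExecuted = true)  // data batch -> event 1
    clock.advance(10)
    reporter.finishTrigger(hasExecuted = false) // 10 ms since event 1 -> suppressed
    clock.advance(90)
    reporter.finishTrigger(hasExecuted = false) // interval elapsed -> event 2
    assert(reporter.eventsEmitted == 2)
    println(s"events emitted: ${reporter.eventsEmitted}")
  }
}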