From 42011805002ed6d6cc418b9f3b4cacba670ac11f Mon Sep 17 00:00:00 2001 From: uncleGen Date: Tue, 28 Apr 2020 18:00:10 +0800 Subject: [PATCH 1/4] Remove unnecessary streaming query progress update --- .../apache/spark/sql/execution/streaming/ProgressReporter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 0dff1c2fe576..ea1f2ce3943b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -201,7 +201,7 @@ trait ProgressReporter extends Logging { if (hasExecuted) { // Reset noDataEventTimestamp if we processed any data - lastNoExecutionProgressEventTime = Long.MinValue + lastNoExecutionProgressEventTime = triggerClock.getTimeMillis() updateProgress(newProgress) } else { val now = triggerClock.getTimeMillis() From 4a55b25b76066e4fac009479fa59a67305f81440 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Sat, 9 May 2020 12:32:42 +0800 Subject: [PATCH 2/4] update ut --- .../StreamingDeduplicationSuite.scala | 12 +++- .../StreamingQueryListenerSuite.scala | 69 +++++++++++++++++-- 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index f63778aef5a7..06695f3427f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -259,9 +259,10 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { } test("test no-data flag") { - val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key + val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key + val noDataBatchEnableKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key - def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") { + def testWithFlag(flag: Boolean): Unit = withClue(s"with $noDataBatchEnableKey = $flag") { val inputData = MemoryStream[Int] val result = inputData.toDS() .withColumn("eventTime", $"value".cast("timestamp")) @@ -270,7 +271,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .select($"eventTime".cast("long").as[Long]) testStream(result, Append)( - StartStream(additionalConfs = Map(flagKey -> flag.toString)), + StartStream(additionalConfs = Map( + noDataBatchEnableKey -> flag.toString, + // set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` a small value to + // report an `empty` progress when no data come. + noDataProgressIntervalKey -> "1") + ), AddData(inputData, 10, 11, 12, 13, 14, 15), CheckAnswer(10, 11, 12, 13, 14, 15), assertNumStateRows(total = 6, updated = 6), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index e585b8a885c9..574fed78b253 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -389,7 +389,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Structured Streaming in Spark 2.0.0. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.0.txt", 1) } testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") { @@ -397,14 +397,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Structured Streaming in Spark 2.0.1. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.1.txt", 1) } testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) + testReplayListenerBusWithBrokenEventJsons("query-event-logs-version-2.0.2.txt", 5) } test("listener propagates observable metrics") { @@ -433,9 +433,15 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } try { + val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key spark.streams.addListener(listener) testStream(df, OutputMode.Append)( - StartStream(Trigger.ProcessingTime(100), triggerClock = clock), + StartStream( + Trigger.ProcessingTime(100), + triggerClock = clock, + // set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` a small value to + // report an `empty` progress when no data come. + Map(noDataProgressIntervalKey -> "1")), // Batch 1 AddData(inputData, 1, 2), AdvanceManualClock(100), @@ -464,7 +470,60 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def testReplayListenerBusWithBorkenEventJsons( + test("SPARK-31593: remove unnecessary streaming query progress update") { + // This test will start a query but not push any data, and then check if we push too many events + withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") { + @volatile var numProgressEvent = 0 + val listener = new StreamingQueryListener { + override def onQueryStarted(event: QueryStartedEvent): Unit = {} + override def onQueryProgress(event: QueryProgressEvent): Unit = { + numProgressEvent += 1 + } + override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} + } + spark.streams.addListener(listener) + try { + val input = new MemoryStream[Int](0, sqlContext) + val clock = new StreamManualClock() + val result = input.toDF().select("value") + testStream(result)( + StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock), + AddData(input, 10), + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == 1) + } + true + }, + AdvanceManualClock(10), + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == 2) + } + true + }, + AdvanceManualClock(90), + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == 2) + } + true + }, + AdvanceManualClock(10), + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == 3) + } + true + } + ) + } finally { + spark.streams.removeListener(listener) + } + } + } + + private def testReplayListenerBusWithBrokenEventJsons( fileName: String, expectedEventSize: Int): Unit = { val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName") From c3db6cfaaa2260e7f11436e84fc2576ad4f8dde1 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Sat, 9 May 2020 13:55:40 +0800 Subject: [PATCH 3/4] fix codestyle --- .../spark/sql/streaming/StreamingQueryListenerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 574fed78b253..c011d170d8f9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -471,7 +471,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("SPARK-31593: remove unnecessary streaming query progress update") { - // This test will start a query but not push any data, and then check if we push too many events withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100") { @volatile var numProgressEvent = 0 val listener = new StreamingQueryListener { @@ -522,7 +521,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } } - + private def testReplayListenerBusWithBrokenEventJsons( fileName: String, expectedEventSize: Int): Unit = { From 847934117aebe3aa295b5875ce27822cec38ac04 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Wed, 3 Jun 2020 15:54:47 +0800 Subject: [PATCH 4/4] fix comments --- .../StreamingDeduplicationSuite.scala | 19 ++++----- .../StreamingQueryListenerSuite.scala | 42 +++++++------------ 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala index 06695f3427f5..51ddc7b49fcd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingDeduplicationSuite.scala @@ -259,10 +259,9 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { } test("test no-data flag") { - val noDataProgressIntervalKey = SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key - val noDataBatchEnableKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key + val flagKey = SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key - def testWithFlag(flag: Boolean): Unit = withClue(s"with $noDataBatchEnableKey = $flag") { + def testWithFlag(flag: Boolean): Unit = withClue(s"with $flagKey = $flag") { val inputData = MemoryStream[Int] val result = inputData.toDS() .withColumn("eventTime", $"value".cast("timestamp")) @@ -271,12 +270,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { .select($"eventTime".cast("long").as[Long]) testStream(result, Append)( - StartStream(additionalConfs = Map( - noDataBatchEnableKey -> flag.toString, - // set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` a small value to - // report an `empty` progress when no data come. - noDataProgressIntervalKey -> "1") - ), + StartStream(additionalConfs = Map(flagKey -> flag.toString)), AddData(inputData, 10, 11, 12, 13, 14, 15), CheckAnswer(10, 11, 12, 13, 14, 15), assertNumStateRows(total = 6, updated = 6), @@ -287,7 +281,12 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest { if (flag) assertNumStateRows(total = 1, updated = 1) else assertNumStateRows(total = 7, updated = 1) }, - AssertOnQuery(q => q.lastProgress.sink.numOutputRows == 0L) + AssertOnQuery { q => + eventually(timeout(streamingTimeout)) { + q.lastProgress.sink.numOutputRows == 0L + true + } + } ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index c011d170d8f9..6e08b88f538d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -439,9 +439,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { StartStream( Trigger.ProcessingTime(100), triggerClock = clock, - // set `STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL` a small value to - // report an `empty` progress when no data come. - Map(noDataProgressIntervalKey -> "1")), + Map(noDataProgressIntervalKey -> "100")), // Batch 1 AddData(inputData, 1, 2), AdvanceManualClock(100), @@ -481,6 +479,16 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {} } spark.streams.addListener(listener) + + def checkProgressEvent(count: Int): StreamAction = { + AssertOnQuery { _ => + eventually(Timeout(streamingTimeout)) { + assert(numProgressEvent == count) + } + true + } + } + try { val input = new MemoryStream[Int](0, sqlContext) val clock = new StreamManualClock() @@ -488,33 +496,13 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testStream(result)( StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock), AddData(input, 10), - AssertOnQuery { _ => - eventually(Timeout(streamingTimeout)) { - assert(numProgressEvent == 1) - } - true - }, + checkProgressEvent(1), AdvanceManualClock(10), - AssertOnQuery { _ => - eventually(Timeout(streamingTimeout)) { - assert(numProgressEvent == 2) - } - true - }, + checkProgressEvent(2), AdvanceManualClock(90), - AssertOnQuery { _ => - eventually(Timeout(streamingTimeout)) { - assert(numProgressEvent == 2) - } - true - }, + checkProgressEvent(2), AdvanceManualClock(10), - AssertOnQuery { _ => - eventually(Timeout(streamingTimeout)) { - assert(numProgressEvent == 3) - } - true - } + checkProgressEvent(3) ) } finally { spark.streams.removeListener(listener)