From 4ace2bdeaf1586d5495a3cd60d5423aa6e376b59 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Dec 2016 13:29:50 -0800
Subject: [PATCH 1/4] Limit the number of StreamingQueryListener.StreamProgressEvent when there is no data

---
 .../execution/streaming/StreamExecution.scala      | 15 ++++++-
 .../apache/spark/sql/internal/SQLConf.scala        |  9 ++++
 .../StreamingQueryListenerSuite.scala              | 43 +++++++++++++++++++
 3 files changed, 66 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 6d0e269d341e..087c417582a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,6 +63,8 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
+  private val noDataReportInterval = sparkSession.sessionState.conf.streamingNoDataReportInterval
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */
@@ -196,6 +198,9 @@ class StreamExecution(
       // While active, repeatedly attempt to run batches.
       SparkSession.setActiveSession(sparkSession)
 
+      // The timestamp we report an event that has no input data
+      var noDataReportTimestamp = Long.MinValue
+
       triggerExecutor.execute(() => {
         startTrigger()
 
@@ -218,7 +223,15 @@ class StreamExecution(
 
         // Report trigger as finished and construct progress object.
         finishTrigger(dataAvailable)
-        postEvent(new QueryProgressEvent(lastProgress))
+        if (dataAvailable) {
+          postEvent(new QueryProgressEvent(lastProgress))
+        } else {
+          val now = triggerClock.getTimeMillis()
+          if (now - noDataReportInterval >= noDataReportTimestamp) {
+            noDataReportTimestamp = now
+            postEvent(new QueryProgressEvent(lastProgress))
+          }
+        }
 
         if (dataAvailable) {
           // We'll increase currentBatchId after we complete processing current batch's data
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 200f0603e1ae..6b232a3baf10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -603,6 +603,13 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
+  val STREAMING_NO_DATA_REPORT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataReportInterval")
+      .internal()
+      .doc("How long to wait between two progress events when there is no data")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(10000L)
+
   val STREAMING_METRICS_ENABLED = SQLConfigBuilder("spark.sql.streaming.metricsEnabled")
     .doc("Whether Dropwizard/Codahale metrics will be reported for active streaming queries.")
@@ -684,6 +691,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 
+  def streamingNoDataReportInterval: Long = getConf(STREAMING_NO_DATA_REPORT_INTERVAL)
+
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
   def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 07a13a48a18c..3cc9cd5f098a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -31,6 +31,7 @@ import org.scalatest.PrivateMethodTester._
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.util.JsonProtocol
 
@@ -191,6 +192,48 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
+  test("noDataReportInterval") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_REPORT_INTERVAL.key -> "100ms") {
+      @volatile var progressEventCount = 0
+
+      val listener = new StreamingQueryListener {
+        override def onQueryStarted(event: QueryStartedEvent): Unit = {}
+
+        override def onQueryProgress(event: QueryProgressEvent): Unit = {
+          progressEventCount += 1
+        }
+
+        override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
+      }
+      spark.streams.addListener(listener)
+      try {
+        val clock = new StreamManualClock()
+        val actions = mutable.ArrayBuffer[StreamAction]()
+        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        actions += AssertOnQuery { _ =>
+          // It should report at least one progress
+          eventually(timeout(streamingTimeout)) {
+            assert(progressEventCount > 0)
+          }
+          true
+        }
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+          actions += AssertOnQuery { _ =>
+            // Sleep so that if the config `noDataReportInterval` doesn't work, it has enough time
+            // to report too many events.
+            Thread.sleep(10)
+            true
+          }
+        }
+        testStream(MemoryStream[Int].toDS)(actions: _*)
+        assert(progressEventCount <= 11)
+      } finally {
+        spark.streams.removeListener(listener)
+      }
+    }
+  }
+
   testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.

From 3425231fc30457018e342db1430b5c1e4b75e001 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Dec 2016 14:38:20 -0800
Subject: [PATCH 2/4] Address TD's comments

---
 .../sql/execution/streaming/StreamExecution.scala     | 10 ++++++----
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  6 +++---
 .../sql/streaming/StreamingQueryListenerSuite.scala   |  8 +++++---
 3 files changed, 14 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 087c417582a8..953e2f557034 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,7 +63,7 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val noDataReportInterval = sparkSession.sessionState.conf.streamingNoDataReportInterval
+  private val noDataEventInterval = sparkSession.sessionState.conf.streamingNoDataEventInterval
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
@@ -199,7 +199,7 @@ class StreamExecution(
       SparkSession.setActiveSession(sparkSession)
 
       // The timestamp we report an event that has no input data
-      var noDataReportTimestamp = Long.MinValue
+      var noDataEventTimestamp = Long.MinValue
 
       triggerExecutor.execute(() => {
         startTrigger()
@@ -224,11 +224,13 @@ class StreamExecution(
         // Report trigger as finished and construct progress object.
         finishTrigger(dataAvailable)
         if (dataAvailable) {
+          // Reset noDataEventTimestamp if we processed any data
+          noDataEventTimestamp = Long.MinValue
           postEvent(new QueryProgressEvent(lastProgress))
         } else {
           val now = triggerClock.getTimeMillis()
-          if (now - noDataReportInterval >= noDataReportTimestamp) {
-            noDataReportTimestamp = now
+          if (now - noDataEventInterval >= noDataEventTimestamp) {
+            noDataEventTimestamp = now
             postEvent(new QueryProgressEvent(lastProgress))
           }
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6b232a3baf10..56eb05bbbb35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -603,8 +603,8 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
-  val STREAMING_NO_DATA_REPORT_INTERVAL =
-    SQLConfigBuilder("spark.sql.streaming.noDataReportInterval")
+  val STREAMING_NO_DATA_EVENT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataEventInterval")
       .internal()
       .doc("How long to wait between two progress events when there is no data")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10000L)
@@ -691,7 +691,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 
-  def streamingNoDataReportInterval: Long = getConf(STREAMING_NO_DATA_REPORT_INTERVAL)
+  def streamingNoDataEventInterval: Long = getConf(STREAMING_NO_DATA_EVENT_INTERVAL)
 
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 3cc9cd5f098a..21b885c3297f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -192,8 +192,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
-  test("noDataReportInterval") {
-    withSQLConf(SQLConf.STREAMING_NO_DATA_REPORT_INTERVAL.key -> "100ms") {
+  test("only one progress event per interval when no data") {
+    // This test will start a query but not push any data, and then check if we push too many events
+    withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
       @volatile var progressEventCount = 0
 
       val listener = new StreamingQueryListener {
@@ -220,13 +221,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         for (_ <- 1 to 100) {
           actions += AdvanceManualClock(10)
           actions += AssertOnQuery { _ =>
-            // Sleep so that if the config `noDataReportInterval` doesn't work, it has enough time
+            // Sleep so that if the config `noDataEventInterval` doesn't work, it has enough time
             // to report too many events.
             Thread.sleep(10)
             true
           }
         }
         testStream(MemoryStream[Int].toDS)(actions: _*)
+        // 11 is the max value of the possible numbers of events.
         assert(progressEventCount <= 11)
       } finally {
         spark.streams.removeListener(listener)

From f7b87553316c7ef151ff94bc891cd18ddea55b20 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Dec 2016 17:49:23 -0800
Subject: [PATCH 3/4] Address

---
 .../StreamingQueryListenerSuite.scala              | 35 +++++++++----------
 1 file changed, 17 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 21b885c3297f..6be3a2ed45a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -47,6 +47,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
   testQuietly("single listener, check trigger events are generated correctly") {
@@ -195,41 +196,39 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("only one progress event per interval when no data") {
     // This test will start a query but not push any data, and then check if we push too many events
     withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
-      @volatile var progressEventCount = 0
-
+      @volatile var numProgressEvent = 0
       val listener = new StreamingQueryListener {
         override def onQueryStarted(event: QueryStartedEvent): Unit = {}
-
         override def onQueryProgress(event: QueryProgressEvent): Unit = {
-          progressEventCount += 1
+          numProgressEvent += 1
         }
-
         override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
       }
       spark.streams.addListener(listener)
       try {
+        val input = new MemoryStream[Int](0, sqlContext) {
+          @volatile var numTriggers = 0
+          override def getOffset: Option[Offset] = {
+            numTriggers += 1
+            super.getOffset
+          }
+        }
         val clock = new StreamManualClock()
         val actions = mutable.ArrayBuffer[StreamAction]()
        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        for (_ <- 1 to 100) {
+          actions += AdvanceManualClock(10)
+        }
         actions += AssertOnQuery { _ =>
-          // It should report at least one progress
           eventually(timeout(streamingTimeout)) {
-            assert(progressEventCount > 0)
+            assert(input.numTriggers > 100) // at least 100 triggers have occurred
           }
           true
         }
-        for (_ <- 1 to 100) {
-          actions += AdvanceManualClock(10)
-          actions += AssertOnQuery { _ =>
-            // Sleep so that if the config `noDataEventInterval` doesn't work, it has enough time
-            // to report too many events.
-            Thread.sleep(10)
-            true
-          }
-        }
-        testStream(MemoryStream[Int].toDS)(actions: _*)
+        testStream(input.toDS)(actions: _*)
+        spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // 11 is the max value of the possible numbers of events.
-        assert(progressEventCount <= 11)
+        assert(numProgressEvent >= 1 && numProgressEvent <= 11)
       } finally {
         spark.streams.removeListener(listener)
       }

From be3737f3a49325d20401402e65288b5c39be3bed Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Thu, 1 Dec 2016 21:11:57 -0800
Subject: [PATCH 4/4] Address more

---
 .../sql/execution/streaming/StreamExecution.scala     | 11 ++++++-----
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  7 ++++---
 .../sql/streaming/StreamingQueryListenerSuite.scala   |  4 ++--
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 953e2f557034..8804c647a75c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -63,7 +63,8 @@ class StreamExecution(
 
   private val pollingDelayMs = sparkSession.sessionState.conf.streamingPollingDelay
 
-  private val noDataEventInterval = sparkSession.sessionState.conf.streamingNoDataEventInterval
+  private val noDataProgressEventInterval =
+    sparkSession.sessionState.conf.streamingNoDataProgressEventInterval
 
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
@@ -199,7 +200,7 @@ class StreamExecution(
       SparkSession.setActiveSession(sparkSession)
 
       // The timestamp we report an event that has no input data
-      var noDataEventTimestamp = Long.MinValue
+      var lastNoDataProgressEventTime = Long.MinValue
 
       triggerExecutor.execute(() => {
         startTrigger()
@@ -225,12 +226,12 @@ class StreamExecution(
         finishTrigger(dataAvailable)
         if (dataAvailable) {
           // Reset noDataEventTimestamp if we processed any data
-          noDataEventTimestamp = Long.MinValue
+          lastNoDataProgressEventTime = Long.MinValue
           postEvent(new QueryProgressEvent(lastProgress))
         } else {
           val now = triggerClock.getTimeMillis()
-          if (now - noDataEventInterval >= noDataEventTimestamp) {
-            noDataEventTimestamp = now
+          if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
+            lastNoDataProgressEventTime = now
             postEvent(new QueryProgressEvent(lastProgress))
           }
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 56eb05bbbb35..5b45df69e679 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -603,8 +603,8 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10L)
 
-  val STREAMING_NO_DATA_EVENT_INTERVAL =
-    SQLConfigBuilder("spark.sql.streaming.noDataEventInterval")
+  val STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL =
+    SQLConfigBuilder("spark.sql.streaming.noDataProgressEventInterval")
      .internal()
       .doc("How long to wait between two progress events when there is no data")
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(10000L)
@@ -691,7 +691,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingPollingDelay: Long = getConf(STREAMING_POLLING_DELAY)
 
-  def streamingNoDataEventInterval: Long = getConf(STREAMING_NO_DATA_EVENT_INTERVAL)
+  def streamingNoDataProgressEventInterval: Long =
+    getConf(STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL)
 
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 6be3a2ed45a0..3086abf03cd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -195,7 +195,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("only one progress event per interval when no data") {
     // This test will start a query but not push any data, and then check if we push too many events
-    withSQLConf(SQLConf.STREAMING_NO_DATA_EVENT_INTERVAL.key -> "100ms") {
+    withSQLConf(SQLConf.STREAMING_NO_DATA_PROGRESS_EVENT_INTERVAL.key -> "100ms") {
       @volatile var numProgressEvent = 0
       val listener = new StreamingQueryListener {
         override def onQueryStarted(event: QueryStartedEvent): Unit = {}
@@ -228,7 +228,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         testStream(input.toDS)(actions: _*)
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // 11 is the max value of the possible numbers of events.
-        assert(numProgressEvent >= 1 && numProgressEvent <= 11)
+        assert(numProgressEvent > 1 && numProgressEvent <= 11)
       } finally {
         spark.streams.removeListener(listener)
       }
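
For reference, the throttling rule the series converges on is small enough to model outside Spark. The following is a minimal, standalone Scala sketch (not part of the patches above): a trigger that processed data always posts a progress event and resets the timer, while an empty trigger posts at most one event per noDataProgressEventInterval milliseconds. The ProgressThrottler and Clock names are invented purely for this sketch and do not exist in Spark.

// Standalone sketch of the no-data progress throttling implemented by the patches.
// `ProgressThrottler` and `Clock` are illustrative names only, not Spark classes.
object ProgressThrottlerSketch {

  trait Clock {
    def getTimeMillis(): Long
  }

  // Triggers that processed data always report and reset the timer; empty triggers
  // report at most once per `noDataProgressEventInterval` milliseconds.
  class ProgressThrottler(noDataProgressEventInterval: Long, clock: Clock) {
    private var lastNoDataProgressEventTime = Long.MinValue

    def shouldPostEvent(dataAvailable: Boolean): Boolean = {
      if (dataAvailable) {
        // Any batch with data resets the timer and always reports.
        lastNoDataProgressEventTime = Long.MinValue
        true
      } else {
        val now = clock.getTimeMillis()
        if (now - noDataProgressEventInterval >= lastNoDataProgressEventTime) {
          lastNoDataProgressEventTime = now
          true
        } else {
          false
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    var t = 0L
    val clock = new Clock { override def getTimeMillis(): Long = t }
    val throttler = new ProgressThrottler(noDataProgressEventInterval = 100L, clock = clock)

    // 100 empty triggers, 10 ms apart: with a 100 ms interval only about one in ten
    // reports, mirroring the `numProgressEvent <= 11` bound asserted in the test.
    val reported = (1 to 100).count { _ =>
      t += 10
      throttler.shouldPostEvent(dataAvailable = false)
    }
    println(s"$reported progress events posted for 100 empty triggers")
  }
}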