From 90ed69285fc34ae43a0f454ceb25837618212e28 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sat, 30 Apr 2016 09:32:48 +0800
Subject: [PATCH 1/3] fix an issue of nextBatchTime against ManualClock

---
 .../execution/streaming/TriggerExecutor.scala | 18 +++++++++++--
 .../ProcessingTimeExecutorSuite.scala         | 26 +++++++++++++++++--
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index a1132d510685..414d663d26ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -65,8 +65,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
       s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
   }

-  /** Return the next multiple of intervalMs */
+  /** Return the next multiple of intervalMs
+   *
+   * e.g. for intervalMs = 100
+   * nextBatchTime(0) = 100
+   * nextBatchTime(1) = 100
+   * ...
+   * nextBatchTime(99) = 100
+   * nextBatchTime(100) = 200
+   * nextBatchTime(101) = 200
+   * ...
+   * nextBatchTime(199) = 200
+   * nextBatchTime(200) = 300
+   *
+   * Note, this way, we'll get nextBatchTime(nextBatchTime(0)) = 200, rather than = 0
+   * */
   def nextBatchTime(now: Long): Long = {
-    (now - 1) / intervalMs * intervalMs + intervalMs
+    now / intervalMs * intervalMs + intervalMs
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index dd5f92248bf5..2dfd6fbce7e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -21,19 +21,41 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.ProcessingTime
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}

 class ProcessingTimeExecutorSuite extends SparkFunSuite {

   test("nextBatchTime") {
     val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
+    assert(processingTimeExecutor.nextBatchTime(0) === 100)
     assert(processingTimeExecutor.nextBatchTime(1) === 100)
     assert(processingTimeExecutor.nextBatchTime(99) === 100)
-    assert(processingTimeExecutor.nextBatchTime(100) === 100)
+    assert(processingTimeExecutor.nextBatchTime(100) === 200)
     assert(processingTimeExecutor.nextBatchTime(101) === 200)
     assert(processingTimeExecutor.nextBatchTime(150) === 200)
   }

+  private def testNextBatchTimeAgainstClock(clock: Clock) {
+    val IntervalMS = 100
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(IntervalMS), clock)
+
+    val ITERATION = 10
+    var nextBatchTime: Long = 0
+    for (it <- 1 to ITERATION)
+      nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)
+
+    // nextBatchTime should be 1000
+    assert(nextBatchTime === IntervalMS * ITERATION)
+  }
+
+  test("nextBatchTime against SystemClock") {
+    testNextBatchTimeAgainstClock(new SystemClock)
+  }
+
+  test("nextBatchTime against ManualClock") {
+    testNextBatchTimeAgainstClock(new ManualClock)
+  }
+
   private def testBatchTermination(intervalMs: Long): Unit = {
     var batchCounts = 0
     val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
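The arithmetic behind the one-character fix above can be checked outside of Spark. The snippet below is a standalone sketch (plain Scala; the two helper names are made up for illustration). It shows why the old rounding gets stuck once the input is already a positive multiple of the interval, which is the situation a ManualClock advanced in exact interval steps produces:

    object NextBatchTimeSketch {
      val intervalMs = 100L

      // Old formula: for a positive input that is already a multiple of the interval it
      // returns the input unchanged (100 -> 100, 200 -> 200), so repeated calls never
      // move past the first boundary.
      def oldNextBatchTime(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs

      // New formula: always rounds up to the *next* multiple of the interval.
      def newNextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

      def main(args: Array[String]): Unit = {
        println(oldNextBatchTime(oldNextBatchTime(0)))  // 100 -- stuck on the first boundary
        println(newNextBatchTime(newNextBatchTime(0)))  // 200 -- advances one interval per call
      }
    }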
From 9d80b15e33151f5f87207d72f89f016c18a21b01 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Sat, 30 Apr 2016 09:33:48 +0800
Subject: [PATCH 2/3] Add support for testing against ProcessingTime(intervalMS>0)

---
 .../spark/sql/ContinuousQueryManager.scala    |  7 ++-
 .../execution/streaming/StreamExecution.scala |  9 +--
 .../org/apache/spark/sql/StreamTest.scala     | 58 ++++++++++++++-----
 .../sql/streaming/ContinuousQuerySuite.scala  |  2 +-
 .../sql/streaming/FileStreamSourceSuite.scala |  8 +--
 .../spark/sql/streaming/StreamSuite.scala     | 24 ++++++--
 .../streaming/StreamingAggregationSuite.scala |  6 +-
 .../util/ContinuousQueryListenerSuite.scala   |  6 +-
 8 files changed, 83 insertions(+), 37 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
index 9e2e2d0bc5bc..1b51c0ba51cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.ContinuousQueryListener
+import org.apache.spark.util.{Clock, SystemClock}

 /**
  * :: Experimental ::
@@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
       df: DataFrame,
       sink: Sink,
       trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock(),
       outputMode: OutputMode = Append): ContinuousQuery = {
     activeQueriesLock.synchronized {
       if (activeQueries.contains(name)) {
@@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
         checkpointLocation,
         logicalPlan,
         sink,
-        outputMode,
-        trigger)
+        trigger,
+        triggerClock,
+        outputMode)
       query.start()
       activeQueries.put(name, query)
       query
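The new triggerClock parameter is what lets a test drive triggers deterministically instead of waiting on wall-clock time. A hedged sketch of the intended call pattern (startQuery is internal API; the query name, checkpoint location, `df` and `sink` values here are placeholders supplied by the surrounding test setup):

    import org.apache.spark.sql.ProcessingTime
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock()          // starts at time 0 and only moves when told to
    val query = sqlContext.streams.startQuery(
      "clock-driven-query",                // placeholder name
      checkpointLocation,                  // placeholder checkpoint path
      df,                                  // the streaming DataFrame under test
      sink,                                // e.g. an in-memory sink
      trigger = ProcessingTime("10 seconds"),
      triggerClock = clock)
    clock.advance(10 * 1000)               // fires the next 10-second trigger deterministically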
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index fc18e5f065a0..46eaf354e022 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.ContinuousQueryListener
 import org.apache.spark.sql.util.ContinuousQueryListener._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{Clock, UninterruptibleThread}

 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -49,8 +49,9 @@ class StreamExecution(
     checkpointRoot: String,
     private[sql] val logicalPlan: LogicalPlan,
     val sink: Sink,
-    val outputMode: OutputMode,
-    val trigger: Trigger)
+    val trigger: Trigger,
+    private[sql] val triggerClock: Clock,
+    val outputMode: OutputMode)
   extends ContinuousQuery with Logging {

   /** An monitor used to wait/notify when batches complete. */
@@ -83,7 +84,7 @@ class StreamExecution(
   private val uniqueSources = sources.distinct

   private val triggerExecutor = trigger match {
-    case t: ProcessingTime => ProcessingTimeExecutor(t)
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
   }

   /** Defines the internal state of execution */
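Because the match above now hands the query's clock to ProcessingTimeExecutor, the executor's rounding can be exercised against a ManualClock in isolation. A minimal sketch, assuming the clock starts at 0 (ManualClock's default) and the corrected rounding from patch 1:

    import org.apache.spark.sql.ProcessingTime
    import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock()
    val executor = ProcessingTimeExecutor(ProcessingTime(100), clock)

    // Batch times keep advancing even when the clock sits exactly on a boundary:
    assert(executor.nextBatchTime(clock.getTimeMillis()) === 100)
    clock.advance(100)
    assert(executor.nextBatchTime(clock.getTimeMillis()) === 200)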
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index dff6acc94b3f..669c8b736000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, Utils}

 /**
  * A framework for implementing tests for streaming queries and sources.
@@ -142,7 +142,10 @@ trait StreamTest extends QueryTest with Timeouts {
   case object StopStream extends StreamAction with StreamMustBeRunning

   /** Starts the stream, resuming if data has already been processed. It must not be running. */
-  case object StartStream extends StreamAction
+  case class StartStream(trigger: Trigger = null, triggerClock: Clock = null) extends StreamAction
+
+  /** Advance the trigger clock's time manually. */
+  case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

   /** Signals that a failure is expected and should not kill the test. */
   case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -199,8 +202,8 @@ trait StreamTest extends QueryTest with Timeouts {

     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
-      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
-    val startedTest = if (startedManually) actions else StartStream +: actions
+      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
+    val startedTest = if (startedManually) actions else StartStream() +: actions

     def testActions = actions.zipWithIndex.map {
       case (a, i) =>
@@ -280,19 +283,35 @@ trait StreamTest extends QueryTest with Timeouts {
     try {
       startedTest.foreach { action =>
         action match {
-          case StartStream =>
+          case StartStream(_trigger, _triggerClock) =>
             verify(currentStream == null, "stream already running")
             lastStream = currentStream
             currentStream =
-              sqlContext
-                .streams
-                .startQuery(
-                  StreamExecution.nextName,
-                  metadataRoot,
-                  stream,
-                  sink,
-                  outputMode = outputMode)
-                .asInstanceOf[StreamExecution]
+              if (_trigger != null) {
+                // we pass in explicit trigger and triggerClock
+                sqlContext
+                  .streams
+                  .startQuery(
+                    StreamExecution.nextName,
+                    metadataRoot,
+                    stream,
+                    sink,
+                    trigger = _trigger,
+                    triggerClock = _triggerClock,
+                    outputMode = outputMode)
+                  .asInstanceOf[StreamExecution]
+              } else {
+                // we left out trigger and triggerClock as their default values
+                sqlContext
+                  .streams
+                  .startQuery(
+                    StreamExecution.nextName,
+                    metadataRoot,
+                    stream,
+                    sink,
+                    outputMode = outputMode)
+                  .asInstanceOf[StreamExecution]
+              }
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
@@ -301,6 +320,13 @@ trait StreamTest extends QueryTest with Timeouts {
                 }
               })

+          case AdvanceManualClock(timeToAdd) =>
+            verify(currentStream != null,
+              "can not advance manual clock when a stream is not running")
+            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+              s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+
           case StopStream =>
             verify(currentStream != null, "can not stop a stream that is not running")
             try failAfter(streamingTimeout) {
@@ -470,7 +496,7 @@ trait StreamTest extends QueryTest with Timeouts {
             addRandomData()

           case _ => // StartStream
-            actions += StartStream
+            actions += StartStream()
             running = true
         }
       } else {
@@ -488,7 +514,7 @@ trait StreamTest extends QueryTest with Timeouts {
         }
       }
     }
-    if(!running) { actions += StartStream }
+    if(!running) { actions += StartStream() }
     addCheck()
     testStream(ds)(actions: _*)
   }
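A side effect of turning StartStream from a case object into a case class with default arguments, visible in the startedManually hunk above, is that the old `actions.contains(StartStream)` check no longer applies: there is no longer a single StartStream value, and two instances built with different arguments are not equal, hence the switch to an `isInstanceOf` test. A small illustration with made-up names:

    case object OldAction
    case class NewAction(interval: Int = 0)

    Seq(OldAction).contains(OldAction)                   // true  -- a case object is a single value
    Seq(NewAction(5)).contains(NewAction())              // false -- case class equality compares fields
    Seq(NewAction(5)).exists(_.isInstanceOf[NewAction])  // true  -- matches any instance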
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index 3be0ea481dc5..f469cde6bef8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
       TestAwaitTermination(ExpectNotBlocked),
       TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
       TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
-      StartStream,
+      StartStream(),
       AssertOnQuery(_.isActive === true),
       AddData(inputData, 0),
       ExpectFailure[SparkException],
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 6b1ecd08c13c..bc5c0c1f6933 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
         src,
         tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData(
         "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
@@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 6f3149dbc503..bcd3cba55a55 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming

 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.ManualClock

 class StreamSuite extends StreamTest with SharedSQLContext {

@@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext {

     testStream(mapped)(
       AddData(inputData, 1, 2, 3),
-      StartStream,
+      StartStream(),
       CheckAnswer(2, 3, 4),
       StopStream,
       AddData(inputData, 4, 5, 6),
-      StartStream,
+      StartStream(),
       CheckAnswer(2, 3, 4, 5, 6, 7))
   }

@@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext {
       CheckAnswer(1, 2, 3, 4, 5, 6),
       StopStream,
       AddData(inputData1, 7),
-      StartStream,
+      StartStream(),
       AddData(inputData2, 8),
       CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
   }
@@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext {
       testStream(ds)()
     }
   }
+
+  // This would fail for now -- error is "Timed out waiting for stream"
+  // Root cause is that data generated in batch 0 may not get processed in batch 1
+  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
+  ignore("minimize delay between batch construction and execution") {
+    val inputData = MemoryStream[Int]
+    testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock), + /* -- batch 0 ----------------------- */ + AddData(inputData, 1), + AddData(inputData, 2), + AddData(inputData, 3), + AdvanceManualClock(10 * 1000), // 10 seconds + /* -- batch 1 ----------------------- */ + CheckAnswer(1, 2, 3)) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index fa3b122f6d2d..bdf40f5cd45d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext { AddData(inputData, 3, 2), CheckLastBatch((3, 2), (2, 1)), StopStream, - StartStream, + StartStream(), AddData(inputData, 3, 2, 1), CheckLastBatch((3, 3), (2, 2), (1, 1)), // By default we run in new tuple mode. @@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext { .as[(Int, Long)] testStream(aggregated)( - StartStream, + StartStream(), AddData(inputData, 1, 2, 3, 4), ExpectFailure[SparkException](), - StartStream, + StartStream(), CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1)) ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala index 3498fe83d02e..60964bdab3c3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala @@ -50,7 +50,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with val input = MemoryStream[Int] withListenerAdded(listener) { testStream(input.toDS)( - StartStream, + StartStream(), Assert("Incorrect query status in onQueryStarted") { val status = listener.startStatus assert(status != null) @@ -104,7 +104,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with def isListenerActive(listener: QueryStatusCollector): Boolean = { listener.reset() testStream(MemoryStream[Int].toDS)( - StartStream, + StartStream(), StopStream ) listener.startStatus != null @@ -135,7 +135,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with listener.reset() require(listener.startStatus === null) testStream(MemoryStream[Int].toDS)( - StartStream, + StartStream(), Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), StopStream, Assert { listener.checkAsyncErrors() } From bc899628fa67acfbe6bb3d8e2c0ba2aefc2422a6 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Wed, 4 May 2016 07:23:24 +0800 Subject: [PATCH 3/3] address comments --- .../execution/streaming/TriggerExecutor.scala | 21 +++----- .../org/apache/spark/sql/StreamTest.scala | 49 +++++++------------ .../ProcessingTimeExecutorSuite.scala | 19 +++---- 3 files changed, 31 insertions(+), 58 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 414d663d26ed..569907b369a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -65,21 
From bc899628fa67acfbe6bb3d8e2c0ba2aefc2422a6 Mon Sep 17 00:00:00 2001
From: Liwei Lin
Date: Wed, 4 May 2016 07:23:24 +0800
Subject: [PATCH 3/3] address comments

---
 .../execution/streaming/TriggerExecutor.scala | 21 +++-----
 .../org/apache/spark/sql/StreamTest.scala     | 49 +++++++------------
 .../ProcessingTimeExecutorSuite.scala         | 19 +++----
 3 files changed, 31 insertions(+), 58 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 414d663d26ed..569907b369a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -65,21 +65,12 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
       s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
   }

-  /** Return the next multiple of intervalMs
-   *
-   * e.g. for intervalMs = 100
-   * nextBatchTime(0) = 100
-   * nextBatchTime(1) = 100
-   * ...
-   * nextBatchTime(99) = 100
-   * nextBatchTime(100) = 200
-   * nextBatchTime(101) = 200
-   * ...
-   * nextBatchTime(199) = 200
-   * nextBatchTime(200) = 300
-   *
-   * Note, this way, we'll get nextBatchTime(nextBatchTime(0)) = 200, rather than = 0
-   * */
+  /**
+   * Returns the start time in milliseconds for the next batch interval, given the current time.
+   * Note that a batch interval is inclusive with respect to its start time, and thus calling
+   * `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given
+   * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
+   */
   def nextBatchTime(now: Long): Long = {
     now / intervalMs * intervalMs + intervalMs
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 669c8b736000..6fb1aca769e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

 /**
  * A framework for implementing tests for streaming queries and sources.
@@ -138,11 +138,14 @@ trait StreamTest extends QueryTest with Timeouts {
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
   }

-  /** Stops the stream. It must currently be running. */
+  /** Stops the stream. It must currently be running. */
   case object StopStream extends StreamAction with StreamMustBeRunning

-  /** Starts the stream, resuming if data has already been processed. It must not be running. */
-  case class StartStream(trigger: Trigger = null, triggerClock: Clock = null) extends StreamAction
+  /** Starts the stream, resuming if data has already been processed. It must not be running. */
+  case class StartStream(
+      trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock)
+    extends StreamAction

   /** Advance the trigger clock's time manually. */
   case class AdvanceManualClock(timeToAdd: Long) extends StreamAction
@@ -283,35 +286,21 @@ trait StreamTest extends QueryTest with Timeouts {
     try {
       startedTest.foreach { action =>
         action match {
-          case StartStream(_trigger, _triggerClock) =>
+          case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
             lastStream = currentStream
             currentStream =
-              if (_trigger != null) {
-                // we pass in explicit trigger and triggerClock
-                sqlContext
-                  .streams
-                  .startQuery(
-                    StreamExecution.nextName,
-                    metadataRoot,
-                    stream,
-                    sink,
-                    trigger = _trigger,
-                    triggerClock = _triggerClock,
-                    outputMode = outputMode)
-                  .asInstanceOf[StreamExecution]
-              } else {
-                // we left out trigger and triggerClock as their default values
-                sqlContext
-                  .streams
-                  .startQuery(
-                    StreamExecution.nextName,
-                    metadataRoot,
-                    stream,
-                    sink,
-                    outputMode = outputMode)
-                  .asInstanceOf[StreamExecution]
-              }
+              sqlContext
+                .streams
+                .startQuery(
+                  StreamExecution.nextName,
+                  metadataRoot,
+                  stream,
+                  sink,
+                  trigger,
+                  triggerClock,
+                  outputMode = outputMode)
+                .asInstanceOf[StreamExecution]
             currentStream.microBatchThread.setUncaughtExceptionHandler(
               new UncaughtExceptionHandler {
                 override def uncaughtException(t: Thread, e: Throwable): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 2dfd6fbce7e2..7f99d303ba08 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -35,25 +35,18 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
     assert(processingTimeExecutor.nextBatchTime(150) === 200)
   }

-  private def testNextBatchTimeAgainstClock(clock: Clock) {
-    val IntervalMS = 100
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(IntervalMS), clock)
+  test("calling nextBatchTime with the result of a previous call should return the next interval") {
+    val intervalMS = 100
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))

     val ITERATION = 10
     var nextBatchTime: Long = 0
-    for (it <- 1 to ITERATION)
+    for (it <- 1 to ITERATION) {
       nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)
+    }

     // nextBatchTime should be 1000
-    assert(nextBatchTime === IntervalMS * ITERATION)
-  }
-
-  test("nextBatchTime against SystemClock") {
-    testNextBatchTimeAgainstClock(new SystemClock)
-  }
-
-  test("nextBatchTime against ManualClock") {
-    testNextBatchTimeAgainstClock(new ManualClock)
+    assert(nextBatchTime === intervalMS * ITERATION)
   }

   private def testBatchTermination(intervalMs: Long): Unit = {