
Commit e597ec6

lw-lin authored and zsxwing committed
[SPARK-15022][SPARK-15023][SQL][STREAMING] Add support for testing against the ProcessingTime(intervalMS > 0) trigger and ManualClock
## What changes were proposed in this pull request?

Currently in `StreamTest`, we have a `StartStream` which will start a streaming query against the trigger `ProcessingTime(intervalMS = 0)` and `SystemClock`. We also need to test cases against `ProcessingTime(intervalMS > 0)`, which often requires `ManualClock`.

This patch:
- fixes an issue of `ProcessingTimeExecutor`, where it should run `batchRunner` only once per batch but might run it multiple times under certain conditions;
- adds support for testing against the `ProcessingTime(intervalMS > 0)` trigger and `ManualClock`, by making them fields of `StartStream`, and by adding an `AdvanceManualClock` action;
- adds a test which takes advantage of the new `StartStream` and `AdvanceManualClock` to test against "[SPARK-14942] Reduce delay between batch construction and execution" (#12725).

## How was this patch tested?

N/A

Author: Liwei Lin <[email protected]>

Closes #12797 from lw-lin/add-trigger-test-support.
1 parent a456477 commit e597ec6

File tree

10 files changed: +89 −34 lines changed

sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala

Lines changed: 5 additions & 2 deletions
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.ContinuousQueryListener
+import org.apache.spark.util.{Clock, SystemClock}

 /**
  * :: Experimental ::
@@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
       df: DataFrame,
       sink: Sink,
       trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock(),
       outputMode: OutputMode = Append): ContinuousQuery = {
     activeQueriesLock.synchronized {
       if (activeQueries.contains(name)) {
@@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
         checkpointLocation,
         logicalPlan,
         sink,
-        outputMode,
-        trigger)
+        trigger,
+        triggerClock,
+        outputMode)
     query.start()
     activeQueries.put(name, query)
     query

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 5 additions & 4 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.util.ContinuousQueryListener
 import org.apache.spark.sql.util.ContinuousQueryListener._
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

 /**
  * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -50,8 +50,9 @@ class StreamExecution(
    checkpointRoot: String,
    private[sql] val logicalPlan: LogicalPlan,
    val sink: Sink,
-    val outputMode: OutputMode,
-    val trigger: Trigger)
+    val trigger: Trigger,
+    private[sql] val triggerClock: Clock,
+    val outputMode: OutputMode)
   extends ContinuousQuery with Logging {

   /**
@@ -88,7 +89,7 @@ class StreamExecution(
   private val uniqueSources = sources.distinct

   private val triggerExecutor = trigger match {
-    case t: ProcessingTime => ProcessingTimeExecutor(t)
+    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
   }

   /** Defines the internal state of execution */

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala

Lines changed: 7 additions & 2 deletions
@@ -65,8 +65,13 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
         s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
   }

-  /** Return the next multiple of intervalMs */
+  /**
+   * Returns the start time in milliseconds for the next batch interval, given the current time.
+   * Note that a batch interval is inclusive with respect to its start time, and thus calling
+   * `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given
+   * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
+   */
   def nextBatchTime(now: Long): Long = {
-    (now - 1) / intervalMs * intervalMs + intervalMs
+    now / intervalMs * intervalMs + intervalMs
   }
 }
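
The one-character change to `nextBatchTime` is easiest to see with concrete numbers. Below is a small self-contained sketch contrasting the two formulas; the names `oldNextBatchTime` and `newNextBatchTime` are ours for illustration and do not appear in the patch:

object NextBatchTimeDemo {
  val intervalMs = 100L

  // Pre-patch formula: a `now` falling exactly on an interval boundary is
  // mapped back onto that same boundary.
  def oldNextBatchTime(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs

  // Post-patch formula: an interval is inclusive of its start time, so a
  // boundary value advances to the next interval.
  def newNextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    assert(oldNextBatchTime(100) == 100) // feeding the result back never advances,
                                         // so batchRunner could fire again within one interval
    assert(newNextBatchTime(100) == 200) // advances: one batchRunner run per interval
  }
}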

sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala

Lines changed: 24 additions & 9 deletions
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

 /**
  * A framework for implementing tests for streaming queries and sources.
@@ -138,11 +138,17 @@ trait StreamTest extends QueryTest with Timeouts {
     private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
   }

-  /** Stops the stream. It must currently be running. */
+  /** Stops the stream. It must currently be running. */
   case object StopStream extends StreamAction with StreamMustBeRunning

-  /** Starts the stream, resuming if data has already been processed. It must not be running. */
-  case object StartStream extends StreamAction
+  /** Starts the stream, resuming if data has already been processed. It must not be running. */
+  case class StartStream(
+      trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock)
+    extends StreamAction
+
+  /** Advance the trigger clock's time manually. */
+  case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

   /** Signals that a failure is expected and should not kill the test. */
   case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -199,8 +205,8 @@ trait StreamTest extends QueryTest with Timeouts {

     // If the test doesn't manually start the stream, we do it automatically at the beginning.
     val startedManually =
-      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
-    val startedTest = if (startedManually) actions else StartStream +: actions
+      actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
+    val startedTest = if (startedManually) actions else StartStream() +: actions

     def testActions = actions.zipWithIndex.map {
       case (a, i) =>
@@ -280,7 +286,7 @@ trait StreamTest extends QueryTest with Timeouts {
     try {
       startedTest.foreach { action =>
         action match {
-          case StartStream =>
+          case StartStream(trigger, triggerClock) =>
            verify(currentStream == null, "stream already running")
            lastStream = currentStream
            currentStream =
@@ -291,6 +297,8 @@ trait StreamTest extends QueryTest with Timeouts {
                  metadataRoot,
                  stream,
                  sink,
+                 trigger,
+                 triggerClock,
                  outputMode = outputMode)
                .asInstanceOf[StreamExecution]
            currentStream.microBatchThread.setUncaughtExceptionHandler(
@@ -301,6 +309,13 @@ trait StreamTest extends QueryTest with Timeouts {
                }
              })

+          case AdvanceManualClock(timeToAdd) =>
+            verify(currentStream != null,
+              "can not advance manual clock when a stream is not running")
+            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+              s"can not advance clock of type ${currentStream.triggerClock.getClass}")
+            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+
          case StopStream =>
            verify(currentStream != null, "can not stop a stream that is not running")
            try failAfter(streamingTimeout) {
@@ -470,7 +485,7 @@ trait StreamTest extends QueryTest with Timeouts {
          addRandomData()

        case _ => // StartStream
-          actions += StartStream
+          actions += StartStream()
          running = true
      }
    } else {
@@ -488,7 +503,7 @@ trait StreamTest extends QueryTest with Timeouts {
        }
      }
    }
-    if(!running) { actions += StartStream }
+    if(!running) { actions += StartStream() }
    addCheck()
    testStream(ds)(actions: _*)
  }
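
For readers unfamiliar with `org.apache.spark.util.ManualClock`: it is a `Clock` whose time only moves when someone calls `advance` (or `setTime`), while `waitTillTime` blocks its caller until the clock reaches a target time. That is what makes the new `AdvanceManualClock` action deterministic. A minimal standalone sketch of the interaction, independent of `StreamTest` and purely illustrative:

import org.apache.spark.util.ManualClock

object ManualClockDemo {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock() // time starts at 0 ms

    // Stands in for the query's trigger thread, which waits for the next
    // batch time when a ManualClock is passed to StartStream.
    val triggerThread = new Thread {
      override def run(): Unit = {
        clock.waitTillTime(10000) // blocks until the clock reaches 10 s
        println(s"batch fired at ${clock.getTimeMillis()} ms")
      }
    }
    triggerThread.start()

    clock.advance(10 * 1000) // roughly what AdvanceManualClock(10 * 1000) does
    triggerThread.join()
  }
}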

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala

Lines changed: 17 additions & 2 deletions
@@ -21,19 +21,34 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.ProcessingTime
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}

 class ProcessingTimeExecutorSuite extends SparkFunSuite {

   test("nextBatchTime") {
     val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
+    assert(processingTimeExecutor.nextBatchTime(0) === 100)
     assert(processingTimeExecutor.nextBatchTime(1) === 100)
     assert(processingTimeExecutor.nextBatchTime(99) === 100)
-    assert(processingTimeExecutor.nextBatchTime(100) === 100)
+    assert(processingTimeExecutor.nextBatchTime(100) === 200)
     assert(processingTimeExecutor.nextBatchTime(101) === 200)
     assert(processingTimeExecutor.nextBatchTime(150) === 200)
   }

+  test("calling nextBatchTime with the result of a previous call should return the next interval") {
+    val intervalMS = 100
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
+
+    val ITERATION = 10
+    var nextBatchTime: Long = 0
+    for (it <- 1 to ITERATION) {
+      nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)
+    }
+
+    // nextBatchTime should be 1000
+    assert(nextBatchTime === intervalMS * ITERATION)
+  }
+
   private def testBatchTermination(intervalMs: Long): Unit = {
     var batchCounts = 0
     val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
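
The loop in the new test checks that `nextBatchTime` behaves as a proper successor function on interval boundaries: iterating it from 0 walks the boundaries 100, 200, ..., 1000. The same property restated compactly (our own phrasing, not part of the patch):

import org.apache.spark.sql.ProcessingTime
import org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor

object NextBatchTimeProperty {
  def main(args: Array[String]): Unit = {
    val executor = ProcessingTimeExecutor(ProcessingTime(100))
    // Drop the seed 0 and take the next ten boundary values.
    val boundaries = Iterator.iterate(0L)(executor.nextBatchTime).slice(1, 11).toSeq
    assert(boundaries == (1 to 10).map(_ * 100L))
  }
}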

sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
       TestAwaitTermination(ExpectNotBlocked),
       TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
       TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
-      StartStream,
+      StartStream(),
       AssertOnQuery(_.isActive === true),
       AddData(inputData, 0),
       ExpectFailure[SparkException],

sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         "{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
         src,
         tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData(
         "{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
@@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
       CheckAnswer("keep2", "keep3"),
       StopStream,
       AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
-      StartStream,
+      StartStream(),
       CheckAnswer("keep2", "keep3", "keep5", "keep6"),
       AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
       CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala

Lines changed: 20 additions & 4 deletions
@@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming

 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.util.ManualClock

 class StreamSuite extends StreamTest with SharedSQLContext {

@@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext {

     testStream(mapped)(
       AddData(inputData, 1, 2, 3),
-      StartStream,
+      StartStream(),
       CheckAnswer(2, 3, 4),
       StopStream,
       AddData(inputData, 4, 5, 6),
-      StartStream,
+      StartStream(),
       CheckAnswer(2, 3, 4, 5, 6, 7))
   }

@@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext {
       CheckAnswer(1, 2, 3, 4, 5, 6),
       StopStream,
       AddData(inputData1, 7),
-      StartStream,
+      StartStream(),
       AddData(inputData2, 8),
       CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
   }
@@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext {
       testStream(ds)()
     }
   }
+
+  // This would fail for now -- error is "Timed out waiting for stream"
+  // Root cause is that data generated in batch 0 may not get processed in batch 1
+  // Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
+  ignore("minimize delay between batch construction and execution") {
+    val inputData = MemoryStream[Int]
+    testStream(inputData.toDS())(
+      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      /* -- batch 0 ----------------------- */
+      AddData(inputData, 1),
+      AddData(inputData, 2),
+      AddData(inputData, 3),
+      AdvanceManualClock(10 * 1000), // 10 seconds
+      /* -- batch 1 ----------------------- */
+      CheckAnswer(1, 2, 3))
+  }
 }

 /**

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
       AddData(inputData, 3, 2),
       CheckLastBatch((3, 2), (2, 1)),
       StopStream,
-      StartStream,
+      StartStream(),
       AddData(inputData, 3, 2, 1),
       CheckLastBatch((3, 3), (2, 2), (1, 1)),
       // By default we run in new tuple mode.
@@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
       .as[(Int, Long)]

     testStream(aggregated)(
-      StartStream,
+      StartStream(),
       AddData(inputData, 1, 2, 3, 4),
       ExpectFailure[SparkException](),
-      StartStream,
+      StartStream(),
       CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1))
     )
   }

sql/core/src/test/scala/org/apache/spark/sql/util/ContinuousQueryListenerSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -48,7 +48,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
     val input = MemoryStream[Int]
     withListenerAdded(listener) {
       testStream(input.toDS)(
-        StartStream,
+        StartStream(),
         Assert("Incorrect query status in onQueryStarted") {
           val status = listener.startStatus
           assert(status != null)
@@ -102,7 +102,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
     def isListenerActive(listener: QueryStatusCollector): Boolean = {
       listener.reset()
       testStream(MemoryStream[Int].toDS)(
-        StartStream,
+        StartStream(),
         StopStream
       )
       listener.startStatus != null
@@ -133,7 +133,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
       listener.reset()
       require(listener.startStatus === null)
       testStream(MemoryStream[Int].toDS)(
-        StartStream,
+        StartStream(),
         Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
         StopStream,
         Assert { listener.checkAsyncErrors() }
