@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.util.{Clock, SystemClock}

/**
* :: Experimental ::
@@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
df: DataFrame,
sink: Sink,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock(),
outputMode: OutputMode = Append): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
@@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
checkpointLocation,
logicalPlan,
sink,
outputMode,
trigger)
trigger,
triggerClock,
outputMode)
query.start()
activeQueries.put(name, query)
query
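For context, a minimal usage sketch of the new triggerClock parameter, mirroring the StreamTest change further down in this diff; metadataRoot, stream, sink and outputMode are assumed test fixtures, not part of this patch:

import org.apache.spark.sql.ProcessingTime
import org.apache.spark.util.ManualClock

// Inject a ManualClock so the test, not the wall clock, decides when the trigger fires.
val clock = new ManualClock()
val query = sqlContext
  .streams
  .startQuery(
    StreamExecution.nextName,
    metadataRoot,
    stream,
    sink,
    trigger = ProcessingTime("10 seconds"),
    triggerClock = clock,          // replaces the default new SystemClock()
    outputMode = outputMode)
  .asInstanceOf[StreamExecution]

clock.advance(10 * 1000)           // deterministically fires the next batch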
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.util.UninterruptibleThread
import org.apache.spark.util.{Clock, UninterruptibleThread}

/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -49,8 +49,9 @@ class StreamExecution(
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink,
val outputMode: OutputMode,
val trigger: Trigger)
val trigger: Trigger,
private[sql] val triggerClock: Clock,
val outputMode: OutputMode)
extends ContinuousQuery with Logging {

/** An monitor used to wait/notify when batches complete. */
@@ -83,7 +84,7 @@ class StreamExecution(
private val uniqueSources = sources.distinct

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t)
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
}

/** Defines the internal state of execution */
@@ -65,8 +65,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
}

/** Return the next multiple of intervalMs */
/** Return the next multiple of intervalMs
Contributor:
Nit: comment style is off, we use javadoc style.

*
* e.g. for intervalMs = 100
* nextBatchTime(0) = 100
* nextBatchTime(1) = 100
* ...
* nextBatchTime(99) = 100
* nextBatchTime(100) = 200
* nextBatchTime(101) = 200
* ...
* nextBatchTime(199) = 200
* nextBatchTime(200) = 300
*
* Note, this way, we'll get nextBatchTime(nextBatchTime(0)) = 200, rather than = 0
Contributor:
This comment took me a while to understand; what do you think about this?
/**
 * Returns the start time in milliseconds for the next batch interval, given the current time.
 * Note that a batch interval is inclusive with respect to its start time, and thus calling
 * `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given 
 * an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
 */

Contributor Author:
Let me update it with your much clearer version! Thanks!

* */
def nextBatchTime(now: Long): Long = {
(now - 1) / intervalMs * intervalMs + intervalMs
now / intervalMs * intervalMs + intervalMs
Member (@zsxwing), May 2, 2016:
When I wrote this method, I was trying to deal with one case: if a batch takes exactly intervalMs, we should run the next batch at once instead of sleeping for intervalMs. This change will break that.

However, I forgot to handle the case where a batch takes 0 ms. How about changing this line to:

        if (batchElapsedTimeMs == 0) {
          clock.waitTillTime(intervalMs)
        } else {
          clock.waitTillTime(nextBatchTime(batchEndTimeMs))
        }

Contributor Author (@lw-lin), May 3, 2016:
@zsxwing thanks for clarifying this! :-)

[1]
The issue is triggered only when both batchElapsedTimeMs == 0 and batchEndTimeMs is a multiple of intervalMs; e.g. batchStartTimeMs == 50 and batchEndTimeMs == 50 with intervalMs == 100 won't trigger it. So we might have to do something like this:

if (batchElapsedTimeMs == 0 && batchEndTimeMs % intervalMs == 0) {
  clock.waitTillTime(batchEndTimeMs + intervalMs)
} else {
  clock.waitTillTime(nextBatchTime(batchEndTimeMs))
}

To me it seems a little hard to interpret...

[2]

... deal with one case: If a batch takes exactly intervalMs, we should run the next batch at once instead of sleeping intervalMs

This is a good point! I've done some calculations based on your comments, and it seems we would still run the next batch at once when the last job takes exactly intervalMs?

prior to this patch:

batch      | job
---------------------------------------------------------
[  0,  99] |
[100, 199] | job x starts at 100, stops at 199, takes 100
[200, 299] |

after this patch, it's still the same:

batch      | job
---------------------------------------------------------
[  0,  99] |
[100, 199] | job y starts at 100, stops at 199, takes 100
[200, 299] |

@zsxwing given the above [1] and [2], maybe we should simply change now - 1 to now?
Any thoughts please? Thanks! :-)

Member:
I see. I think your approach is better. Thanks for the clarification.

}
}
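To make the thread above concrete, here is a standalone sketch (not code from this patch) comparing the two formulas at the boundary value the discussion focuses on, with intervalMs = 100; the expected outputs match the updated assertions in ProcessingTimeExecutorSuite:

object NextBatchTimeBoundary {
  val intervalMs = 100L

  // Old formula, from the left-hand side of the diff.
  def oldNextBatchTime(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs
  // New formula, from the right-hand side of the diff.
  def newNextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    // A batch ending exactly on a boundary: the old formula returns the current time,
    // so waitTillTime would not wait at all; the new formula points one interval ahead.
    println(oldNextBatchTime(100)) // 100
    println(newNextBatchTime(100)) // 200

    // Away from the boundary the two formulas agree.
    println(oldNextBatchTime(101)) // 200
    println(newNextBatchTime(101)) // 200
  }
}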
58 changes: 42 additions & 16 deletions sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, ManualClock, Utils}

/**
* A framework for implementing tests for streaming queries and sources.
@@ -142,7 +142,10 @@ trait StreamTest extends QueryTest with Timeouts {
case object StopStream extends StreamAction with StreamMustBeRunning

/** Starts the stream, resuming if data has already been processed. It must not be running. */
case object StartStream extends StreamAction
case class StartStream(trigger: Trigger = null, triggerClock: Clock = null) extends StreamAction
Member:
You can use the same default values as StreamExecution; then you don't need to handle the null case.

Contributor Author (@lw-lin), May 4, 2016:
This layer of nulls was intended to delegate the default values to StreamExecution, so that we don't have to set the same defaults in many places and keep them consistent. But since it seems very unlikely that we would change the defaults, I've removed the null layer and followed your suggestion. Thanks!
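Based on the exchange above, a sketch of what StartStream could look like once the null layer is dropped and it reuses the same defaults as startQuery (ProcessingTime(0) and SystemClock); the exact final form lives in the Spark sources, so treat this as an assumption:

// assumes: import org.apache.spark.sql.ProcessingTime
// assumes: import org.apache.spark.util.{Clock, SystemClock}
case class StartStream(
    trigger: Trigger = ProcessingTime(0),
    triggerClock: Clock = new SystemClock())
  extends StreamAction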


/** Advance the trigger clock's time manually. */
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -199,8 +202,8 @@ trait StreamTest extends QueryTest with Timeouts {

// If the test doesn't manually start the stream, we do it automatically at the beginning.
val startedManually =
actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
val startedTest = if (startedManually) actions else StartStream +: actions
actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
val startedTest = if (startedManually) actions else StartStream() +: actions

def testActions = actions.zipWithIndex.map {
case (a, i) =>
@@ -280,19 +283,35 @@ trait StreamTest extends QueryTest with Timeouts {
try {
startedTest.foreach { action =>
action match {
case StartStream =>
case StartStream(_trigger, _triggerClock) =>
verify(currentStream == null, "stream already running")
lastStream = currentStream
currentStream =
sqlContext
.streams
.startQuery(
StreamExecution.nextName,
metadataRoot,
stream,
sink,
outputMode = outputMode)
.asInstanceOf[StreamExecution]
if (_trigger != null) {
Contributor:
This is minor, but why the _underscores here? It's also a little confusing that we have these layers of default arguments. Should we get rid of the defaults in startQuery and only have them in this test code?

// we pass in explicit trigger and triggerClock
sqlContext
.streams
.startQuery(
StreamExecution.nextName,
metadataRoot,
stream,
sink,
trigger = _trigger,
triggerClock = _triggerClock,
outputMode = outputMode)
.asInstanceOf[StreamExecution]
} else {
// we left out trigger and triggerClock as their default values
sqlContext
.streams
.startQuery(
StreamExecution.nextName,
metadataRoot,
stream,
sink,
outputMode = outputMode)
.asInstanceOf[StreamExecution]
}
currentStream.microBatchThread.setUncaughtExceptionHandler(
new UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
@@ -301,6 +320,13 @@ trait StreamTest extends QueryTest with Timeouts {
}
})

case AdvanceManualClock(timeToAdd) =>
verify(currentStream != null,
"can not advance manual clock when a stream is not running")
verify(currentStream.triggerClock.isInstanceOf[ManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)

case StopStream =>
verify(currentStream != null, "can not stop a stream that is not running")
try failAfter(streamingTimeout) {
@@ -470,7 +496,7 @@ trait StreamTest extends QueryTest with Timeouts {
addRandomData()

case _ => // StartStream
actions += StartStream
actions += StartStream()
running = true
}
} else {
Expand All @@ -488,7 +514,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
}
}
if(!running) { actions += StartStream }
if(!running) { actions += StartStream() }
addCheck()
testStream(ds)(actions: _*)
}
@@ -21,19 +21,41 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.ProcessingTime
import org.apache.spark.util.ManualClock
import org.apache.spark.util.{Clock, ManualClock, SystemClock}

class ProcessingTimeExecutorSuite extends SparkFunSuite {

test("nextBatchTime") {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
assert(processingTimeExecutor.nextBatchTime(0) === 100)
assert(processingTimeExecutor.nextBatchTime(1) === 100)
assert(processingTimeExecutor.nextBatchTime(99) === 100)
assert(processingTimeExecutor.nextBatchTime(100) === 100)
assert(processingTimeExecutor.nextBatchTime(100) === 200)
assert(processingTimeExecutor.nextBatchTime(101) === 200)
assert(processingTimeExecutor.nextBatchTime(150) === 200)
}

private def testNextBatchTimeAgainstClock(clock: Clock) {
val IntervalMS = 100
Contributor:
Lowercase first letter for variables.

Contributor Author (@lw-lin), May 3, 2016:
Sure; let me fix this.

val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(IntervalMS), clock)

val ITERATION = 10
var nextBatchTime: Long = 0
for (it <- 1 to ITERATION)
nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)

// nextBatchTime should be 1000
assert(nextBatchTime === IntervalMS * ITERATION)
Contributor:
What is this checking that isn't checked by test("nextBatchTime") above?

}

test("nextBatchTime against SystemClock") {
testNextBatchTimeAgainstClock(new SystemClock)
}

test("nextBatchTime against ManualClock") {
Contributor Author:
Please note that the ProcessingTimeExecutor issue would make this test fail without this patch, but it passes with this patch.

testNextBatchTimeAgainstClock(new ManualClock)
}
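A standalone sketch (not code from the patch) of what the iteration above checks beyond the point assertions in test("nextBatchTime"): the new formula advances exactly one interval per call, so ten calls starting from 0 reach 1000, while the old formula reaches 100 on the first call and then keeps returning 100.

object NextBatchTimeIteration {
  val intervalMs = 100L
  def oldNext(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs
  def newNext(now: Long): Long = now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    val afterTenOld = (1 to 10).foldLeft(0L)((t, _) => oldNext(t))
    val afterTenNew = (1 to 10).foldLeft(0L)((t, _) => newNext(t))
    println(afterTenOld) // 100: stuck at the first boundary
    println(afterTenNew) // 1000: one interval per call, matching the assertion above
  }
}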

private def testBatchTermination(intervalMs: Long): Unit = {
var batchCounts = 0
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
@@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
TestAwaitTermination(ExpectNotBlocked),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
StartStream,
StartStream(),
AssertOnQuery(_.isActive === true),
AddData(inputData, 0),
ExpectFailure[SparkException],
@@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
"{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
src,
tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(
"{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
@@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock

class StreamSuite extends StreamTest with SharedSQLContext {

@@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext {

testStream(mapped)(
AddData(inputData, 1, 2, 3),
StartStream,
StartStream(),
CheckAnswer(2, 3, 4),
StopStream,
AddData(inputData, 4, 5, 6),
StartStream,
StartStream(),
CheckAnswer(2, 3, 4, 5, 6, 7))
}

@@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext {
CheckAnswer(1, 2, 3, 4, 5, 6),
StopStream,
AddData(inputData1, 7),
StartStream,
StartStream(),
AddData(inputData2, 8),
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
}
@@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext {
testStream(ds)()
}
}

// This would fail for now -- error is "Timed out waiting for stream"
// Root cause is that data generated in batch 0 may not get processed in batch 1
// Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
ignore("minimize delay between batch construction and execution") {
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock),
/* -- batch 0 ----------------------- */
AddData(inputData, 1),
AddData(inputData, 2),
AddData(inputData, 3),
AdvanceManualClock(10 * 1000), // 10 seconds
/* -- batch 1 ----------------------- */
CheckAnswer(1, 2, 3))
}
Contributor Author:
The above test takes advantage of the new StartStream and AdvanceManualClock actions. It tests SPARK-14942 (Reduce delay between batch construction and execution) with a manually timed executor.

}

/**
@@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
AddData(inputData, 3, 2),
CheckLastBatch((3, 2), (2, 1)),
StopStream,
StartStream,
StartStream(),
AddData(inputData, 3, 2, 1),
CheckLastBatch((3, 3), (2, 2), (1, 1)),
// By default we run in new tuple mode.
@@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
.as[(Int, Long)]

testStream(aggregated)(
StartStream,
StartStream(),
AddData(inputData, 1, 2, 3, 4),
ExpectFailure[SparkException](),
StartStream,
StartStream(),
CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1))
)
}
@@ -50,7 +50,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
val input = MemoryStream[Int]
withListenerAdded(listener) {
testStream(input.toDS)(
StartStream,
StartStream(),
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
@@ -104,7 +104,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
def isListenerActive(listener: QueryStatusCollector): Boolean = {
listener.reset()
testStream(MemoryStream[Int].toDS)(
StartStream,
StartStream(),
StopStream
)
listener.startStatus != null
@@ -135,7 +135,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
listener.reset()
require(listener.startStatus === null)
testStream(MemoryStream[Int].toDS)(
StartStream,
StartStream(),
Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
StopStream,
Assert { listener.checkAsyncErrors() }