@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.util.{Clock, SystemClock}

/**
* :: Experimental ::
@@ -175,6 +176,7 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
df: DataFrame,
sink: Sink,
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock(),
outputMode: OutputMode = Append): ContinuousQuery = {
activeQueriesLock.synchronized {
if (activeQueries.contains(name)) {
@@ -206,8 +208,9 @@ class ContinuousQueryManager(sparkSession: SparkSession) {
checkpointLocation,
logicalPlan,
sink,
outputMode,
trigger)
trigger,
triggerClock,
outputMode)
query.start()
activeQueries.put(name, query)
query
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.ContinuousQueryListener
import org.apache.spark.sql.util.ContinuousQueryListener._
import org.apache.spark.util.{UninterruptibleThread, Utils}
import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}

/**
* Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
@@ -50,8 +50,9 @@ class StreamExecution(
checkpointRoot: String,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink,
val outputMode: OutputMode,
val trigger: Trigger)
val trigger: Trigger,
private[sql] val triggerClock: Clock,
val outputMode: OutputMode)
extends ContinuousQuery with Logging {

/**
@@ -88,7 +89,7 @@ class StreamExecution(
private val uniqueSources = sources.distinct

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t)
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
}

/** Defines the internal state of execution */
@@ -65,8 +65,13 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
}

/** Return the next multiple of intervalMs */
/**
* Returns the start time in milliseconds for the next batch interval, given the current time.
* Note that a batch interval is inclusive with respect to its start time, and thus calling
* `nextBatchTime` with the result of a previous call should return the next interval. (i.e. given
* an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
*/

@zsxwing (Member, May 4, 2016):
nit: nextBatchTime(nextBatchTime(0)) = 200 -> nextBatchTime(nextBatchTime(0)) = 100

@lw-lin (Contributor Author):
nextBatchTime(0) = 100, so nextBatchTime(nextBatchTime(0)) = 200?

@zsxwing (Member):
Oh, right. Sorry for the mistake.

def nextBatchTime(now: Long): Long = {
(now - 1) / intervalMs * intervalMs + intervalMs
now / intervalMs * intervalMs + intervalMs

@zsxwing (Member, May 2, 2016):
When I wrote this method, I was trying to deal with one case: If a batch takes exactly intervalMs, we should run the next batch at once instead of sleeping intervalMs. This change will break it.

However, I forgot to handle the case that a batch takes 0ms. How about changing this line to:

        if (batchElapsedTimeMs == 0) {
          clock.waitTillTime(intervalMs)
        } else {
          clock.waitTillTime(nextBatchTime(batchEndTimeMs))
        }

@lw-lin (Contributor Author, May 3, 2016):
@zsxwing thanks for clarifying on this! :-)

[1]
The issue is triggered only when both batchElapsedTimeMs == 0 and batchEndTimeMs is a multiple of intervalMs hold; e.g. batchStartTimeMs == 50 and batchEndTimeMs == 50 with intervalMs == 100 won't trigger the issue. So we might have to do something like this:

if (batchElapsedTimeMs == 0 && batchEndTimeMs % intervalMs == 0) {
  clock.waitTillTime(batchEndTimeMs + intervalMs)
} else {
  clock.waitTillTime(nextBatchTime(batchEndTimeMs))
}

To me that seems a little hard to interpret...

[2]

... deal with one case: If a batch takes exactly intervalMs, we should run the next batch at once instead of sleeping intervalMs

This is a good point! I've done some calculations based on your comments, and it seems we would still run the next batch at once when the last job takes exactly intervalMs?

prior to this patch:

batch      | job
---------------------------------------------------------
[  0,  99] |
[100, 199] | job x starts at 100, stops at 199, takes 100
[200, 299] |

after this patch, it's still the same:

batch      | job
---------------------------------------------------------
[  0,  99] |
[100, 199] | job y starts at 100, stops at 199, takes 100
[200, 299] |

@zsxwing given the above [1] and [2], maybe we should simply change now - 1 to now?
Any thoughts please? Thanks! :-)

@zsxwing (Member):

I see. I think your approach is better. Thanks for clarifying.

}
}
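
To make the before/after arithmetic discussed in this thread concrete, here is a small standalone Scala sketch comparing the old (now - 1) formula with the new now formula. This is not the Spark class itself; the object name and the 100 ms interval are illustrative, and only the two formulas come from the diff.

// Standalone sketch of the nextBatchTime arithmetic; runnable on its own.
object NextBatchTimeSketch {
  val intervalMs = 100L

  // Before this patch:
  def oldNextBatchTime(now: Long): Long = (now - 1) / intervalMs * intervalMs + intervalMs

  // After this patch:
  def newNextBatchTime(now: Long): Long = now / intervalMs * intervalMs + intervalMs

  def main(args: Array[String]): Unit = {
    // A batch that starts and ends at t = 100 (0 ms elapsed): the old formula
    // schedules the next batch for 100 again, i.e. immediately (the busy-loop case).
    assert(oldNextBatchTime(100) == 100)
    // The new formula treats an interval as inclusive of its start time,
    // so the next batch is scheduled one full interval later.
    assert(newNextBatchTime(100) == 200)
    // A batch that starts at 100 and stops at 199 (the "takes exactly intervalMs"
    // case in the table above) still triggers the next batch at 200 under both formulas.
    assert(oldNextBatchTime(199) == 200 && newNextBatchTime(199) == 200)
    // Chaining calls walks through consecutive intervals, matching the new
    // ProcessingTimeExecutorSuite test further down.
    assert(newNextBatchTime(newNextBatchTime(0)) == 200)
  }
}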
33 changes: 24 additions & 9 deletions sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.util.Utils
import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}

/**
* A framework for implementing tests for streaming queries and sources.
@@ -138,11 +138,17 @@ trait StreamTest extends QueryTest with Timeouts {
private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
}

/** Stops the stream. It must currently be running. */
/** Stops the stream. It must currently be running. */
case object StopStream extends StreamAction with StreamMustBeRunning

/** Starts the stream, resuming if data has already been processed. It must not be running. */
case object StartStream extends StreamAction
/** Starts the stream, resuming if data has already been processed. It must not be running. */
case class StartStream(
trigger: Trigger = ProcessingTime(0),
triggerClock: Clock = new SystemClock)
extends StreamAction

/** Advance the trigger clock's time manually. */
case class AdvanceManualClock(timeToAdd: Long) extends StreamAction

/** Signals that a failure is expected and should not kill the test. */
case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction {
@@ -199,8 +205,8 @@ trait StreamTest extends QueryTest with Timeouts {

// If the test doesn't manually start the stream, we do it automatically at the beginning.
val startedManually =
actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
val startedTest = if (startedManually) actions else StartStream +: actions
actions.takeWhile(!_.isInstanceOf[StreamMustBeRunning]).exists(_.isInstanceOf[StartStream])
val startedTest = if (startedManually) actions else StartStream() +: actions

def testActions = actions.zipWithIndex.map {
case (a, i) =>
@@ -280,7 +286,7 @@ trait StreamTest extends QueryTest with Timeouts {
try {
startedTest.foreach { action =>
action match {
case StartStream =>
case StartStream(trigger, triggerClock) =>
verify(currentStream == null, "stream already running")
lastStream = currentStream
currentStream =
@@ -291,6 +297,8 @@
metadataRoot,
stream,
sink,
trigger,
triggerClock,
outputMode = outputMode)
.asInstanceOf[StreamExecution]
currentStream.microBatchThread.setUncaughtExceptionHandler(
@@ -301,6 +309,13 @@
}
})

case AdvanceManualClock(timeToAdd) =>
verify(currentStream != null,
"can not advance manual clock when a stream is not running")
verify(currentStream.triggerClock.isInstanceOf[ManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)

case StopStream =>
verify(currentStream != null, "can not stop a stream that is not running")
try failAfter(streamingTimeout) {
@@ -470,7 +485,7 @@ trait StreamTest extends QueryTest with Timeouts {
addRandomData()

case _ => // StartStream
actions += StartStream
actions += StartStream()
running = true
}
} else {
Expand All @@ -488,7 +503,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
}
}
if(!running) { actions += StartStream }
if(!running) { actions += StartStream() }
addCheck()
testStream(ds)(actions: _*)
}
@@ -21,19 +21,34 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.ProcessingTime
import org.apache.spark.util.ManualClock
import org.apache.spark.util.{Clock, ManualClock, SystemClock}

class ProcessingTimeExecutorSuite extends SparkFunSuite {

test("nextBatchTime") {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
assert(processingTimeExecutor.nextBatchTime(0) === 100)
assert(processingTimeExecutor.nextBatchTime(1) === 100)
assert(processingTimeExecutor.nextBatchTime(99) === 100)
assert(processingTimeExecutor.nextBatchTime(100) === 100)
assert(processingTimeExecutor.nextBatchTime(100) === 200)
assert(processingTimeExecutor.nextBatchTime(101) === 200)
assert(processingTimeExecutor.nextBatchTime(150) === 200)
}

test("calling nextBatchTime with the result of a previous call should return the next interval") {
val intervalMS = 100
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))

val ITERATION = 10
var nextBatchTime: Long = 0
for (it <- 1 to ITERATION) {
nextBatchTime = processingTimeExecutor.nextBatchTime(nextBatchTime)
}

// nextBatchTime should be 1000
assert(nextBatchTime === intervalMS * ITERATION)
}

private def testBatchTermination(intervalMs: Long): Unit = {
var batchCounts = 0
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
@@ -45,7 +45,7 @@ class ContinuousQuerySuite extends StreamTest with SharedSQLContext {
TestAwaitTermination(ExpectNotBlocked),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
StartStream,
StartStream(),
AssertOnQuery(_.isActive === true),
AddData(inputData, 0),
ExpectFailure[SparkException],
@@ -268,7 +268,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -292,7 +292,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
"{'value': 'drop4'}\n{'value': 'keep5'}\n{'value': 'keep6'}",
src,
tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData(
"{'value': 'drop7'}\n{'value': 'keep8'}\n{'value': 'keep9'}",
@@ -385,7 +385,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddParquetFileData(Seq("drop4", "keep5", "keep6"), src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddParquetFileData(Seq("drop7", "keep8", "keep9"), src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -449,7 +449,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
CheckAnswer("keep2", "keep3"),
StopStream,
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
StartStream,
StartStream(),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
@@ -19,10 +19,10 @@ package org.apache.spark.sql.streaming

import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.ManualClock

class StreamSuite extends StreamTest with SharedSQLContext {

@@ -34,11 +34,11 @@ class StreamSuite extends StreamTest with SharedSQLContext {

testStream(mapped)(
AddData(inputData, 1, 2, 3),
StartStream,
StartStream(),
CheckAnswer(2, 3, 4),
StopStream,
AddData(inputData, 4, 5, 6),
StartStream,
StartStream(),
CheckAnswer(2, 3, 4, 5, 6, 7))
}

@@ -70,7 +70,7 @@ class StreamSuite extends StreamTest with SharedSQLContext {
CheckAnswer(1, 2, 3, 4, 5, 6),
StopStream,
AddData(inputData1, 7),
StartStream,
StartStream(),
AddData(inputData2, 8),
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8))
}
@@ -136,6 +136,22 @@ class StreamSuite extends StreamTest with SharedSQLContext {
testStream(ds)()
}
}

// This would fail for now -- error is "Timed out waiting for stream"
// Root cause is that data generated in batch 0 may not get processed in batch 1
// Let's enable this after SPARK-14942: Reduce delay between batch construction and execution
ignore("minimize delay between batch construction and execution") {
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock),
/* -- batch 0 ----------------------- */
AddData(inputData, 1),
AddData(inputData, 2),
AddData(inputData, 3),
AdvanceManualClock(10 * 1000), // 10 seconds
/* -- batch 1 ----------------------- */
CheckAnswer(1, 2, 3))
}
@lw-lin (Contributor Author):

The above test takes advantage of the new StartStream and AdvanceManualClock actions. It tests against SPARK-14942 (Reduce delay between batch construction and execution) with a manually timed executor.
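
For readers unfamiliar with how AdvanceManualClock wakes the query up, the sketch below shows the ManualClock contract the new action relies on. It assumes Spark's org.apache.spark.util.ManualClock (which is private[spark], so this compiles only inside Spark's own source tree, as StreamTest does); the object name and thread wiring are illustrative, not the StreamTest implementation.

import org.apache.spark.util.ManualClock

// Minimal sketch: a thread blocked in waitTillTime (as ProcessingTimeExecutor is
// between triggers) wakes up only after the test thread advances the clock.
object ManualClockSketch {
  def main(args: Array[String]): Unit = {
    val clock = new ManualClock()  // starts at time 0

    val waiter = new Thread {
      override def run(): Unit = {
        // Blocks until the manual clock reaches 10000 ms, like a 10-second trigger.
        clock.waitTillTime(10 * 1000)
        println(s"trigger fired at ${clock.getTimeMillis()} ms")
      }
    }
    waiter.start()

    Thread.sleep(100)          // give the waiter a chance to block
    clock.advance(10 * 1000)   // what AdvanceManualClock(10 * 1000) does under the hood
    waiter.join()
  }
}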

}

/**
@@ -50,7 +50,7 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
AddData(inputData, 3, 2),
CheckLastBatch((3, 2), (2, 1)),
StopStream,
StartStream,
StartStream(),
AddData(inputData, 3, 2, 1),
CheckLastBatch((3, 3), (2, 2), (1, 1)),
// By default we run in new tuple mode.
@@ -113,10 +113,10 @@ class StreamingAggregationSuite extends StreamTest with SharedSQLContext {
.as[(Int, Long)]

testStream(aggregated)(
StartStream,
StartStream(),
AddData(inputData, 1, 2, 3, 4),
ExpectFailure[SparkException](),
StartStream,
StartStream(),
CheckLastBatch((1, 1), (2, 1), (3, 1), (4, 1))
)
}
@@ -48,7 +48,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
val input = MemoryStream[Int]
withListenerAdded(listener) {
testStream(input.toDS)(
StartStream,
StartStream(),
Assert("Incorrect query status in onQueryStarted") {
val status = listener.startStatus
assert(status != null)
@@ -102,7 +102,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
def isListenerActive(listener: QueryStatusCollector): Boolean = {
listener.reset()
testStream(MemoryStream[Int].toDS)(
StartStream,
StartStream(),
StopStream
)
listener.startStatus != null
@@ -133,7 +133,7 @@ class ContinuousQueryListenerSuite extends StreamTest with SharedSQLContext with
listener.reset()
require(listener.startStatus === null)
testStream(MemoryStream[Int].toDS)(
StartStream,
StartStream(),
Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
StopStream,
Assert { listener.checkAsyncErrors() }