Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 3 additions & 15 deletions core/src/main/scala/org/apache/spark/util/ManualClock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ package org.apache.spark.util
*/
private[spark] class ManualClock(private var time: Long) extends Clock {

private var _isWaiting = false

/**
* @return `ManualClock` with initial time 0
*/
Expand Down Expand Up @@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
* @return current time reported by the clock when waiting finishes
*/
def waitTillTime(targetTime: Long): Long = synchronized {
_isWaiting = true
try {
while (time < targetTime) {
wait(10)
}
getTimeMillis()
} finally {
_isWaiting = false
while (time < targetTime) {
wait(10)
}
getTimeMillis()
}

/**
* Returns whether there is any thread being blocked in `waitTillTime`.
*/
def isWaiting: Boolean = synchronized { _isWaiting }
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ class StreamSuite extends StreamTest {

val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
StartStream(ProcessingTime("10 seconds"), new ManualClock),
StartStream(ProcessingTime("10 seconds"), new StreamManualClock),

/* -- batch 0 ----------------------- */
// Add some data in batch 0
Expand Down Expand Up @@ -199,7 +199,7 @@ class StreamSuite extends StreamTest {

/* Stop then restart the Stream */
StopStream,
StartStream(ProcessingTime("10 seconds"), new ManualClock),
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),

/* -- batch 1 rerun ----------------- */
// this batch 1 would re-run because the latest batch id logged in offset log is 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
extends StreamAction

class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
private var waitStartTime: Option[Long] = None

override def waitTillTime(targetTime: Long): Long = synchronized {
try {
waitStartTime = Some(getTimeMillis())
super.waitTillTime(targetTime)
} finally {
waitStartTime = None
}
}

def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) }
}


/**
* Executes the specified actions on the given streaming DataFrame and provides helpful
Expand Down Expand Up @@ -307,14 +322,20 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
val testThread = Thread.currentThread()
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
val statusCollector = new QueryStatusCollector

var manualClockExpectedTime = -1L
try {
spark.streams.addListener(statusCollector)
startedTest.foreach { action =>
logInfo(s"Processing test stream action: $action")
action match {
case StartStream(trigger, triggerClock) =>
verify(currentStream == null, "stream already running")
verify(triggerClock.isInstanceOf[SystemClock]
|| triggerClock.isInstanceOf[StreamManualClock],
"Use either SystemClock or StreamManualClock to start the stream")
if (triggerClock.isInstanceOf[StreamManualClock]) {
manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
}
lastStream = currentStream
currentStream =
spark
Expand All @@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
case AdvanceManualClock(timeToAdd) =>
verify(currentStream != null,
"can not advance manual clock when a stream is not running")
verify(currentStream.triggerClock.isInstanceOf[ManualClock],
verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
assert(manualClockExpectedTime >= 0)
// Make sure we don't advance ManualClock too early. See SPARK-16002.
eventually("ManualClock has not yet entered the waiting state") {
assert(clock.isWaiting)
eventually("StreamManualClock has not yet entered the waiting state") {
assert(clock.isStreamWaitingAt(manualClockExpectedTime))
}
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
clock.advance(timeToAdd)
manualClockExpectedTime += timeToAdd
verify(clock.getTimeMillis() === manualClockExpectedTime,
s"Unexpected clock time after updating: " +
s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}")

case StopStream =>
verify(currentStream != null, "can not stop a stream that is not running")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Make sure we don't leak any events to the next test
}

ignore("single listener, check trigger statuses") {
test("single listener, check trigger statuses") {
import StreamingQueryListenerSuite._
clock = new ManualClock()
clock = new StreamManualClock

/** Custom MemoryStream that waits for manual clock to reach a time */
val inputData = new MemoryStream[Int](0, sqlContext) {
Expand Down Expand Up @@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnLastQueryStatus { status: StreamingQueryStatus =>
// Check the correctness of the trigger info of the last completed batch reported by
// onQueryProgress
assert(status.triggerDetails.get("triggerId") == "0")
assert(status.triggerDetails.containsKey("triggerId"))
assert(status.triggerDetails.get("isTriggerActive") === "false")
assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")

Expand All @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")

assert(status.sourceStatuses.length === 1)
assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
Expand Down