diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 91a95871014f0..e7a65d74a440e 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -26,8 +26,6 @@ package org.apache.spark.util */ private[spark] class ManualClock(private var time: Long) extends Clock { - private var _isWaiting = false - /** * @return `ManualClock` with initial time 0 */ @@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock { * @return current time reported by the clock when waiting finishes */ def waitTillTime(targetTime: Long): Long = synchronized { - _isWaiting = true - try { - while (time < targetTime) { - wait(10) - } - getTimeMillis() - } finally { - _isWaiting = false + while (time < targetTime) { + wait(10) } + getTimeMillis() } - - /** - * Returns whether there is any thread being blocked in `waitTillTime`. - */ - def isWaiting: Boolean = synchronized { _isWaiting } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index cdbad901dba8e..6bdf47901ae68 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -161,7 +161,7 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[Int] testStream(inputData.toDS())( - StartStream(ProcessingTime("10 seconds"), new ManualClock), + StartStream(ProcessingTime("10 seconds"), new StreamManualClock), /* -- batch 0 ----------------------- */ // Add some data in batch 0 @@ -199,7 +199,7 @@ class StreamSuite extends StreamTest { /* Stop then restart the Stream */ StopStream, - StartStream(ProcessingTime("10 seconds"), new ManualClock), + StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), /* -- batch 1 rerun ----------------- */ // this batch 1 would re-run because the latest batch id logged in offset log is 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 3b9d3786349ad..254f823bf54f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit) extends StreamAction + class StreamManualClock(time: Long = 0L) extends ManualClock(time) { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { + try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) + } finally { + waitStartTime = None + } + } + + def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) } + } + /** * Executes the specified actions on the given streaming DataFrame and provides helpful @@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val testThread = Thread.currentThread() val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath val statusCollector = new QueryStatusCollector - + var manualClockExpectedTime = -1L try { spark.streams.addListener(statusCollector) startedTest.foreach { action => @@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { action match { case StartStream(trigger, triggerClock) => verify(currentStream == null, "stream already running") + verify(triggerClock.isInstanceOf[SystemClock] + || triggerClock.isInstanceOf[StreamManualClock], + "Use either SystemClock or StreamManualClock to start the stream") + if (triggerClock.isInstanceOf[StreamManualClock]) { + manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis() + } lastStream = currentStream currentStream = spark @@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case AdvanceManualClock(timeToAdd) => verify(currentStream != null, "can not advance manual clock when a stream is not running") - verify(currentStream.triggerClock.isInstanceOf[ManualClock], + verify(currentStream.triggerClock.isInstanceOf[StreamManualClock], s"can not advance clock of type ${currentStream.triggerClock.getClass}") - val clock = currentStream.triggerClock.asInstanceOf[ManualClock] + val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock] + assert(manualClockExpectedTime >= 0) // Make sure we don't advance ManualClock too early. See SPARK-16002. - eventually("ManualClock has not yet entered the waiting state") { - assert(clock.isWaiting) + eventually("StreamManualClock has not yet entered the waiting state") { + assert(clock.isStreamWaitingAt(manualClockExpectedTime)) } - currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) + clock.advance(timeToAdd) + manualClockExpectedTime += timeToAdd + verify(clock.getTimeMillis() === manualClockExpectedTime, + s"Unexpected clock time after updating: " + + s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}") case StopStream => verify(currentStream != null, "can not stop a stream that is not running") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9e0eefbc58aa5..623f66a778eac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - ignore("single listener, check trigger statuses") { + test("single listener, check trigger statuses") { import StreamingQueryListenerSuite._ - clock = new ManualClock() + clock = new StreamManualClock /** Custom MemoryStream that waits for manual clock to reach a time */ val inputData = new MemoryStream[Int](0, sqlContext) { @@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(status.triggerDetails.get("triggerId") == "0") + assert(status.triggerDetails.containsKey("triggerId")) assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") + assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId")) assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")