From 5bc47b639ede049f44ad4f47a88d26219fea6193 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Sat, 15 Oct 2016 10:21:58 +0800 Subject: [PATCH 1/3] Fix flaky test --- .../scala/org/apache/spark/util/ManualClock.scala | 13 +++++++++++-- .../org/apache/spark/sql/streaming/StreamTest.scala | 5 +++-- .../sql/streaming/StreamingQueryListenerSuite.scala | 6 +++--- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 91a95871014f0..51c6344660e6d 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -27,6 +27,7 @@ package org.apache.spark.util private[spark] class ManualClock(private var time: Long) extends Clock { private var _isWaiting = false + private var _readyForFirstPeek = false /** * @return `ManualClock` with initial time 0 @@ -43,14 +44,19 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def setTime(timeToSet: Long): Unit = synchronized { time = timeToSet + _readyForFirstPeek = false notifyAll() } /** + * Note: if we don't want to advance too early, we should place a `isWaitingAndReadyForFirstPeek` + * guard before we call this. See SPARK-16002. See also `[sql] StreamTest#AdvanceManualClock`. 
+ * * @param timeToAdd time (in milliseconds) to add to the clock's time */ def advance(timeToAdd: Long): Unit = synchronized { time += timeToAdd + _readyForFirstPeek = false notifyAll() } @@ -60,6 +66,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def waitTillTime(targetTime: Long): Long = synchronized { _isWaiting = true + _readyForFirstPeek = true try { while (time < targetTime) { wait(10) @@ -67,11 +74,13 @@ private[spark] class ManualClock(private var time: Long) extends Clock { getTimeMillis() } finally { _isWaiting = false + _readyForFirstPeek = false } } /** - * Returns whether there is any thread being blocked in `waitTillTime`. + * Returns whether there is any thread being blocked in `waitTillTime`, and this will be the first + * time it's been peeked in the blocking state. */ - def isWaiting: Boolean = synchronized { _isWaiting } + def isWaitingAndReadyForFirstPeek: Boolean = synchronized { _isWaiting && _readyForFirstPeek } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 3b9d3786349ad..a2c17282cd695 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -342,8 +342,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { s"can not advance clock of type ${currentStream.triggerClock.getClass}") val clock = currentStream.triggerClock.asInstanceOf[ManualClock] // Make sure we don't advance ManualClock too early. See SPARK-16002. 
- eventually("ManualClock has not yet entered the waiting state") { - assert(clock.isWaiting) + eventually("ManualClock has not yet entered the waiting state, or the clock has been " + + "advanced too early") { + assert(clock.isWaitingAndReadyForFirstPeek) } currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9e0eefbc58aa5..b2a4eedfacf55 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -43,7 +43,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - ignore("single listener, check trigger statuses") { + test(s"single listener, check trigger statuses") { import StreamingQueryListenerSuite._ clock = new ManualClock() @@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(status.triggerDetails.get("triggerId") == "0") + assert(Seq("-1", "0").contains(status.triggerDetails.get("triggerId"))) assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") + assert(Seq("-1", "0").contains(status.sourceStatuses(0).triggerDetails.get("triggerId"))) 
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2") From eb59a98146f30163675cec3b52f69fedd7a234fc Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Mon, 17 Oct 2016 21:15:40 +0800 Subject: [PATCH 2/3] Revert "Fix flaky test" This reverts commit 5bc47b639ede049f44ad4f47a88d26219fea6193. --- .../scala/org/apache/spark/util/ManualClock.scala | 13 ++----------- .../org/apache/spark/sql/streaming/StreamTest.scala | 5 ++--- .../sql/streaming/StreamingQueryListenerSuite.scala | 6 +++--- 3 files changed, 7 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 51c6344660e6d..91a95871014f0 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -27,7 +27,6 @@ package org.apache.spark.util private[spark] class ManualClock(private var time: Long) extends Clock { private var _isWaiting = false - private var _readyForFirstPeek = false /** * @return `ManualClock` with initial time 0 @@ -44,19 +43,14 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def setTime(timeToSet: Long): Unit = synchronized { time = timeToSet - _readyForFirstPeek = false notifyAll() } /** - * Note: if we don't want to advance too early, we should place a `isWaitingAndReadyForFirstPeek` - * guard before we call this. See SPARK-16002. See also `[sql] StreamTest#AdvanceManualClock`. 
- * * @param timeToAdd time (in milliseconds) to add to the clock's time */ def advance(timeToAdd: Long): Unit = synchronized { time += timeToAdd - _readyForFirstPeek = false notifyAll() } @@ -66,7 +60,6 @@ private[spark] class ManualClock(private var time: Long) extends Clock { */ def waitTillTime(targetTime: Long): Long = synchronized { _isWaiting = true - _readyForFirstPeek = true try { while (time < targetTime) { wait(10) @@ -74,13 +67,11 @@ private[spark] class ManualClock(private var time: Long) extends Clock { getTimeMillis() } finally { _isWaiting = false - _readyForFirstPeek = false } } /** - * Returns whether there is any thread being blocked in `waitTillTime`, and this will be the first - * time it's been peeked in the blocking state. + * Returns whether there is any thread being blocked in `waitTillTime`. */ - def isWaitingAndReadyForFirstPeek: Boolean = synchronized { _isWaiting && _readyForFirstPeek } + def isWaiting: Boolean = synchronized { _isWaiting } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index a2c17282cd695..3b9d3786349ad 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -342,9 +342,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { s"can not advance clock of type ${currentStream.triggerClock.getClass}") val clock = currentStream.triggerClock.asInstanceOf[ManualClock] // Make sure we don't advance ManualClock too early. See SPARK-16002. 
- eventually("ManualClock has not yet entered the waiting state, or the clock has been " + - "advanced too early") { - assert(clock.isWaitingAndReadyForFirstPeek) + eventually("ManualClock has not yet entered the waiting state") { + assert(clock.isWaiting) } currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index b2a4eedfacf55..9e0eefbc58aa5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -43,7 +43,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - test(s"single listener, check trigger statuses") { + ignore("single listener, check trigger statuses") { import StreamingQueryListenerSuite._ clock = new ManualClock() @@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(Seq("-1", "0").contains(status.triggerDetails.get("triggerId"))) + assert(status.triggerDetails.get("triggerId") == "0") assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(Seq("-1", "0").contains(status.sourceStatuses(0).triggerDetails.get("triggerId"))) + assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") 
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2") From 7ae7782cdede0c3f2a3db0a09401cf0d682a264f Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Mon, 17 Oct 2016 19:53:46 +0800 Subject: [PATCH 3/3] Fix flaky test again --- .../org/apache/spark/util/ManualClock.scala | 12 +-- .../apache/spark/util/ManualClockSuite.scala | 82 +++++++++++++++++++ .../spark/sql/streaming/StreamSuite.scala | 16 ++-- .../spark/sql/streaming/StreamTest.scala | 8 +- .../StreamingQueryListenerSuite.scala | 12 +-- 5 files changed, 107 insertions(+), 23 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/util/ManualClockSuite.scala diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 91a95871014f0..0f2c60357f1ec 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -26,7 +26,7 @@ package org.apache.spark.util */ private[spark] class ManualClock(private var time: Long) extends Clock { - private var _isWaiting = false + private var timeWhenWaitStarted: Option[Long] = None /** * @return `ManualClock` with initial time 0 @@ -59,19 +59,21 @@ private[spark] class ManualClock(private var time: Long) extends Clock { * @return current time reported by the clock when waiting finishes */ def waitTillTime(targetTime: Long): Long = synchronized { - _isWaiting = true + timeWhenWaitStarted = Some(time) try { while (time < targetTime) { wait(10) } getTimeMillis() } finally { - _isWaiting = false + timeWhenWaitStarted = None } } /** - * Returns whether there is any thread being blocked in `waitTillTime`. + * Returns whether a thread blocked in `waitTillTime` started waiting at the given clock time.
*/ - def isWaiting: Boolean = synchronized { _isWaiting } + def isThreadWaitingAt(timeWhenWaitStarted: Long): Boolean = synchronized { + this.timeWhenWaitStarted.contains(timeWhenWaitStarted) + } } diff --git a/core/src/test/scala/org/apache/spark/util/ManualClockSuite.scala b/core/src/test/scala/org/apache/spark/util/ManualClockSuite.scala new file mode 100644 index 0000000000000..c5b2a1fcaca87 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/ManualClockSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import java.util.concurrent.CountDownLatch + +import scala.util.control.NonFatal + +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.exceptions.TestFailedException +import org.scalatest.time.SpanSugar._ + +import org.apache.spark.internal.Logging +import org.apache.spark.SparkFunSuite + +class ManualClockSuite extends SparkFunSuite with Logging { + + // this test takes 3 seconds + test("ManualClock - isThreadWaitingAt() & advance() are working") { + + val clock = new ManualClock() + + val latch = new CountDownLatch(1) + + new Thread(new Runnable { + override def run(): Unit = { + // verify that the clock should advance from 0 to 10 (in a very short time); otherwise fail + // this test + try { + Eventually.eventually(Timeout(3.seconds)) { + assert(clock.isThreadWaitingAt(0)) + } + clock.advance(10) + } + catch { + case NonFatal(e) => fail(e) + } + + // verify that the clock should not advance any more, because no thread is waiting at 20 + val e = intercept[TestFailedException] { + // waiting for this timeout takes most of this test's time + Eventually.eventually(Timeout(3.seconds)) { + assert(clock.isThreadWaitingAt(20)) + } + } + assert(e.getMessage().contains( + "Last failure message: clock.isThreadWaitingAt(20L) was false")) + + // allow the main thread to proceed + latch.countDown() + } + }).start() + + // verify that the clock should be advanced to 10 (in a very short time) + Eventually.eventually(Timeout(3.seconds)) { + assert(clock.waitTillTime(10) === 10) + } + + // prevent this main thread from finishing too early, so that we can verify the + // clock-advance-thread would time out as expected + Eventually.eventually(Timeout(10.seconds)) { + latch.await() + } + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 
cdbad901dba8e..ba5286416ba60 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -166,7 +166,7 @@ class StreamSuite extends StreamTest { /* -- batch 0 ----------------------- */ // Add some data in batch 0 AddData(inputData, 1, 2, 3), - AdvanceManualClock(10 * 1000), // 10 seconds + AdvanceManualClock(0, 10 * 1000), // 10 seconds /* -- batch 1 ----------------------- */ // Check the results of batch 0 @@ -176,7 +176,7 @@ class StreamSuite extends StreamTest { CheckSinkLatestBatchId(0), // Add some data in batch 1 AddData(inputData, 4, 5, 6), - AdvanceManualClock(10 * 1000), + AdvanceManualClock(10 * 1000, 10 * 1000), /* -- batch _ ----------------------- */ // Check the results of batch 1 @@ -185,9 +185,9 @@ class StreamSuite extends StreamTest { CheckOffsetLogLatestBatchId(1), CheckSinkLatestBatchId(1), - AdvanceManualClock(10 * 1000), - AdvanceManualClock(10 * 1000), - AdvanceManualClock(10 * 1000), + AdvanceManualClock(20 * 1000, 10 * 1000), + AdvanceManualClock(30 * 1000, 10 * 1000), + AdvanceManualClock(40 * 1000, 10 * 1000), /* -- batch __ ---------------------- */ // Check the results of batch 1 again; this is to make sure that, when there's no new data, @@ -199,11 +199,11 @@ class StreamSuite extends StreamTest { /* Stop then restart the Stream */ StopStream, - StartStream(ProcessingTime("10 seconds"), new ManualClock), + StartStream(ProcessingTime("10 seconds"), new ManualClock(60 * 1000)), /* -- batch 1 rerun ----------------- */ // this batch 1 would re-run because the latest batch id logged in offset log is 1 - AdvanceManualClock(10 * 1000), + AdvanceManualClock(60 * 1000, 10 * 1000), /* -- batch 2 ----------------------- */ // Check the results of batch 1 @@ -213,7 +213,7 @@ class StreamSuite extends StreamTest { CheckSinkLatestBatchId(1), // Add some data in batch 2 AddData(inputData, 7, 8, 9), - AdvanceManualClock(10 * 1000), + 
AdvanceManualClock(70 * 1000, 10 * 1000), /* -- batch 3 ----------------------- */ // Check the results of batch 2 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 3b9d3786349ad..90aee9d1c8513 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -165,7 +165,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { extends StreamAction /** Advance the trigger clock's time manually. */ - case class AdvanceManualClock(timeToAdd: Long) extends StreamAction + case class AdvanceManualClock(timeWhenWaitStarted: Long, timeToAdd: Long) extends StreamAction /** Signals that a failure is expected and should not kill the test. */ case class ExpectFailure[T <: Throwable : ClassTag]() extends StreamAction { @@ -335,7 +335,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } }) - case AdvanceManualClock(timeToAdd) => + case AdvanceManualClock(timeWhenWaitStarted, timeToAdd) => verify(currentStream != null, "can not advance manual clock when a stream is not running") verify(currentStream.triggerClock.isInstanceOf[ManualClock], @@ -343,9 +343,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val clock = currentStream.triggerClock.asInstanceOf[ManualClock] // Make sure we don't advance ManualClock too early. See SPARK-16002. 
eventually("ManualClock has not yet entered the waiting state") { - assert(clock.isWaiting) + assert(clock.isThreadWaitingAt(timeWhenWaitStarted)) } - currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) + clock.advance(timeToAdd) case StopStream => verify(currentStream != null, "can not stop a stream that is not running") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9e0eefbc58aa5..5e77204949746 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -43,7 +43,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - ignore("single listener, check trigger statuses") { + test("single listener, check trigger statuses") { import StreamingQueryListenerSuite._ clock = new ManualClock() @@ -74,14 +74,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testStream(mapped, OutputMode.Complete)( StartStream(triggerClock = clock), AddData(inputData, 1, 2), - AdvanceManualClock(100), // unblock getOffset, will block on getBatch - AdvanceManualClock(200), // unblock getBatch, will block on computation - AdvanceManualClock(300), // unblock computation + AdvanceManualClock(0, 100), // unblock getOffset, will block on getBatch + AdvanceManualClock(100, 200), // unblock getBatch, will block on computation + AdvanceManualClock(300, 300), // unblock computation AssertOnQuery { _ => clock.getTimeMillis() === 600 }, AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(status.triggerDetails.get("triggerId") == "0") + 
assert(status.triggerDetails.containsKey("triggerId")) assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") + assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId")) assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")