Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions core/src/main/scala/org/apache/spark/util/ManualClock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package org.apache.spark.util
private[spark] class ManualClock(private var time: Long) extends Clock {

private var _isWaiting = false
private var _readyForFirstPeek = false

/**
* @return `ManualClock` with initial time 0
Expand All @@ -43,14 +44,19 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
*/
def setTime(timeToSet: Long): Unit = synchronized {
time = timeToSet
_readyForFirstPeek = false
notifyAll()
}

/**
* Note: if we don't want to advance too early, we should place a `isWaitingAndReadyForFirstPeek`
* guard before we call this. See SPARK-16002. See also `[sql] StreamTest#AdvanceManualClock`.
*
* @param timeToAdd time (in milliseconds) to add to the clock's time
*/
def advance(timeToAdd: Long): Unit = synchronized {
time += timeToAdd
_readyForFirstPeek = false
notifyAll()
}

Expand All @@ -60,18 +66,21 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
*/
def waitTillTime(targetTime: Long): Long = synchronized {
_isWaiting = true
_readyForFirstPeek = true
try {
while (time < targetTime) {
wait(10)
}
getTimeMillis()
} finally {
_isWaiting = false
_readyForFirstPeek = false
}
}

/**
* Returns whether there is any thread being blocked in `waitTillTime`.
* Returns whether there is any thread being blocked in `waitTillTime`, and this will be the first
* time it's been peeked in the blocking state.
*/
def isWaiting: Boolean = synchronized { _isWaiting }
def isWaiting: Boolean = synchronized { _isWaiting && _readyForFirstPeek }
}
13 changes: 6 additions & 7 deletions dev/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ def run_scala_tests_maven(test_profiles):

def run_scala_tests_sbt(test_modules, test_profiles):

sbt_test_goals = list(itertools.chain.from_iterable(m.sbt_test_goals for m in test_modules))
sbt_test_goals = [ "sql/test-only *StreamingQueryListener*" for x in range(0, 200) ]

if not sbt_test_goals:
return
Expand All @@ -399,7 +399,6 @@ def run_scala_tests_sbt(test_modules, test_profiles):

print("[info] Running Spark tests using SBT with these arguments: ",
" ".join(profiles_and_goals))

exec_sbt(profiles_and_goals)


Expand Down Expand Up @@ -538,7 +537,7 @@ def main():
setup_test_environ(test_environ)

test_modules = determine_modules_to_test(changed_modules)

'''
# license checks
run_apache_rat_checks()

Expand All @@ -565,21 +564,21 @@ def main():

if any(m.should_run_build_tests for m in test_modules):
run_build_tests()

'''
# spark build
build_apache_spark(build_tool, hadoop_version)

# backwards compatibility checks

if build_tool == "sbt":
# Note: compatibility tests only supported in sbt for now
detect_binary_inop_with_mima(hadoop_version)
# Since we did not build assembly/package before running dev/mima, we need to
# do it here because the tests still rely on it; see SPARK-13294 for details.
build_spark_assembly_sbt(hadoop_version)

# run the test suites
run_scala_tests(build_tool, hadoop_version, test_modules, excluded_tags)

modules_with_python_tests = [m for m in test_modules if m.python_test_goals]
if modules_with_python_tests:
run_python_tests(modules_with_python_tests, opts.parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,11 +341,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
verify(currentStream.triggerClock.isInstanceOf[ManualClock],
s"can not advance clock of type ${currentStream.triggerClock.getClass}")
val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
// scalastyle:off println

// Make sure we don't advance ManualClock too early. See SPARK-16002.
println("Going to advance clock by " + timeToAdd)
eventually("ManualClock has not yet entered the waiting state") {
println(
s"\tWaiting for other thread to wait on clock before advancing time by $timeToAdd")
assert(clock.isWaiting)
}
currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
clock.advance(timeToAdd)
println("Manual clock advanced to " + clock.getTimeMillis())

// scalastyle:on println

case StopStream =>
verify(currentStream != null, "can not stop a stream that is not running")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,34 +43,41 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Make sure we don't leak any events to the next test
}

ignore("single listener, check trigger statuses") {
test("single listener, check trigger statuses") {
import StreamingQueryListenerSuite._
clock = new ManualClock()
// scalastyle:off println

/** Custom MemoryStream that waits for manual clock to reach a time */
val inputData = new MemoryStream[Int](0, sqlContext) {
// Wait for manual clock to be 100 first time there is data
override def getOffset: Option[Offset] = {
val offset = super.getOffset
if (offset.nonEmpty) {
println("In getOffset waiting for 100")
clock.waitTillTime(100)
println("Time after getOffset: " + clock.getTimeMillis())
}
offset
}

// Wait for manual clock to be 300 first time there is data
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
println("In getBatch waiting for 300")
clock.waitTillTime(300)
println("Time after getBatch: " + clock.getTimeMillis())
super.getBatch(start, end)
}
}

// This is to make sure thatquery waits for manual clock to be 600 first time there is data
val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
println("In map waiting for 600")
clock.waitTillTime(600)
println("Time after map: " + clock.getTimeMillis())
x
}

println("=" * 40)
testStream(mapped, OutputMode.Complete)(
StartStream(triggerClock = clock),
AddData(inputData, 1, 2),
Expand All @@ -81,7 +88,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnLastQueryStatus { status: StreamingQueryStatus =>
// Check the correctness of the trigger info of the last completed batch reported by
// onQueryProgress
assert(status.triggerDetails.get("triggerId") == "0")
println("Status: " + status)
assert(status.triggerDetails.containsKey("triggerId"))
assert(status.triggerDetails.get("isTriggerActive") === "false")
assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")

Expand All @@ -101,13 +109,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")

assert(status.sourceStatuses.length === 1)
assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
assert(status.triggerDetails.containsKey("triggerId"))
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
},
CheckAnswer(2)
)
// scalastyle:on println
}

test("adding and removing listener") {
Expand Down