Skip to content

Commit edc96d8

Browse files
zsxwing authored and tdas committed
[SPARK-4813][Streaming] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
Used `Condition` to rewrite `ContextWaiter` because it provides a convenient API `awaitNanos` for timeout.

Author: zsxwing <[email protected]>

Closes #3661 from zsxwing/SPARK-4813 and squashes the following commits:

52247f5 [zsxwing] Add explicit unit type
be42bcf [zsxwing] Update as per review suggestion
e06bd4f [zsxwing] Fix the issue that ContextWaiter didn't handle 'spurious wakeup'

(cherry picked from commit 6a89782)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 7a24541 commit edc96d8

File tree

1 file changed

+48
-15
lines changed

1 file changed

+48
-15
lines changed

streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala

Lines changed: 48 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -17,30 +17,63 @@
1717

1818
package org.apache.spark.streaming
1919

20+
import java.util.concurrent.TimeUnit
21+
import java.util.concurrent.locks.ReentrantLock
22+
2023
private[streaming] class ContextWaiter {
24+
25+
private val lock = new ReentrantLock()
26+
private val condition = lock.newCondition()
27+
28+
// Guarded by "lock"
2129
private var error: Throwable = null
22-
private var stopped: Boolean = false
2330

24-
def notifyError(e: Throwable) = synchronized {
25-
error = e
26-
notifyAll()
27-
}
31+
// Guarded by "lock"
32+
private var stopped: Boolean = false
2833

29-
def notifyStop() = synchronized {
30-
stopped = true
31-
notifyAll()
34+
def notifyError(e: Throwable): Unit = {
35+
lock.lock()
36+
try {
37+
error = e
38+
condition.signalAll()
39+
} finally {
40+
lock.unlock()
41+
}
3242
}
3343

34-
def waitForStopOrError(timeout: Long = -1) = synchronized {
35-
// If already had error, then throw it
36-
if (error != null) {
37-
throw error
44+
def notifyStop(): Unit = {
45+
lock.lock()
46+
try {
47+
stopped = true
48+
condition.signalAll()
49+
} finally {
50+
lock.unlock()
3851
}
52+
}
3953

40-
// If not already stopped, then wait
41-
if (!stopped) {
42-
if (timeout < 0) wait() else wait(timeout)
54+
/**
55+
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
56+
* `false` if the waiting time detectably elapsed before return from the method.
57+
*/
58+
def waitForStopOrError(timeout: Long = -1): Boolean = {
59+
lock.lock()
60+
try {
61+
if (timeout < 0) {
62+
while (!stopped && error == null) {
63+
condition.await()
64+
}
65+
} else {
66+
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
67+
while (!stopped && error == null && nanos > 0) {
68+
nanos = condition.awaitNanos(nanos)
69+
}
70+
}
71+
// If already had error, then throw it
4372
if (error != null) throw error
73+
// already stopped or timeout
74+
stopped
75+
} finally {
76+
lock.unlock()
4477
}
4578
}
4679
}

0 commit comments

Comments
 (0)