Skip to content

Commit e06bd4f

Browse files
committed
Fix the issue that ContextWaiter didn't handle 'spurious wakeup'
1 parent 9bd9334 commit e06bd4f

File tree

1 file changed

+59
-15
lines changed

1 file changed

+59
-15
lines changed

streaming/src/main/scala/org/apache/spark/streaming/ContextWaiter.scala

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,74 @@
1717

1818
package org.apache.spark.streaming
1919

20+
import java.util.concurrent.{TimeoutException, TimeUnit}
21+
import java.util.concurrent.locks.ReentrantLock
22+
import javax.annotation.concurrent.GuardedBy
23+
2024
private[streaming] class ContextWaiter {
25+
26+
private val lock = new ReentrantLock()
27+
private val condition = lock.newCondition()
28+
29+
@GuardedBy("lock")
2130
private var error: Throwable = null
31+
32+
@GuardedBy("lock")
2233
private var stopped: Boolean = false
2334

24-
def notifyError(e: Throwable) = synchronized {
25-
error = e
26-
notifyAll()
35+
def notifyError(e: Throwable) = {
36+
lock.lock()
37+
try {
38+
error = e
39+
condition.signalAll()
40+
} finally {
41+
lock.unlock()
42+
}
2743
}
2844

29-
def notifyStop() = synchronized {
30-
stopped = true
31-
notifyAll()
45+
def notifyStop() = {
46+
lock.lock()
47+
try {
48+
stopped = true
49+
condition.signalAll()
50+
} finally {
51+
lock.unlock()
52+
}
3253
}
3354

34-
def waitForStopOrError(timeout: Long = -1) = synchronized {
35-
// If already had error, then throw it
36-
if (error != null) {
37-
throw error
38-
}
55+
/**
56+
* Return `true` if it's stopped; or throw the reported error if `notifyError` has been called; or
57+
* `false` if the waiting time detectably elapsed before return from the method.
58+
*/
59+
def waitForStopOrError(timeout: Long = -1): Boolean = {
60+
lock.lock()
61+
try {
62+
if (timeout < 0) {
63+
while (true) {
64+
// If already stopped, then exit
65+
if (stopped) return true
66+
// If already had error, then throw it
67+
if (error != null) throw error
68+
69+
condition.await()
70+
}
71+
} else {
72+
var nanos = TimeUnit.MILLISECONDS.toNanos(timeout)
73+
while (true) {
74+
// If already stopped, then exit
75+
if (stopped) return true
76+
// If already had error, then throw it
77+
if (error != null) throw error
78+
// If no time remains, then exit
79+
if (nanos <= 0) return false
3980

40-
// If not already stopped, then wait
41-
if (!stopped) {
42-
if (timeout < 0) wait() else wait(timeout)
43-
if (error != null) throw error
81+
nanos = condition.awaitNanos(nanos)
82+
}
83+
}
84+
// Never reached. Make the compiler happy.
85+
true
86+
} finally {
87+
lock.unlock()
4488
}
4589
}
4690
}

0 commit comments

Comments
 (0)