|
17 | 17 |
|
18 | 18 | package org.apache.spark.streaming |
19 | 19 |
|
20 | | -import java.util.concurrent.{TimeoutException, TimeUnit} |
| 20 | +import java.util.concurrent.TimeUnit |
21 | 21 | import java.util.concurrent.locks.ReentrantLock |
22 | | -import javax.annotation.concurrent.GuardedBy |
23 | 22 |
|
24 | 23 | private[streaming] class ContextWaiter { |
25 | 24 |
|
26 | 25 | private val lock = new ReentrantLock() |
27 | 26 | private val condition = lock.newCondition() |
28 | 27 |
|
29 | | - @GuardedBy("lock") |
| 28 | + // Guarded by "lock" |
30 | 29 | private var error: Throwable = null |
31 | 30 |
|
32 | | - @GuardedBy("lock") |
| 31 | + // Guarded by "lock" |
33 | 32 | private var stopped: Boolean = false |
34 | 33 |
|
35 | 34 | def notifyError(e: Throwable) = { |
@@ -60,29 +59,19 @@ private[streaming] class ContextWaiter { |
60 | 59 | lock.lock() |
61 | 60 | try { |
62 | 61 | if (timeout < 0) { |
63 | | - while (true) { |
64 | | - // If already stopped, then exit |
65 | | - if (stopped) return true |
66 | | - // If already had error, then throw it |
67 | | - if (error != null) throw error |
68 | | - |
| 62 | + while (!stopped && error == null) { |
69 | 63 | condition.await() |
70 | 64 | } |
71 | 65 | } else { |
72 | 66 | var nanos = TimeUnit.MILLISECONDS.toNanos(timeout) |
73 | | - while (true) { |
74 | | - // If already stopped, then exit |
75 | | - if (stopped) return true |
76 | | - // If already had error, then throw it |
77 | | - if (error != null) throw error |
78 | | - // If no time remains, then exit |
79 | | - if (nanos <= 0) return false |
80 | | - |
| 67 | + while (!stopped && error == null && nanos > 0) { |
81 | 68 | nanos = condition.awaitNanos(nanos) |
82 | 69 | } |
83 | 70 | } |
84 | | - // Never reached. Make the compiler happy. |
85 | | - true |
| 71 | + // If already had error, then throw it |
| 72 | + if (error != null) throw error |
| 73 | + // already stopped or timeout |
| 74 | + stopped |
86 | 75 | } finally { |
87 | 76 | lock.unlock() |
88 | 77 | } |
|
0 commit comments