@@ -28,61 +28,62 @@ import kotlin.concurrent.withLock
28
28
* writing.
29
29
*
30
30
* Subclasses should override [timedOut] to take action when a timeout occurs. This method will be
31
- * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise
31
+ * invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise,
32
32
* we risk starving other timeouts from being triggered.
33
33
*
34
34
* Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the
35
35
* timeout to each operation on the wrapped stream.
36
36
*
37
- * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards .
37
+ * Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward .
38
38
* The return value of [exit] indicates whether a timeout was triggered. Note that the call to
39
39
* [timedOut] is asynchronous, and may be called after [exit].
40
40
*/
41
41
open class AsyncTimeout : Timeout () {
42
- /* * True if this node is currently in the queue. */
43
- private var inQueue = false
42
+ private var state = STATE_IDLE
44
43
45
44
/* * The next node in the linked list. */
46
45
private var next: AsyncTimeout ? = null
47
46
48
47
/* * If scheduled, this is the time that the watchdog should time this out. */
49
48
private var timeoutAt = 0L
50
49
51
- private var isCanceled = false
52
- private var hadTimeoutWhenCanceled = false
53
-
54
50
fun enter () {
55
51
val timeoutNanos = timeoutNanos()
56
52
val hasDeadline = hasDeadline()
57
53
if (timeoutNanos == 0L && ! hasDeadline) {
58
54
return // No timeout and no deadline? Don't bother with the queue.
59
55
}
60
- scheduleTimeout(this , timeoutNanos, hasDeadline)
56
+
57
+ lock.withLock {
58
+ check(state == STATE_IDLE ) { " Unbalanced enter/exit" }
59
+ state = STATE_IN_QUEUE
60
+ insertIntoQueue(this , timeoutNanos, hasDeadline)
61
+ }
61
62
}
62
63
63
64
/* * Returns true if the timeout occurred. */
64
65
fun exit (): Boolean {
65
66
lock.withLock {
66
- if (isCanceled) {
67
- return hadTimeoutWhenCanceled
68
- .also {
69
- isCanceled = false
70
- hadTimeoutWhenCanceled = false
71
- }
67
+ val oldState = this .state
68
+ state = STATE_IDLE
69
+
70
+ if (oldState == STATE_IN_QUEUE ) {
71
+ removeFromQueue(this )
72
+ return false
73
+ } else {
74
+ return oldState == STATE_TIMED_OUT
72
75
}
73
-
74
- return cancelScheduledTimeout(this )
75
76
}
76
77
}
77
78
78
79
override fun cancel () {
79
80
super .cancel()
80
81
81
82
lock.withLock {
82
- if (isCanceled) return
83
- if ( ! inQueue) return
84
- isCanceled = true
85
- hadTimeoutWhenCanceled = cancelScheduledTimeout( this )
83
+ if (state == STATE_IN_QUEUE ) {
84
+ removeFromQueue( this )
85
+ state = STATE_CANCELED
86
+ }
86
87
}
87
88
}
88
89
@@ -197,16 +198,16 @@ open class AsyncTimeout : Timeout() {
197
198
return e
198
199
}
199
200
200
- private class Watchdog internal constructor() : Thread(" Okio Watchdog" ) {
201
+ private class Watchdog : Thread (" Okio Watchdog" ) {
201
202
init {
202
203
isDaemon = true
203
204
}
204
205
205
206
override fun run () {
206
207
while (true ) {
207
208
try {
208
- var timedOut: AsyncTimeout ? = null
209
- AsyncTimeout . lock.withLock {
209
+ var timedOut: AsyncTimeout ?
210
+ lock.withLock {
210
211
timedOut = awaitTimeout()
211
212
212
213
// The queue is completely empty. Let this thread exit and let another watchdog thread
@@ -225,7 +226,7 @@ open class AsyncTimeout : Timeout() {
225
226
}
226
227
}
227
228
228
- companion object {
229
+ private companion object {
229
230
val lock: ReentrantLock = ReentrantLock ()
230
231
val condition: Condition = lock.newCondition()
231
232
@@ -240,6 +241,43 @@ open class AsyncTimeout : Timeout() {
240
241
private val IDLE_TIMEOUT_MILLIS = TimeUnit .SECONDS .toMillis(60 )
241
242
private val IDLE_TIMEOUT_NANOS = TimeUnit .MILLISECONDS .toNanos(IDLE_TIMEOUT_MILLIS )
242
243
244
+ /*
245
+ * .-------------.
246
+ * | |
247
+ * .------------ exit() ------| CANCELED |
248
+ * | | |
249
+ * | '-------------'
250
+ * | ^
251
+ * | | cancel()
252
+ * v |
253
+ * .-------------. .-------------.
254
+ * | |---- enter() ----->| |
255
+ * | IDLE | | IN QUEUE |
256
+ * | |<---- exit() ------| |
257
+ * '-------------' '-------------'
258
+ * ^ |
259
+ * | | time out
260
+ * | v
261
+ * | .-------------.
262
+ * | | |
263
+ * '------------ exit() ------| TIMED OUT |
264
+ * | |
265
+ * '-------------'
266
+ *
267
+ * Notes:
268
+ * * enter() crashes if called from a state other than IDLE.
269
+ * * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to
270
+ * track entered but not in the queue.
271
+ * * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for
272
+ * early implementations to support cases where enter() as a no-op.
273
+ * * cancel() is a no-op from every state but IN QUEUE.
274
+ */
275
+
276
+ private const val STATE_IDLE = 0
277
+ private const val STATE_IN_QUEUE = 1
278
+ private const val STATE_TIMED_OUT = 2
279
+ private const val STATE_CANCELED = 3
280
+
243
281
/* *
244
282
* The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
245
283
* triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
@@ -250,77 +288,67 @@ open class AsyncTimeout : Timeout() {
250
288
*/
251
289
private var head: AsyncTimeout ? = null
252
290
253
- private fun scheduleTimeout (node : AsyncTimeout , timeoutNanos : Long , hasDeadline : Boolean ) {
254
- AsyncTimeout .lock.withLock {
255
- check(! node.inQueue) { " Unbalanced enter/exit" }
256
- node.inQueue = true
257
-
258
- // Start the watchdog thread and create the head node when the first timeout is scheduled.
259
- if (head == null ) {
260
- head = AsyncTimeout ()
261
- Watchdog ().start()
262
- }
291
+ private fun insertIntoQueue (node : AsyncTimeout , timeoutNanos : Long , hasDeadline : Boolean ) {
292
+ // Start the watchdog thread and create the head node when the first timeout is scheduled.
293
+ if (head == null ) {
294
+ head = AsyncTimeout ()
295
+ Watchdog ().start()
296
+ }
263
297
264
- val now = System .nanoTime()
265
- if (timeoutNanos != 0L && hasDeadline) {
266
- // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
267
- // around, minOf() is undefined for absolute values, but meaningful for relative ones.
268
- node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
269
- } else if (timeoutNanos != 0L ) {
270
- node.timeoutAt = now + timeoutNanos
271
- } else if (hasDeadline) {
272
- node.timeoutAt = node.deadlineNanoTime()
273
- } else {
274
- throw AssertionError ()
275
- }
298
+ val now = System .nanoTime()
299
+ if (timeoutNanos != 0L && hasDeadline) {
300
+ // Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
301
+ // around, minOf() is undefined for absolute values, but meaningful for relative ones.
302
+ node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
303
+ } else if (timeoutNanos != 0L ) {
304
+ node.timeoutAt = now + timeoutNanos
305
+ } else if (hasDeadline) {
306
+ node.timeoutAt = node.deadlineNanoTime()
307
+ } else {
308
+ throw AssertionError ()
309
+ }
276
310
277
- // Insert the node in sorted order.
278
- val remainingNanos = node.remainingNanos(now)
279
- var prev = head!!
280
- while (true ) {
281
- if (prev.next == null || remainingNanos < prev.next!! .remainingNanos(now)) {
282
- node.next = prev.next
283
- prev.next = node
284
- if (prev == = head) {
285
- // Wake up the watchdog when inserting at the front.
286
- condition.signal()
287
- }
288
- break
311
+ // Insert the node in sorted order.
312
+ val remainingNanos = node.remainingNanos(now)
313
+ var prev = head!!
314
+ while (true ) {
315
+ if (prev.next == null || remainingNanos < prev.next!! .remainingNanos(now)) {
316
+ node.next = prev.next
317
+ prev.next = node
318
+ if (prev == = head) {
319
+ // Wake up the watchdog when inserting at the front.
320
+ condition.signal()
289
321
}
290
- prev = prev.next !!
322
+ break
291
323
}
324
+ prev = prev.next!!
292
325
}
293
326
}
294
327
295
328
/* * Returns true if the timeout occurred. */
296
- private fun cancelScheduledTimeout (node : AsyncTimeout ): Boolean {
297
- if (! node.inQueue) return false
298
- node.inQueue = false
299
-
300
- // Remove the node from the linked list.
329
+ private fun removeFromQueue (node : AsyncTimeout ) {
301
330
var prev = head
302
331
while (prev != null ) {
303
332
if (prev.next == = node) {
304
333
prev.next = node.next
305
334
node.next = null
306
- return false
335
+ return
307
336
}
308
337
prev = prev.next
309
338
}
310
339
311
- // The node wasn't found in the linked list: it must have timed out!
312
- return true
340
+ error(" node was not found in the queue" )
313
341
}
314
342
315
343
/* *
316
344
* Removes and returns the node at the head of the list, waiting for it to time out if
317
345
* necessary. This returns [head] if there was no node at the head of the list when starting,
318
346
* and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a
319
- * new node was inserted while waiting. Otherwise this returns the node being waited on that has
320
- * been removed.
347
+ * new node was inserted while waiting. Otherwise, this returns the node being waited on that
348
+ * has been removed.
321
349
*/
322
350
@Throws(InterruptedException ::class )
323
- internal fun awaitTimeout (): AsyncTimeout ? {
351
+ fun awaitTimeout (): AsyncTimeout ? {
324
352
// Get the next eligible node.
325
353
val node = head!! .next
326
354
@@ -335,7 +363,7 @@ open class AsyncTimeout : Timeout() {
335
363
}
336
364
}
337
365
338
- var waitNanos = node.remainingNanos(System .nanoTime())
366
+ val waitNanos = node.remainingNanos(System .nanoTime())
339
367
340
368
// The head of the queue hasn't timed out yet. Await that.
341
369
if (waitNanos > 0 ) {
@@ -346,6 +374,7 @@ open class AsyncTimeout : Timeout() {
346
374
// The head of the queue has timed out. Remove it.
347
375
head!! .next = node.next
348
376
node.next = null
377
+ node.state = STATE_TIMED_OUT
349
378
return node
350
379
}
351
380
}
0 commit comments