Skip to content

Commit d3d1ad6

Browse files
committed
Use a state machine to implement AsyncTimeout
Previously we had 3 boolean state variables to track 4 possible states. That was more complex than necessary.
1 parent 45d6834 commit d3d1ad6

File tree

1 file changed

+100
-71
lines changed

1 file changed

+100
-71
lines changed

okio/src/jvmMain/kotlin/okio/AsyncTimeout.kt

+100-71
Original file line numberDiff line numberDiff line change
@@ -28,61 +28,62 @@ import kotlin.concurrent.withLock
2828
* writing.
2929
*
3030
* Subclasses should override [timedOut] to take action when a timeout occurs. This method will be
31-
* invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise
31+
* invoked by the shared watchdog thread so it should not do any long-running operations. Otherwise,
3232
* we risk starving other timeouts from being triggered.
3333
*
3434
* Use [sink] and [source] to apply this timeout to a stream. The returned value will apply the
3535
* timeout to each operation on the wrapped stream.
3636
*
37-
* Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterwards.
37+
* Callers should call [enter] before doing work that is subject to timeouts, and [exit] afterward.
3838
* The return value of [exit] indicates whether a timeout was triggered. Note that the call to
3939
* [timedOut] is asynchronous, and may be called after [exit].
4040
*/
4141
open class AsyncTimeout : Timeout() {
42-
/** True if this node is currently in the queue. */
43-
private var inQueue = false
42+
private var state = STATE_IDLE
4443

4544
/** The next node in the linked list. */
4645
private var next: AsyncTimeout? = null
4746

4847
/** If scheduled, this is the time that the watchdog should time this out. */
4948
private var timeoutAt = 0L
5049

51-
private var isCanceled = false
52-
private var hadTimeoutWhenCanceled = false
53-
5450
fun enter() {
5551
val timeoutNanos = timeoutNanos()
5652
val hasDeadline = hasDeadline()
5753
if (timeoutNanos == 0L && !hasDeadline) {
5854
return // No timeout and no deadline? Don't bother with the queue.
5955
}
60-
scheduleTimeout(this, timeoutNanos, hasDeadline)
56+
57+
lock.withLock {
58+
check(state == STATE_IDLE) { "Unbalanced enter/exit" }
59+
state = STATE_IN_QUEUE
60+
insertIntoQueue(this, timeoutNanos, hasDeadline)
61+
}
6162
}
6263

6364
/** Returns true if the timeout occurred. */
6465
fun exit(): Boolean {
6566
lock.withLock {
66-
if (isCanceled) {
67-
return hadTimeoutWhenCanceled
68-
.also {
69-
isCanceled = false
70-
hadTimeoutWhenCanceled = false
71-
}
67+
val oldState = this.state
68+
state = STATE_IDLE
69+
70+
if (oldState == STATE_IN_QUEUE) {
71+
removeFromQueue(this)
72+
return false
73+
} else {
74+
return oldState == STATE_TIMED_OUT
7275
}
73-
74-
return cancelScheduledTimeout(this)
7576
}
7677
}
7778

7879
override fun cancel() {
7980
super.cancel()
8081

8182
lock.withLock {
82-
if (isCanceled) return
83-
if (!inQueue) return
84-
isCanceled = true
85-
hadTimeoutWhenCanceled = cancelScheduledTimeout(this)
83+
if (state == STATE_IN_QUEUE) {
84+
removeFromQueue(this)
85+
state = STATE_CANCELED
86+
}
8687
}
8788
}
8889

@@ -197,16 +198,16 @@ open class AsyncTimeout : Timeout() {
197198
return e
198199
}
199200

200-
private class Watchdog internal constructor() : Thread("Okio Watchdog") {
201+
private class Watchdog : Thread("Okio Watchdog") {
201202
init {
202203
isDaemon = true
203204
}
204205

205206
override fun run() {
206207
while (true) {
207208
try {
208-
var timedOut: AsyncTimeout? = null
209-
AsyncTimeout.lock.withLock {
209+
var timedOut: AsyncTimeout?
210+
lock.withLock {
210211
timedOut = awaitTimeout()
211212

212213
// The queue is completely empty. Let this thread exit and let another watchdog thread
@@ -225,7 +226,7 @@ open class AsyncTimeout : Timeout() {
225226
}
226227
}
227228

228-
companion object {
229+
private companion object {
229230
val lock: ReentrantLock = ReentrantLock()
230231
val condition: Condition = lock.newCondition()
231232

@@ -240,6 +241,43 @@ open class AsyncTimeout : Timeout() {
240241
private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
241242
private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)
242243

244+
/*
245+
* .-------------.
246+
* | |
247+
* .------------ exit() ------| CANCELED |
248+
* | | |
249+
* | '-------------'
250+
* | ^
251+
* | | cancel()
252+
* v |
253+
* .-------------. .-------------.
254+
* | |---- enter() ----->| |
255+
* | IDLE | | IN QUEUE |
256+
* | |<---- exit() ------| |
257+
* '-------------' '-------------'
258+
* ^ |
259+
* | | time out
260+
* | v
261+
* | .-------------.
262+
* | | |
263+
* '------------ exit() ------| TIMED OUT |
264+
* | |
265+
* '-------------'
266+
*
267+
* Notes:
268+
* * enter() crashes if called from a state other than IDLE.
269+
* * If there's no timeout (ie. wait forever), then enter() is a no-op. There's no state to
270+
* track entered but not in the queue.
271+
* * exit() is a no-op from IDLE. This is probably too lenient, but it made it simpler for
272+
* early implementations to support cases where enter() as a no-op.
273+
* * cancel() is a no-op from every state but IN QUEUE.
274+
*/
275+
276+
const val STATE_IDLE = 0
277+
const val STATE_IN_QUEUE = 1
278+
const val STATE_TIMED_OUT = 2
279+
const val STATE_CANCELED = 3
280+
243281
/**
244282
* The watchdog thread processes a linked list of pending timeouts, sorted in the order to be
245283
* triggered. This class synchronizes on AsyncTimeout.class. This lock guards the queue.
@@ -250,77 +288,67 @@ open class AsyncTimeout : Timeout() {
250288
*/
251289
private var head: AsyncTimeout? = null
252290

253-
private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
254-
AsyncTimeout.lock.withLock {
255-
check(!node.inQueue) { "Unbalanced enter/exit" }
256-
node.inQueue = true
257-
258-
// Start the watchdog thread and create the head node when the first timeout is scheduled.
259-
if (head == null) {
260-
head = AsyncTimeout()
261-
Watchdog().start()
262-
}
291+
private fun insertIntoQueue(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
292+
// Start the watchdog thread and create the head node when the first timeout is scheduled.
293+
if (head == null) {
294+
head = AsyncTimeout()
295+
Watchdog().start()
296+
}
263297

264-
val now = System.nanoTime()
265-
if (timeoutNanos != 0L && hasDeadline) {
266-
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
267-
// around, minOf() is undefined for absolute values, but meaningful for relative ones.
268-
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
269-
} else if (timeoutNanos != 0L) {
270-
node.timeoutAt = now + timeoutNanos
271-
} else if (hasDeadline) {
272-
node.timeoutAt = node.deadlineNanoTime()
273-
} else {
274-
throw AssertionError()
275-
}
298+
val now = System.nanoTime()
299+
if (timeoutNanos != 0L && hasDeadline) {
300+
// Compute the earliest event; either timeout or deadline. Because nanoTime can wrap
301+
// around, minOf() is undefined for absolute values, but meaningful for relative ones.
302+
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
303+
} else if (timeoutNanos != 0L) {
304+
node.timeoutAt = now + timeoutNanos
305+
} else if (hasDeadline) {
306+
node.timeoutAt = node.deadlineNanoTime()
307+
} else {
308+
throw AssertionError()
309+
}
276310

277-
// Insert the node in sorted order.
278-
val remainingNanos = node.remainingNanos(now)
279-
var prev = head!!
280-
while (true) {
281-
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
282-
node.next = prev.next
283-
prev.next = node
284-
if (prev === head) {
285-
// Wake up the watchdog when inserting at the front.
286-
condition.signal()
287-
}
288-
break
311+
// Insert the node in sorted order.
312+
val remainingNanos = node.remainingNanos(now)
313+
var prev = head!!
314+
while (true) {
315+
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
316+
node.next = prev.next
317+
prev.next = node
318+
if (prev === head) {
319+
// Wake up the watchdog when inserting at the front.
320+
condition.signal()
289321
}
290-
prev = prev.next!!
322+
break
291323
}
324+
prev = prev.next!!
292325
}
293326
}
294327

295328
/** Returns true if the timeout occurred. */
296-
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
297-
if (!node.inQueue) return false
298-
node.inQueue = false
299-
300-
// Remove the node from the linked list.
329+
private fun removeFromQueue(node: AsyncTimeout) {
301330
var prev = head
302331
while (prev != null) {
303332
if (prev.next === node) {
304333
prev.next = node.next
305334
node.next = null
306-
return false
335+
return
307336
}
308337
prev = prev.next
309338
}
310339

311-
// The node wasn't found in the linked list: it must have timed out!
312-
return true
340+
error("node was not found in the queue")
313341
}
314342

315343
/**
316344
* Removes and returns the node at the head of the list, waiting for it to time out if
317345
* necessary. This returns [head] if there was no node at the head of the list when starting,
318346
* and there continues to be no node after waiting [IDLE_TIMEOUT_NANOS]. It returns null if a
319-
* new node was inserted while waiting. Otherwise this returns the node being waited on that has
320-
* been removed.
347+
* new node was inserted while waiting. Otherwise, this returns the node being waited on that
348+
* has been removed.
321349
*/
322350
@Throws(InterruptedException::class)
323-
internal fun awaitTimeout(): AsyncTimeout? {
351+
fun awaitTimeout(): AsyncTimeout? {
324352
// Get the next eligible node.
325353
val node = head!!.next
326354

@@ -335,7 +363,7 @@ open class AsyncTimeout : Timeout() {
335363
}
336364
}
337365

338-
var waitNanos = node.remainingNanos(System.nanoTime())
366+
val waitNanos = node.remainingNanos(System.nanoTime())
339367

340368
// The head of the queue hasn't timed out yet. Await that.
341369
if (waitNanos > 0) {
@@ -346,6 +374,7 @@ open class AsyncTimeout : Timeout() {
346374
// The head of the queue has timed out. Remove it.
347375
head!!.next = node.next
348376
node.next = null
377+
node.state = STATE_TIMED_OUT
349378
return node
350379
}
351380
}

0 commit comments

Comments
 (0)