Skip to content

Commit 5d918a3

Browse files
wathenjiangdjc
authored andcommitted
time: avoid traversing entries in the time wheel twice (#6584)
1 parent 320756c commit 5d918a3

File tree

5 files changed

+185
-190
lines changed

5 files changed

+185
-190
lines changed

tokio/src/runtime/time/entry.rs

+21-78
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
//!
2222
//! Each timer has a state field associated with it. This field contains either
2323
//! the current scheduled time, or a special flag value indicating its state.
24-
//! This state can either indicate that the timer is on the 'pending' queue (and
25-
//! thus will be fired with an `Ok(())` result soon) or that it has already been
26-
//! fired/deregistered.
24+
//! This state can either indicate that the timer is firing (and thus will be fired
25+
//! with an `Ok(())` result soon) or that it has already been fired/deregistered.
2726
//!
2827
//! This single state field allows for code that is firing the timer to
2928
//! synchronize with any racing `reset` calls reliably.
@@ -49,10 +48,10 @@
4948
//! There is of course a race condition between timer reset and timer
5049
//! expiration. If the driver fails to observe the updated expiration time, it
5150
//! could trigger expiration of the timer too early. However, because
52-
//! [`mark_pending`][mark_pending] performs a compare-and-swap, it will identify this race and
53-
//! refuse to mark the timer as pending.
51+
//! [`mark_firing`][mark_firing] performs a compare-and-swap, it will identify this race and
52+
//! refuse to mark the timer as firing.
5453
//!
55-
//! [mark_pending]: TimerHandle::mark_pending
54+
//! [mark_firing]: TimerHandle::mark_firing
5655
5756
use crate::loom::cell::UnsafeCell;
5857
use crate::loom::sync::atomic::AtomicU64;
@@ -70,9 +69,9 @@ use std::{marker::PhantomPinned, pin::Pin, ptr::NonNull};
7069

7170
type TimerResult = Result<(), crate::time::error::Error>;
7271

73-
const STATE_DEREGISTERED: u64 = u64::MAX;
74-
const STATE_PENDING_FIRE: u64 = STATE_DEREGISTERED - 1;
75-
const STATE_MIN_VALUE: u64 = STATE_PENDING_FIRE;
72+
pub(super) const STATE_DEREGISTERED: u64 = u64::MAX;
73+
const STATE_FIRING: u64 = STATE_DEREGISTERED - 1;
74+
const STATE_MIN_VALUE: u64 = STATE_FIRING;
7675
/// The largest safe integer to use for ticks.
7776
///
7877
/// This value should be updated if any other signal values are added above.
@@ -123,10 +122,6 @@ impl StateCell {
123122
}
124123
}
125124

126-
fn is_pending(&self) -> bool {
127-
self.state.load(Ordering::Relaxed) == STATE_PENDING_FIRE
128-
}
129-
130125
/// Returns the current expiration time, or None if not currently scheduled.
131126
fn when(&self) -> Option<u64> {
132127
let cur_state = self.state.load(Ordering::Relaxed);
@@ -162,26 +157,28 @@ impl StateCell {
162157
}
163158
}
164159

165-
/// Marks this timer as being moved to the pending list, if its scheduled
166-
/// time is not after `not_after`.
160+
/// Marks this timer firing, if its scheduled time is not after `not_after`.
167161
///
168162
/// If the timer is scheduled for a time after `not_after`, returns an Err
169163
/// containing the current scheduled time.
170164
///
171165
/// SAFETY: Must hold the driver lock.
172-
unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
166+
unsafe fn mark_firing(&self, not_after: u64) -> Result<(), u64> {
173167
// Quick initial debug check to see if the timer is already fired. Since
174168
// firing the timer can only happen with the driver lock held, we know
175169
// we shouldn't be able to "miss" a transition to a fired state, even
176170
// with relaxed ordering.
177171
let mut cur_state = self.state.load(Ordering::Relaxed);
178-
179172
loop {
173+
// Because its state is STATE_DEREGISTERED, it has been fired.
174+
if cur_state == STATE_DEREGISTERED {
175+
break Err(cur_state);
176+
}
180177
// improve the error message for things like
181178
// https://github.com/tokio-rs/tokio/issues/3675
182179
assert!(
183180
cur_state < STATE_MIN_VALUE,
184-
"mark_pending called when the timer entry is in an invalid state"
181+
"mark_firing called when the timer entry is in an invalid state"
185182
);
186183

187184
if cur_state > not_after {
@@ -190,7 +187,7 @@ impl StateCell {
190187

191188
match self.state.compare_exchange_weak(
192189
cur_state,
193-
STATE_PENDING_FIRE,
190+
STATE_FIRING,
194191
Ordering::AcqRel,
195192
Ordering::Acquire,
196193
) {
@@ -337,11 +334,6 @@ pub(crate) struct TimerShared {
337334
/// Only accessed under the entry lock.
338335
pointers: linked_list::Pointers<TimerShared>,
339336

340-
/// The expiration time for which this entry is currently registered.
341-
/// Generally owned by the driver, but is accessed by the entry when not
342-
/// registered.
343-
cached_when: AtomicU64,
344-
345337
/// Current state. This records whether the timer entry is currently under
346338
/// the ownership of the driver, and if not, its current state (not
347339
/// complete, fired, error, etc).
@@ -356,7 +348,6 @@ unsafe impl Sync for TimerShared {}
356348
impl std::fmt::Debug for TimerShared {
357349
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
358350
f.debug_struct("TimerShared")
359-
.field("cached_when", &self.cached_when.load(Ordering::Relaxed))
360351
.field("state", &self.state)
361352
.finish()
362353
}
@@ -374,40 +365,12 @@ impl TimerShared {
374365
pub(super) fn new(shard_id: u32) -> Self {
375366
Self {
376367
shard_id,
377-
cached_when: AtomicU64::new(0),
378368
pointers: linked_list::Pointers::new(),
379369
state: StateCell::default(),
380370
_p: PhantomPinned,
381371
}
382372
}
383373

384-
/// Gets the cached time-of-expiration value.
385-
pub(super) fn cached_when(&self) -> u64 {
386-
// Cached-when is only accessed under the driver lock, so we can use relaxed
387-
self.cached_when.load(Ordering::Relaxed)
388-
}
389-
390-
/// Gets the true time-of-expiration value, and copies it into the cached
391-
/// time-of-expiration value.
392-
///
393-
/// SAFETY: Must be called with the driver lock held, and when this entry is
394-
/// not in any timer wheel lists.
395-
pub(super) unsafe fn sync_when(&self) -> u64 {
396-
let true_when = self.true_when();
397-
398-
self.cached_when.store(true_when, Ordering::Relaxed);
399-
400-
true_when
401-
}
402-
403-
/// Sets the cached time-of-expiration value.
404-
///
405-
/// SAFETY: Must be called with the driver lock held, and when this entry is
406-
/// not in any timer wheel lists.
407-
unsafe fn set_cached_when(&self, when: u64) {
408-
self.cached_when.store(when, Ordering::Relaxed);
409-
}
410-
411374
/// Returns the true time-of-expiration value, with relaxed memory ordering.
412375
pub(super) fn true_when(&self) -> u64 {
413376
self.state.when().expect("Timer already fired")
@@ -420,7 +383,6 @@ impl TimerShared {
420383
/// in the timer wheel.
421384
pub(super) unsafe fn set_expiration(&self, t: u64) {
422385
self.state.set_expiration(t);
423-
self.cached_when.store(t, Ordering::Relaxed);
424386
}
425387

426388
/// Sets the true time-of-expiration only if it is after the current.
@@ -590,16 +552,8 @@ impl TimerEntry {
590552
}
591553

592554
impl TimerHandle {
593-
pub(super) unsafe fn cached_when(&self) -> u64 {
594-
unsafe { self.inner.as_ref().cached_when() }
595-
}
596-
597-
pub(super) unsafe fn sync_when(&self) -> u64 {
598-
unsafe { self.inner.as_ref().sync_when() }
599-
}
600-
601-
pub(super) unsafe fn is_pending(&self) -> bool {
602-
unsafe { self.inner.as_ref().state.is_pending() }
555+
pub(super) unsafe fn true_when(&self) -> u64 {
556+
unsafe { self.inner.as_ref().true_when() }
603557
}
604558

605559
/// Forcibly sets the true and cached expiration times to the given tick.
@@ -610,27 +564,16 @@ impl TimerHandle {
610564
self.inner.as_ref().set_expiration(tick);
611565
}
612566

613-
/// Attempts to mark this entry as pending. If the expiration time is after
567+
/// Attempts to mark this entry as firing. If the expiration time is after
614568
/// `not_after`, however, returns an Err with the current expiration time.
615569
///
616570
/// If an `Err` is returned, the `cached_when` value will be updated to this
617571
/// new expiration time.
618572
///
619573
/// SAFETY: The caller must ensure that the handle remains valid, the driver
620574
/// lock is held, and that the timer is not in any wheel linked lists.
621-
/// After returning Ok, the entry must be added to the pending list.
622-
pub(super) unsafe fn mark_pending(&self, not_after: u64) -> Result<(), u64> {
623-
match self.inner.as_ref().state.mark_pending(not_after) {
624-
Ok(()) => {
625-
// mark this as being on the pending queue in cached_when
626-
self.inner.as_ref().set_cached_when(u64::MAX);
627-
Ok(())
628-
}
629-
Err(tick) => {
630-
self.inner.as_ref().set_cached_when(tick);
631-
Err(tick)
632-
}
633-
}
575+
pub(super) unsafe fn mark_firing(&self, not_after: u64) -> Result<(), u64> {
576+
self.inner.as_ref().state.mark_firing(not_after)
634577
}
635578

636579
/// Attempts to transition to a terminal state. If the state is already a

tokio/src/runtime/time/mod.rs

+45-15
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
99
mod entry;
1010
pub(crate) use entry::TimerEntry;
11-
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION};
11+
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION, STATE_DEREGISTERED};
1212

1313
mod handle;
1414
pub(crate) use self::handle::Handle;
@@ -324,23 +324,53 @@ impl Handle {
324324
now = lock.elapsed();
325325
}
326326

327-
while let Some(entry) = lock.poll(now) {
328-
debug_assert!(unsafe { entry.is_pending() });
329-
330-
// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
331-
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
332-
waker_list.push(waker);
333-
334-
if !waker_list.can_push() {
335-
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
336-
drop(lock);
337-
338-
waker_list.wake_all();
339-
340-
lock = self.inner.lock_sharded_wheel(id);
327+
while let Some(expiration) = lock.poll(now) {
328+
lock.set_elapsed(expiration.deadline);
329+
// It is critical for `GuardedLinkedList` safety that the guard node is
330+
// pinned in memory and is not dropped until the guarded list is dropped.
331+
let guard = TimerShared::new(id);
332+
pin!(guard);
333+
let guard_handle = guard.as_ref().get_ref().handle();
334+
335+
// * This list will be still guarded by the lock of the Wheel with the specefied id.
336+
// `EntryWaitersList` wrapper makes sure we hold the lock to modify it.
337+
// * This wrapper will empty the list on drop. It is critical for safety
338+
// that we will not leave any list entry with a pointer to the local
339+
// guard node after this function returns / panics.
340+
// Safety: The `TimerShared` inside this `TimerHandle` is pinned in the memory.
341+
let mut list = unsafe { lock.get_waiters_list(&expiration, guard_handle, id, self) };
342+
343+
while let Some(entry) = list.pop_back_locked(&mut lock) {
344+
let deadline = expiration.deadline;
345+
// Try to expire the entry; this is cheap (doesn't synchronize) if
346+
// the timer is not expired, and updates cached_when.
347+
match unsafe { entry.mark_firing(deadline) } {
348+
Ok(()) => {
349+
// Entry was expired.
350+
// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
351+
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
352+
waker_list.push(waker);
353+
354+
if !waker_list.can_push() {
355+
// Wake a batch of wakers. To avoid deadlock,
356+
// we must do this with the lock temporarily dropped.
357+
drop(lock);
358+
waker_list.wake_all();
359+
360+
lock = self.inner.lock_sharded_wheel(id);
361+
}
362+
}
363+
}
364+
Err(state) if state == STATE_DEREGISTERED => {}
365+
Err(state) => {
366+
// Safety: This Entry has not expired.
367+
unsafe { lock.reinsert_entry(entry, deadline, state) };
368+
}
341369
}
342370
}
371+
lock.occupied_bit_maintain(&expiration);
343372
}
373+
344374
let next_wake_up = lock.poll_at();
345375
drop(lock);
346376

tokio/src/runtime/time/wheel/level.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ pub(crate) struct Level {
2020
}
2121

2222
/// Indicates when a slot must be processed next.
23-
#[derive(Debug)]
2423
pub(crate) struct Expiration {
2524
/// The level containing the slot.
2625
pub(crate) level: usize,
@@ -81,7 +80,7 @@ impl Level {
8180
// pseudo-ring buffer, and we rotate around them indefinitely. If we
8281
// compute a deadline before now, and it's the top level, it
8382
// therefore means we're actually looking at a slot in the future.
84-
debug_assert_eq!(self.level, super::NUM_LEVELS - 1);
83+
debug_assert_eq!(self.level, super::MAX_LEVEL_INDEX);
8584

8685
deadline += level_range;
8786
}
@@ -120,31 +119,34 @@ impl Level {
120119
}
121120

122121
pub(crate) unsafe fn add_entry(&mut self, item: TimerHandle) {
123-
let slot = slot_for(item.cached_when(), self.level);
122+
let slot = slot_for(item.true_when(), self.level);
124123

125124
self.slot[slot].push_front(item);
126125

127126
self.occupied |= occupied_bit(slot);
128127
}
129128

130129
pub(crate) unsafe fn remove_entry(&mut self, item: NonNull<TimerShared>) {
131-
let slot = slot_for(unsafe { item.as_ref().cached_when() }, self.level);
130+
let slot = slot_for(unsafe { item.as_ref().true_when() }, self.level);
132131

133132
unsafe { self.slot[slot].remove(item) };
134133
if self.slot[slot].is_empty() {
135-
// The bit is currently set
136-
debug_assert!(self.occupied & occupied_bit(slot) != 0);
137-
138134
// Unset the bit
139135
self.occupied ^= occupied_bit(slot);
140136
}
141137
}
142138

143-
pub(crate) fn take_slot(&mut self, slot: usize) -> EntryList {
144-
self.occupied &= !occupied_bit(slot);
145-
139+
pub(super) fn take_slot(&mut self, slot: usize) -> EntryList {
146140
std::mem::take(&mut self.slot[slot])
147141
}
142+
143+
pub(super) fn occupied_bit_maintain(&mut self, slot: usize) {
144+
if self.slot[slot].is_empty() {
145+
self.occupied &= !occupied_bit(slot);
146+
} else {
147+
self.occupied |= occupied_bit(slot);
148+
}
149+
}
148150
}
149151

150152
impl fmt::Debug for Level {

0 commit comments

Comments
 (0)