Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion tokio/src/time/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,19 @@ cfg_test_util! {
}

pub(crate) fn advance(&self, duration: Duration) -> Result<(), &'static str> {
// Retrieve `far_future` before acquiring the mutex to prevent deadlock,
// as `Instant::far_future()` also acquires a mutex internally.
let far_future = Instant::far_future().into_std();
Comment on lines +288 to +290
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would ideally like to avoid acquiring the mutex twice every time just for supporting this one case.

This comment was marked as duplicate.

Copy link
Member Author

@ADD-SP ADD-SP Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inner.base = inner
    .base
    .checked_add(duration)
    .unwrap_or_else(|| inner.base + <30 years>);

To bypass the Mutex manually add 30 years on overflow, but this still panics after many times of time::advance(Duration::MAX), I don't have perfect idea yet.

Let me ask the issue author about the real (original) requirement, maybe there will be a better solution.

let mut inner = self.inner.lock();

if inner.unfrozen.is_some() {
return Err("time is not frozen");
}

inner.base += duration;
inner.base = inner
.base
.checked_add(duration)
.unwrap_or(far_future);
Comment on lines -294 to +300
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids the panic, but I wonder how correct this is. Does the timer still behave correctly after this call?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
// SAFETY: We hold the driver lock, and just removed the entry from any linked lists.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list.push(waker);
if !waker_list.can_push() {
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
drop(lock);
waker_list.wake_all();
lock = self.inner.lock();
}
}
}

pub(crate) fn poll(&mut self, now: u64) -> Option<TimerHandle> {
loop {
if let Some(handle) = self.pending.pop_back() {
return Some(handle);
}
match self.next_expiration() {
Some(ref expiration) if expiration.deadline <= now => {
self.process_expiration(expiration);
self.set_elapsed(expiration.deadline);
}
_ => {
// in this case the poll did not indicate an expiration
// _and_ we were not able to find a next expiration in
// the current list of timers. advance to the poll's
// current time and do nothing else.
self.set_elapsed(now);
break;
}
}
}
self.pending.pop_back()
}

Good question, I read the code and I realized that the time driver keeps polling the wheel until wheel.elapsed == now.

So I think that even though the wheel rotates a lot of full revolution because of far_future, it still works correctly.

I also added more tests to ensure the correct behavior after a huge advance.

Ok(())
}

Expand Down
52 changes: 52 additions & 0 deletions tokio/tests/time_pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,58 @@ async fn exact_1ms_advance() {
assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn advance_duration_max() {
let now = Instant::now();

let dur = Duration::from_millis(1);
time::advance(dur).await;
assert_eq!(now.elapsed(), dur);

let now = Instant::now();
time::advance(Duration::MAX).await;
// magic number from `tokio::time::Instant::far_future()`
assert_eq!(now.elapsed(), Duration::from_secs(86400 * 365 * 30));

// then we need to test if the timer wheel is still working correctly

// hit the first level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());
time::advance(Duration::from_millis(1)).await;
assert!(sleep.is_woken());

// hit the second level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_millis(65)));
assert_pending!(sleep.poll());
time::advance(Duration::from_millis(65)).await;
assert!(sleep.is_woken());

// hit the third level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_secs(5)));
assert_pending!(sleep.poll());
time::advance(Duration::from_secs(5)).await;
assert!(sleep.is_woken());

// hit the fourth level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_secs(60 * 5)));
assert_pending!(sleep.poll());
time::advance(Duration::from_secs(60 * 5)).await;
assert!(sleep.is_woken());

// hit the fifth level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_secs(60 * 60 * 5)));
assert_pending!(sleep.poll());
time::advance(Duration::from_secs(60 * 60 * 5)).await;
assert!(sleep.is_woken());

// hit the sixth level of wheel
let mut sleep = task::spawn(time::sleep(Duration::from_secs(60 * 60 * 24 * 13)));
assert_pending!(sleep.poll());
time::advance(Duration::from_secs(60 * 60 * 24 * 13)).await;
assert!(sleep.is_woken());
}

#[tokio::test(start_paused = true)]
async fn advance_once_with_timer() {
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
Expand Down
Loading