Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: eliminate timer wheel allocations #6779

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions tokio/src/loom/mocked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub(crate) mod sync {
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
self.0.try_lock().ok()
}

#[inline]
pub(crate) fn get_mut(&mut self) -> &mut T {
self.0.get_mut().unwrap()
}
}
pub(crate) use loom::sync::*;

Expand Down
8 changes: 8 additions & 0 deletions tokio/src/loom/std/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ impl<T> Mutex<T> {
Err(TryLockError::WouldBlock) => None,
}
}

#[inline]
pub(crate) fn get_mut(&mut self) -> &mut T {
match self.0.get_mut() {
Ok(val) => val,
Err(p_err) => p_err.into_inner(),
}
}
}
118 changes: 84 additions & 34 deletions tokio/src/runtime/time/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::util::WakeList;

use crate::loom::sync::atomic::AtomicU64;
use std::fmt;
use std::sync::RwLock;
use std::{num::NonZeroU64, ptr::NonNull};

struct AtomicOptionNonZeroU64(AtomicU64);
Expand Down Expand Up @@ -115,7 +116,7 @@ struct Inner {
next_wake: AtomicOptionNonZeroU64,

/// Sharded Timer wheels.
wheels: Box<[Mutex<wheel::Wheel>]>,
wheels: RwLock<ShardedWheel>,

/// True if the driver is being shutdown.
pub(super) is_shutdown: AtomicBool,
Expand All @@ -130,6 +131,9 @@ struct Inner {
did_wake: AtomicBool,
}

/// Wrapper around the sharded timer wheels.
struct ShardedWheel(Box<[Mutex<wheel::Wheel>]>);

// ===== impl Driver =====

impl Driver {
Expand All @@ -149,7 +153,7 @@ impl Driver {
time_source,
inner: Inner {
next_wake: AtomicOptionNonZeroU64::new(None),
wheels: wheels.into_boxed_slice(),
wheels: RwLock::new(ShardedWheel(wheels.into_boxed_slice())),
is_shutdown: AtomicBool::new(false),
#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
Expand Down Expand Up @@ -190,23 +194,28 @@ impl Driver {
assert!(!handle.is_shutdown());

// Finds out the min expiration time to park.
let locks = (0..rt_handle.time().inner.get_shard_size())
.map(|id| rt_handle.time().inner.lock_sharded_wheel(id))
.collect::<Vec<_>>();

let expiration_time = locks
.iter()
.filter_map(|lock| lock.next_expiration_time())
.min();

rt_handle
.time()
.inner
.next_wake
.store(next_wake_time(expiration_time));

// Safety: After updating the `next_wake`, we drop all the locks.
drop(locks);
let expiration_time = {
let mut wheels_lock = rt_handle
.time()
.inner
.wheels
.write()
.expect("Timer wheel shards poisened");
let expiration_time = (0..wheels_lock.get_shard_size())
.filter_map(|id| {
let wheel = wheels_lock.get_sharded_wheel(id);
wheel.next_expiration_time()
})
.min();

rt_handle
.time()
.inner
.next_wake
.store(next_wake_time(expiration_time));

expiration_time
};

match expiration_time {
Some(when) => {
Expand Down Expand Up @@ -312,7 +321,12 @@ impl Handle {
// Returns the next wakeup time of this shard.
pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option<u64> {
let mut waker_list = WakeList::new();
let mut lock = self.inner.lock_sharded_wheel(id);
let mut wheels_lock = self
.inner
.wheels
.read()
.expect("Timer wheel shards poisened");
tglane marked this conversation as resolved.
Show resolved Hide resolved
let mut lock = wheels_lock.lock_sharded_wheel(id);

if now < lock.elapsed() {
// Time went backwards! This normally shouldn't happen as the Rust language
Expand All @@ -334,10 +348,16 @@ impl Handle {
if !waker_list.can_push() {
// Wake a batch of wakers. To avoid deadlock, we must do this with the lock temporarily dropped.
drop(lock);
drop(wheels_lock);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also need to drop wheels_lock after the loop, since there's also a call to wake_all there.

Copy link
Contributor Author

@tglane tglane Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Missed that... Implemented in 1610c12.


waker_list.wake_all();

lock = self.inner.lock_sharded_wheel(id);
wheels_lock = self
.inner
.wheels
.read()
.expect("Timer wheel shards poisened");
tglane marked this conversation as resolved.
Show resolved Hide resolved
lock = wheels_lock.lock_sharded_wheel(id);
}
}
}
Expand All @@ -360,7 +380,12 @@ impl Handle {
/// `add_entry` must not be called concurrently.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id());
let wheels_lock = self
.inner
.wheels
.read()
.expect("Timer wheel shards poisened");
let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());

if entry.as_ref().might_be_registered() {
lock.remove(entry);
Expand All @@ -383,7 +408,13 @@ impl Handle {
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let mut lock = self.inner.lock_sharded_wheel(entry.as_ref().shard_id());
let wheels_lock = self
.inner
.wheels
.read()
.expect("Timer wheel shards poisened");
tglane marked this conversation as resolved.
Show resolved Hide resolved

let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());

// We may have raced with a firing/deregistration, so check before
// deregistering.
Expand Down Expand Up @@ -443,24 +474,17 @@ impl Handle {
// ===== impl Inner =====

impl Inner {
/// Locks the driver's sharded wheel structure.
pub(super) fn lock_sharded_wheel(
&self,
shard_id: u32,
) -> crate::loom::sync::MutexGuard<'_, Wheel> {
let index = shard_id % (self.wheels.len() as u32);
// Safety: This modulo operation ensures that the index is not out of bounds.
unsafe { self.wheels.get_unchecked(index as usize).lock() }
}

// Check whether the driver has been shutdown
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}

// Gets the number of shards.
fn get_shard_size(&self) -> u32 {
self.wheels.len() as u32
self.wheels
.read()
.expect("Timer wheel shards poisened")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit unfortunate that this can panic. Not because the panic itself is bad, if the lock is poisoned, we should panic, but because panicking code will make this otherwise small and trivially inlined function generate way more code. Perhaps we should move the number of shards outside of the lock, instead? Since it will never change, it should be fine to do that...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, a typo:

Suggested change
.expect("Timer wheel shards poisened")
.expect("Timer wheel shards poisoned")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems reasonable to put it as a member variable into the Inner struct to avoid the lock here. The increase in size of the struct would probably be ok since it's "only" a u32, so I would agree to do it like that if nobody objects.

Copy link
Contributor Author

@tglane tglane Aug 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included the changes in e9c4351.

I also removed the method to get the shard size from the struct ShardedWheel to prevent duplications since we can get it from the struct Inner everywhere where It's needed.

.get_shard_size()
}
}

Expand All @@ -470,5 +494,31 @@ impl fmt::Debug for Inner {
}
}

// ===== impl ShardedWheel =====

impl ShardedWheel {
/// Locks the driver's sharded wheel structure.
pub(super) fn lock_sharded_wheel(
&self,
shard_id: u32,
) -> crate::loom::sync::MutexGuard<'_, Wheel> {
let index = shard_id % (self.0.len() as u32);
// Safety: This modulo operation ensures that the index is not out of bounds.
unsafe { self.0.get_unchecked(index as usize) }.lock()
}

/// Gets a mutable reference to the sharded wheel with the given id.
pub(super) fn get_sharded_wheel(&mut self, shard_id: u32) -> &mut wheel::Wheel {
tglane marked this conversation as resolved.
Show resolved Hide resolved
let index = shard_id % (self.0.len() as u32);
// Safety: This modulo operation ensures that the index is not out of bounds.
unsafe { self.0.get_unchecked_mut(index as usize) }.get_mut()
}

/// Gets the number of shards.
fn get_shard_size(&self) -> u32 {
self.0.len() as u32
}
}

#[cfg(test)]
mod tests;
Loading