Skip to content

io: remove redundant check #6966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 23 additions & 54 deletions tokio/src/runtime/io/scheduled_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,43 +206,23 @@ impl ScheduledIo {
/// specific tick.
/// - `f`: a closure returning a new readiness value given the previous
/// readiness.
pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
let mut current = self.readiness.load(Acquire);

// If the io driver is shut down, then you are only allowed to clear readiness.
debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_)));

loop {
// Mask out the tick bits so that the modifying function doesn't see
// them.
let current_readiness = Ready::from_usize(current);
let new = f(current_readiness);

let new_tick = match tick {
Tick::Set => {
let current = TICK.unpack(current);
current.wrapping_add(1) % (TICK.max_value() + 1)
}
Tick::Clear(t) => {
if TICK.unpack(current) as u8 != t {
// Trying to clear readiness with an old event!
return;
}

t as usize
}
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
// If the io driver is shut down, then you are only allowed to clear readiness.
debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));

const MAX_TICK: usize = TICK.max_value() + 1;
let tick = TICK.unpack(curr);

let new_tick = match tick_op {
// Trying to clear readiness with an old event!
Tick::Clear(t) if tick as u8 != t => return None,
Tick::Clear(t) => t as usize,
Tick::Set => tick.wrapping_add(1) % MAX_TICK,
};
let next = TICK.pack(new_tick, new.as_usize());

match self
.readiness
.compare_exchange(current, next, AcqRel, Acquire)
{
Ok(_) => return,
// we lost the race, retry!
Err(actual) => current = actual,
}
}
let ready = Ready::from_usize(READINESS.unpack(curr));
Some(TICK.pack(new_tick, f(ready).as_usize()))
});
}

/// Notifies all pending waiters that have registered interest in `ready`.
Expand Down Expand Up @@ -335,22 +315,16 @@ impl ScheduledIo {
if ready.is_empty() && !is_shutdown {
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
let waker = match direction {
Direction::Read => &mut waiters.reader,
Direction::Write => &mut waiters.writer,
};

// Avoid cloning the waker if one is already stored that matches the
// current task.
match slot {
Some(existing) => {
if !existing.will_wake(cx.waker()) {
existing.clone_from(cx.waker());
}
}
None => {
*slot = Some(cx.waker().clone());
}
match waker {
Some(waker) => waker.clone_from(cx.waker()),
None => *waker = Some(cx.waker().clone()),
}

// Try again, in case the readiness was changed while we were
Expand Down Expand Up @@ -465,12 +439,11 @@ impl Future for Readiness<'_> {
State::Init => {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;

// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);

if !ready.is_empty() || is_shutdown {
// Currently ready!
Expand Down Expand Up @@ -538,10 +511,7 @@ impl Future for Readiness<'_> {
*state = State::Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
}

w.waker.as_mut().unwrap().clone_from(cx.waker());
return Poll::Pending;
}

Expand All @@ -566,8 +536,7 @@ impl Future for Readiness<'_> {

// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = curr_ready.intersection(w.interest);
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);

return Poll::Ready(ReadyEvent {
tick,
Expand Down
Loading