Skip to content

Commit

Permalink
coop: expose coop as a public module (#7116)
Browse files Browse the repository at this point in the history
  • Loading branch information
maminrayej authored Feb 14, 2025
1 parent 9b578f0 commit 605ef57
Show file tree
Hide file tree
Showing 31 changed files with 190 additions and 112 deletions.
3 changes: 2 additions & 1 deletion spellcheck.dic
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
299
300
&
+
<
Expand Down Expand Up @@ -78,6 +78,7 @@ deallocate
deallocated
Deallocates
debuginfo
decrement
decrementing
demangled
dequeued
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl CopyBuffer {
feature = "time",
))]
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));
loop {
// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
Expand Down
6 changes: 3 additions & 3 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ impl AsyncRead for SimplexStream {
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_read_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -362,7 +362,7 @@ impl AsyncWrite for SimplexStream {
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_write_internal(cx, buf);
if ret.is_ready() {
Expand Down Expand Up @@ -390,7 +390,7 @@ impl AsyncWrite for SimplexStream {
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
ready!(crate::trace::trace_leaf(cx));
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = self.poll_write_vectored_internal(cx, bufs);
if ret.is_ready() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ cfg_io_util! {

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
let coop = std::task::ready!(crate::runtime::coop::poll_proceed(cx));
let coop = std::task::ready!(crate::task::coop::poll_proceed(cx));
coop.made_progress();
std::task::Poll::Ready(())
}
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ where
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let ret = Pin::new(&mut self.inner).poll(cx);

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/blocking/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ where
// currently goes through Task::poll(), and so is subject to budgeting. That isn't really
// what we want; a blocking task may itself want to run tasks (it might be a Worker!), so
// we want it to start without any budgeting.
crate::runtime::coop::stop();
crate::task::coop::stop();

Poll::Ready(func())
}
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::loom::thread::AccessError;
use crate::runtime::coop;
use crate::task::coop;

use std::cell::Cell;

Expand Down Expand Up @@ -135,7 +135,7 @@ pub(crate) fn thread_rng_n(n: u32) -> u32 {
})
}

pub(super) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
pub(crate) fn budget<R>(f: impl FnOnce(&Cell<coop::Budget>) -> R) -> Result<R, AccessError> {
CONTEXT.try_with(|ctx| f(&ctx.budget))
}

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/context/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl BlockingRegionGuard {
let when = Instant::now() + timeout;

loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Registration {
) -> Poll<io::Result<ReadyEvent>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if ev.is_shutdown {
Expand Down Expand Up @@ -219,7 +219,7 @@ impl Registration {
loop {
let event = self.readiness(interest).await?;

let coop = std::future::poll_fn(crate::runtime::coop::poll_proceed).await;
let coop = std::future::poll_fn(crate::task::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
Expand Down
4 changes: 1 addition & 3 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@
//! [`event_interval`]: crate::runtime::Builder::event_interval
//! [`disable_lifo_slot`]: crate::runtime::Builder::disable_lifo_slot
//! [the lifo slot optimization]: crate::runtime::Builder::disable_lifo_slot
//! [coop budget]: crate::task#cooperative-scheduling
//! [coop budget]: crate::task::coop#cooperative-scheduling
//! [`worker_mean_poll_time`]: crate::runtime::RuntimeMetrics::worker_mean_poll_time
// At the top due to macros
Expand All @@ -321,8 +321,6 @@ mod tests;

pub(crate) mod context;

pub(crate) mod coop;

pub(crate) mod park;

mod driver;
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl CachedParkThread {
pin!(f);

loop {
if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) {
if let Ready(v) = crate::task::coop::budget(|| f.as_mut().poll(&mut cx)) {
return Ok(v);
}

Expand Down
4 changes: 2 additions & 2 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl Context {
/// thread-local context.
fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
core.metrics.start_poll();
let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
let mut ret = self.enter(core, || crate::task::coop::budget(f));
ret.0.metrics.end_poll();
ret
}
Expand Down Expand Up @@ -730,7 +730,7 @@ impl CoreGuard<'_> {

if handle.reset_woken() {
let (c, res) = context.enter(core, || {
crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
crate::task::coop::budget(|| future.as_mut().poll(&mut cx))
});

core = c;
Expand Down
5 changes: 2 additions & 3 deletions tokio/src/runtime/scheduler/multi_thread/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ use crate::runtime::scheduler::multi_thread::{
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{
blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
};
use crate::runtime::{blocking, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

Expand Down
3 changes: 2 additions & 1 deletion tokio/src/runtime/scheduler/multi_thread_alt/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use crate::runtime::scheduler::multi_thread_alt::{
};
use crate::runtime::scheduler::{self, inject, Lock};
use crate::runtime::task::{OwnedTasks, TaskHarnessScheduleHooks};
use crate::runtime::{blocking, coop, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{blocking, driver, task, Config, SchedulerMetrics, WorkerMetrics};
use crate::runtime::{context, TaskHooks};
use crate::task::coop;
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/task/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ impl<T> Future for JoinHandle<T> {
let mut ret = Poll::Pending;

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

// Try to read the task output. If the task is not yet complete, the
// waker is stored and is notified once the task does complete.
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,11 +591,11 @@ impl Future for Acquire<'_> {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let coop = ready!(trace_poll_op!(
"poll_acquire",
crate::runtime::coop::poll_proceed(cx),
crate::task::coop::poll_proceed(cx),
));

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
Poll::Pending => {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicBool, AtomicUsize};
use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
use crate::runtime::coop::cooperative;
use crate::task::coop::cooperative;
use crate::util::linked_list::{self, GuardedLinkedList, LinkedList};
use crate::util::WakeList;

Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@
//! or even use them from non-Tokio runtimes.
//!
//! When used in a Tokio runtime, the synchronization primitives participate in
//! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid
//! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid
//! starvation. This feature does not apply when used from non-Tokio runtimes.
//!
//! As an exception, methods ending in `_timeout` are not runtime agnostic
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl<T, S: Semaphore> Rx<T, S> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
Expand Down Expand Up @@ -354,7 +354,7 @@ impl<T, S: Semaphore> Rx<T, S> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

if limit == 0 {
coop.made_progress();
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
//! runtimes.
//!
//! When used in a Tokio runtime, it participates in
//! [cooperative scheduling](crate::task#cooperative-scheduling) to avoid
//! [cooperative scheduling](crate::task::coop#cooperative-scheduling) to avoid
//! starvation. This feature does not apply when used from non-Tokio runtimes.
//!
//! As an exception, methods ending in `_timeout` are not runtime agnostic
Expand Down
4 changes: 2 additions & 2 deletions tokio/src/sync/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ impl<T> Sender<T> {
ready!(crate::trace::trace_leaf(cx));

// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

let inner = self.inner.as_ref().unwrap();

Expand Down Expand Up @@ -1142,7 +1142,7 @@ impl<T> Inner<T> {
fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Result<T, RecvError>> {
ready!(crate::trace::trace_leaf(cx));
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let coop = ready!(crate::task::coop::poll_proceed(cx));

// Load the state
let mut state = State::load(&self.state, Acquire);
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe
use crate::runtime::coop::cooperative;
use crate::sync::notify::Notify;
use crate::task::coop::cooperative;

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::{AcqRel, Relaxed};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::task::{ready, Poll};

/// Consumes a unit of budget and returns the execution back to the Tokio
/// runtime *if* the task's coop budget was exhausted.
///
Expand All @@ -25,14 +23,14 @@ use std::task::{ready, Poll};
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "rt")))]
pub async fn consume_budget() {
let mut status = Poll::Pending;
let mut status = std::task::Poll::Pending;

std::future::poll_fn(move |cx| {
ready!(crate::trace::trace_leaf(cx));
std::task::ready!(crate::trace::trace_leaf(cx));
if status.is_ready() {
return status;
}
status = crate::runtime::coop::poll_proceed(cx).map(|restore| {
status = crate::task::coop::poll_proceed(cx).map(|restore| {
restore.made_progress();
});
status
Expand Down
Loading

0 comments on commit 605ef57

Please sign in to comment.