Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync, coop: apply cooperative scheduling to sync::watch #6846

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion tokio/src/runtime/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ cfg_rt! {
}

cfg_coop! {
use pin_project_lite::pin_project;
use std::cell::Cell;
use std::task::{Context, Poll};
use std::future::Future;
use std::pin::Pin;
use std::task::{ready, Context, Poll};

#[must_use]
pub(crate) struct RestoreOnPending(Cell<Budget>);
Expand Down Expand Up @@ -240,6 +243,44 @@ cfg_coop! {
self.0.is_none()
}
}

pin_project! {
/// Future wrapper to ensure cooperative scheduling.
///
/// When being polled `poll_proceed` is called before the inner future is polled to check
/// if the inner future has exceeded its budget. If the inner future resolves, this will
/// automatically call `RestoreOnPending::made_progress` before resolving this future with
/// the result of the inner one. If polling the inner future is pending, polling this future
/// type will also return a `Poll::Pending`.
#[must_use = "futures do nothing unless polled"]
pub(crate) struct Coop<F: Future> {
#[pin]
pub(crate) fut: F,
}
}

impl<F: Future> Future for Coop<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let coop = ready!(poll_proceed(cx));
let me = self.project();
if let Poll::Ready(ret) = me.fut.poll(cx) {
coop.made_progress();
Poll::Ready(ret)
} else {
Poll::Pending
}
}
}

/// Run a future with a budget constraint for cooperative scheduling.
/// If the future exceeds its budget while being polled, control is yielded back to the
/// runtime.
#[inline]
pub(crate) fn cooperative<F: Future>(fut: F) -> Coop<F> {
Coop { fut }
}
}

#[cfg(all(test, not(loom)))]
Expand Down
33 changes: 22 additions & 11 deletions tokio/src/sync/watch.rs
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
//! [`Sender::closed`]: crate::sync::watch::Sender::closed
//! [`Sender::subscribe()`]: crate::sync::watch::Sender::subscribe

use crate::runtime::coop::cooperative;
use crate::sync::notify::Notify;

use crate::loom::sync::atomic::AtomicUsize;
Expand Down Expand Up @@ -743,7 +744,7 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
changed_impl(&self.shared, &mut self.version).await
cooperative(changed_impl(&self.shared, &mut self.version)).await
}

/// Waits for a value that satisfies the provided condition.
Expand Down Expand Up @@ -807,6 +808,13 @@ impl<T> Receiver<T> {
/// }
/// ```
pub async fn wait_for(
&mut self,
f: impl FnMut(&T) -> bool,
) -> Result<Ref<'_, T>, error::RecvError> {
cooperative(self.wait_for_inner(f)).await
}

async fn wait_for_inner(
&mut self,
mut f: impl FnMut(&T) -> bool,
) -> Result<Ref<'_, T>, error::RecvError> {
Expand Down Expand Up @@ -1224,19 +1232,22 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
crate::trace::async_trace_leaf().await;
cooperative(async {
crate::trace::async_trace_leaf().await;

while self.receiver_count() > 0 {
let notified = self.shared.notify_tx.notified();
while self.receiver_count() > 0 {
let notified = self.shared.notify_tx.notified();

if self.receiver_count() == 0 {
return;
}
if self.receiver_count() == 0 {
return;
}

notified.await;
// The channel could have been reopened in the meantime by calling
// `subscribe`, so we loop again.
}
notified.await;
// The channel could have been reopened in the meantime by calling
// `subscribe`, so we loop again.
}
})
.await;
}

/// Creates a new [`Receiver`] connected to this `Sender`.
Expand Down
82 changes: 82 additions & 0 deletions tokio/tests/sync_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,85 @@ async fn receiver_is_notified_when_last_sender_is_dropped() {

assert!(t.is_woken());
}

#[tokio::test]
async fn receiver_changed_is_cooperative() {
let (tx, mut rx) = watch::channel(());

drop(tx);

tokio::select! {
biased;
_ = async {
loop {
assert!(rx.changed().await.is_err());
}
} => {},
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn receiver_changed_is_cooperative_ok() {
let (tx, mut rx) = watch::channel(());

tokio::select! {
biased;
_ = async {
loop {
assert!(tx.send(()).is_ok());
assert!(rx.changed().await.is_ok());
}
} => {},
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn receiver_wait_for_is_cooperative() {
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
let (tx, mut rx) = watch::channel(0);

drop(tx);

tokio::select! {
biased;
_ = async {
loop {
assert!(rx.wait_for(|val| *val == 1).await.is_err());
}
} => {},
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn receiver_wait_for_is_cooperative_ok() {
let (tx, mut rx) = watch::channel(0);

tokio::select! {
biased;
_ = async {
loop {
assert!(tx.send(1).is_ok());
assert!(rx.wait_for(|val| *val == 1).await.is_ok());
}
} => {},
_ = tokio::task::yield_now() => {},
}
}

#[tokio::test]
async fn sender_closed_is_cooperative() {
let (tx, rx) = watch::channel(());

drop(rx);

tokio::select! {
_ = async {
loop {
tx.closed().await;
}
} => {},
_ = tokio::task::yield_now() => {},
}
}
Loading