-
-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sync, coop: apply cooperative scheduling to sync::watch
#6846
sync, coop: apply cooperative scheduling to sync::watch
#6846
Conversation
tokio/src/sync/watch.rs
Outdated
pub async fn changed(&mut self) -> Result<(), error::RecvError> { | ||
changed_impl(&self.shared, &mut self.version).await | ||
crate::runtime::coop::budget_constraint(changed_impl(&self.shared, &mut self.version)).await | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a test for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure! It has to be a loom test, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not necessary. You can do something like this:
let my_fut = async { loop { chan.changed().await } };
tokio::select! {
biased;
_ = my_fut => {},
_ = core::future::ready(()) => {},
}
Without coop, this test would run forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests to tokio::tests::sync_watch
for changed
, wait_for
and closed
.
sync::watch::Receiver::changed
sync::watch
tokio/src/sync/watch.rs
Outdated
closed = changed_impl(&self.shared, &mut self.version).await.is_err(); | ||
closed = cooperative(changed_impl(&self.shared, &mut self.version)) | ||
.await | ||
.is_err(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't make wait_for
cooperative, as it may exit before it reaches this call without touching the budget. You need to wrap the entire function.
Also, you may want to make a wait_for_inner
method instead of wrapping the entire body in cooperative(async { ... })
to avoid extra indentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yea I totally missed that! Thanks. Done in d5a6b3b.
tokio/tests/sync_watch.rs
Outdated
biased; | ||
_ = async { | ||
loop { | ||
let _ = rx.wait_for(|val| *val == 1).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let _ = rx.wait_for(|val| *val == 1).await; | |
assert!(rx.wait_for(|val| *val == 1).await.is_err()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done in d5a6b3b.
`sync::watch::Receiver::changed`
d5a6b3b
to
95e4f9b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you.
Motivation
#6839 showed that
sync::watch::Receiver::changed
will not cooperatively yield control back to the runtime which led to it blocking the thread it was running on.Edit: This is also true for
sync::watch::Receiver::wait_for
andsync::watch::Sender::closed
Solution
Added cooperative scheduling to
sync::watch::Receiver::changed
by utilizing theruntime::coop
module.More specifically I added a new future type that wraps the implementation of
sync::watch::Receiver::changed
and checks if the current task has exceeded its budget before polling the wrapped future.Closes #6839.