Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: add AsyncFd::try_io() and try_io_mut() #6967

Merged
merged 1 commit into from
Nov 19, 2024

Conversation

de-vri-es
Copy link
Contributor

@de-vri-es de-vri-es commented Nov 11, 2024

This allows to provide APIs like try_recv() and try_send() in custom types built on top of AsyncFd.

Closes #4719.

Motivation

Custom types built on top of AsyncFd may want to provide functions like try_recv() and try_send(). Currently, I don't think this can be done cleanly, because the poll_[read|write]_ready() functions will register the current task to be woken. They use the reserved waker slot, so they would interfere with other users of poll_{ready,write}_ready(), such as AsyncRead and AsyncWrite implementations.

I suppose you currently could poll the future from {read,write}_ready() once with a fake waker. But that is quite a hacky workaround, and if the future is not ready, the fake waker would actually be registered only to be removed again directly after the polling.

Solution

Add AsyncFd::try_io() and AsyncFd::try_io_mut() to mirror the API from UnixDatagram other sockets. There is a mut and non-mut version to allow mut or const access to the wrapped resource as required, just like AsyncFd::async_io[_mut]().

Example

To illustrate why this is useful, consider the current implementation of a try_recv() function:

impl CanSocket {
    /// Receive a frame from the socket, without waiting for one to become available.
    pub fn try_recv(&self) -> std::io::Result<CanFrame> {
        use std::future::Future;

        let waker = unsafe { std::task::Waker::from_raw(nop_waker_new()) };
        let mut context = std::task::Context::from_waker(&waker);

        let work = self.io.async_io(tokio::io::Interest::READABLE, |socket| Ok(CanFrame {
            inner: socket.recv()?,
        }));
        let work = std::pin::pin!(work);
        match work.poll(&mut context) {
            std::task::Poll::Ready(result) => result,
            std::task::Poll::Pending => Err(std::io::ErrorKind::WouldBlock.into()),
        }
    }
}

const NOP_WAKER_VTABLE: std::task::RawWakerVTable = std::task::RawWakerVTable::new(nop_waker_clone, nop_waker_wake, nop_waker_wake_by_ref, nop_waker_drop);

fn nop_waker_new() -> std::task::RawWaker {
    std::task::RawWaker::new(std::ptr::null(), &NOP_WAKER_VTABLE)
}
fn nop_waker_clone(_waker: *const ()) -> std::task::RawWaker {
    nop_waker_new()
}
fn nop_waker_wake(_waker: *const ()) { }
fn nop_waker_wake_by_ref(_waker: *const ()) { }
fn nop_waker_drop(_waker: *const ()) { }

Or, if we have try_io:

impl CanSocket {
    /// Receive a frame from the socket, without waiting for one to become available.
    pub fn try_recv(&self) -> std::io::Result<CanFrame> {
        self.io.try_io(|io| Ok(CanFrame {
            inner: io.get_ref().recv()?,
        }));
    }
}

No unnecessary wakers being registered, no unsafe code to avoid Arcs in the waker.

@Darksonn Darksonn added A-tokio Area: The main tokio crate M-io Module: tokio/io labels Nov 19, 2024
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall looks good to me. Are you able to fix the merge conflict?

This allows to provide APIs like `try_recv()` and `try_send()` in custom
types built on top of `AsyncFd`.
@de-vri-es
Copy link
Contributor Author

Rebased on current master, and switched the test to use yield_now() instead of a tiny sleep. Thanks for looking at the PR!

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@Darksonn Darksonn merged commit cbdceb9 into tokio-rs:master Nov 19, 2024
82 checks passed
kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Dec 4, 2024
Bumps tokio from 1.41.1 to 1.42.0.

Release notes
Sourced from tokio's releases.

Tokio v1.42.0
1.42.0 (Dec 3rd, 2024)
Added

io: add AsyncFd::{try_io, try_io_mut} (#6967)

Fixed

io: avoid ptr->ref->ptr roundtrip in RegistrationSet (#6929)
runtime: do not defer yield_now inside block_in_place (#6999)

Changes

io: simplify io readiness logic (#6966)

Documented

net: fix docs for tokio::net::unix::{pid_t, gid_t, uid_t} (#6791)
time: fix a typo in Instant docs (#6982)

#6791: tokio-rs/tokio#6791
#6929: tokio-rs/tokio#6929
#6966: tokio-rs/tokio#6966
#6967: tokio-rs/tokio#6967
#6982: tokio-rs/tokio#6982
#6999: tokio-rs/tokio#6999



Commits

bb9d570 chore: prepare Tokio v1.42.0 (#7005)
af9c683 tests: fix typo in build test instructions (#7004)
4bc5a1a ci: allow Unicode-3.0 license for unicode-ident (#7006)
f8948ea runtime: do not defer yield_now inside block_in_place (#6999)
bce9780 time: use array::from_fn instead of manually creating array (#7000)
38151f3 readme: unlist 1.32.x as LTS release (#6997)
5dda72d ci: pin valgrind to rustc 1.82 (#6998)
c07257f io: simplify io readiness logic (#6966)
d08578f time: fix a typo in Instant docs (#6982)
4047d79 miri: add annotations for tests with miri ignore (#6981)
Additional commits viewable in compare view




Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting @dependabot rebase.


Dependabot commands and options

You can trigger Dependabot actions by commenting on this PR:

@dependabot rebase will rebase this PR
@dependabot recreate will recreate this PR, overwriting any edits that have been made to it
@dependabot merge will merge this PR after your CI passes on it
@dependabot squash and merge will squash and merge this PR after your CI passes on it
@dependabot cancel merge will cancel a previously requested merge and block automerging
@dependabot reopen will reopen this PR if it is closed
@dependabot close will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
@dependabot show <dependency name> ignore conditions will show all of the ignore conditions of the specified dependency
@dependabot ignore this major version will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this minor version will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
@dependabot ignore this dependency will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-io Module: tokio/io
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add AsyncFd::try_io
2 participants