Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consumers to implement POSIX AIO sources. #4054

Merged
merged 18 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["futures", "checkpoint"] }

Expand Down
195 changes: 195 additions & 0 deletions tokio/src/io/bsd/poll_aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
//! Use POSIX AIO futures with Tokio

use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
use mio::Registry;
use mio::Token;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::task::{Context, Poll};

/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`Aio`] object.
pub trait AioSource {
/// Register this AIO event source with Tokio's reactor
fn register(&mut self, kq: RawFd, token: usize);

/// Deregister this AIO event source with Tokio's reactor
fn deregister(&mut self);
}

/// Wrap the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);

impl<T: AioSource> Source for MioSource<T> {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}

fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
self.0.deregister();
Ok(())
}

fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}
}

/// Associates a POSIX AIO control block with the reactor that drives it.
///
/// `Aio`'s wrapped type must implement [`AioSource`] to be driven
/// by the reactor.
///
/// The wrapped source may be accessed through the `Aio` via the `Deref` and
/// `DerefMut` traits.
///
/// ## Clearing readiness
///
/// If [`Aio::poll`] returns ready, but the consumer determines that the
/// Source is not completely ready and must return to the Pending state,
/// [`Aio::clear_ready`] may be used. This can be useful with
/// [`lio_listio`], which may generate a kevent when only a portion of the
/// operations have completed.
///
/// ## Platforms
///
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
/// `Aio` is only available for that operating system.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
// AsyncFd can't be used for POSIX AIO.
//
// Note that Aio doesn't implement Drop. There's no need. Unlike other
// kqueue sources, simply dropping the object effectively deregisters it.
pub struct Aio<E: AioSource> {
io: MioSource<E>,
registration: Registration,
}

// ===== impl Aio =====

impl<E: AioSource> Aio<E> {
/// Indicates to Tokio that the source is no longer ready. The internal
/// readiness flag will be cleared, and tokio will wait for the next
/// edge-triggered readiness notification from the OS.
///
/// It is critical that this method not be called unless your code
/// _actually observes_ that the source is _not_ ready. The OS must
/// deliver a subsequent notification, or this source will block
/// forever. It is equally critical that you `do` call this method if you
/// resubmit the same structure to the kernel and poll it again.
///
/// This method is not very useful with AIO readiness, since each `aiocb`
/// structure is typically only used once. It's main use with
/// [`lio_listio`], which will sometimes send notification when only a
/// portion of its elements are complete. In that case, the caller must
/// call `clear_ready` before resubmitting it.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn clear_ready(&self, ev: AioEvent) {
self.registration.clear_readiness(ev.0)
}

/// Destroy the [`Aio`] and return its inner Source
pub fn into_inner(self) -> E {
self.io.0
}

/// Creates a new `Aio` suitable for use with POSIX AIO functions.
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_for_aio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::AIO)
}

/// Creates a new `Aio` suitable for use with [`lio_listio`].
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn new_for_lio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::LIO)
}

fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
let handle = Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}

/// Polls for readiness. Either AIO or LIO counts.
///
/// This method returns:
/// * `Poll::Pending` if the underlying operation is not complete, whether
/// or not it completed successfully. This will be true if the OS is
/// still processing it, or if it has not yet been submitted to the OS.
/// * `Poll::Ready(Ok(_))` if the underlying operation is complete.
/// * `Poll::Ready(Err(_))` if the reactor has been shutdown. This does
/// _not_ indicate that the underlying operation encountered an error.
///
/// When the method returns Poll::Pending, the Waker in the provided Context
/// is scheduled to receive a wakeup when the underlying operation
/// completes. Note that on multiple calls to poll, only the Waker from the
/// Context passed to the most recent call is scheduled to receive a wakeup.
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<AioEvent>> {
let ev = ready!(self.registration.poll_read_ready(cx))?;
Poll::Ready(Ok(AioEvent(ev)))
}
}

impl<E: AioSource> Deref for Aio<E> {
type Target = E;

fn deref(&self) -> &E {
&self.io.0
}
}

impl<E: AioSource> DerefMut for Aio<E> {
fn deref_mut(&mut self) -> &mut E {
&mut self.io.0
}
}

impl<E: AioSource + fmt::Debug> fmt::Debug for Aio<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Aio").field("io", &self.io.0).finish()
}
}

/// Opaque data returned by [`Aio::poll`].
///
/// It can be fed back to [`Aio::clear_ready`].
#[derive(Debug)]
pub struct AioEvent(ReadyEvent);
18 changes: 18 additions & 0 deletions tokio/src/io/driver/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,24 @@ use std::ops;
pub struct Interest(mio::Interest);

impl Interest {
cfg_aio! {
/// Interest for POSIX AIO
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);

/// Interest for POSIX AIO
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);

/// Interest for POSIX AIO lio_listio events
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);

/// Interest for POSIX AIO lio_listio events
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/io/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

#[cfg(all(target_os = "freebsd", feature = "net"))]
{
if event.is_aio() {
ready |= Ready::READABLE;
}

if event.is_lio() {
ready |= Ready::READABLE;
}
}

if event.is_readable() {
ready |= Ready::READABLE;
}
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ cfg_io_driver_impl! {
pub(crate) use poll_evented::PollEvented;
}

cfg_aio! {
/// BSD-specific I/O types
pub mod bsd {
pub mod poll_aio;
}
}

cfg_net_unix! {
mod async_fd;

Expand Down
5 changes: 3 additions & 2 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,8 +307,9 @@
//! - `rt-multi-thread`: Enables the heavier, multi-threaded, work-stealing scheduler.
//! - `io-util`: Enables the IO based `Ext` traits.
//! - `io-std`: Enable `Stdout`, `Stdin` and `Stderr` types.
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and `UdpSocket`,
//! as well as (on Unix-like systems) `AsyncFd`
//! - `net`: Enables `tokio::net` types such as `TcpStream`, `UnixStream` and
//! `UdpSocket`, as well as (on Unix-like systems) `AsyncFd` and (on
//! FreeBSD) `PollAio`.
//! - `time`: Enables `tokio::time` types and allows the schedulers to enable
//! the built in timer.
//! - `process`: Enables `tokio::process` types.
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
}
}

macro_rules! cfg_aio {
($($item:item)*) => {
$(
#[cfg(all(any(docsrs, target_os = "freebsd"), feature = "net"))]
#[cfg_attr(docsrs,
doc(cfg(all(target_os = "freebsd", feature = "net")))
)]
$item
)*
}
}

macro_rules! cfg_fs {
($($item:item)*) => {
$(
Expand Down
Loading