Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow consumers to implement POSIX AIO sources. #4054

Merged
merged 18 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ default = []

# enable everything
full = [
"aio",
"fs",
"io-util",
"io-std",
Expand All @@ -42,6 +43,10 @@ full = [
"time",
]

aio = [
"mio/os-poll"
]

fs = []
io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
Expand Down Expand Up @@ -130,6 +135,10 @@ tempfile = "3.1.0"
async-stream = "0.3"
socket2 = "0.4"

[target.'cfg(target_os = "freebsd")'.dev-dependencies]
futures-test = "0.3.7"
mio-aio = { git = "https://github.com/asomers/mio-aio", rev = "2f56696", features = ["tokio"] }

[target.'cfg(loom)'.dev-dependencies]
loom = { version = "0.5", features = ["futures", "checkpoint"] }

Expand Down
10 changes: 10 additions & 0 deletions tokio/src/io/driver/interest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ use std::ops;
pub struct Interest(mio::Interest);

impl Interest {
cfg_aio! {
/// Interest for POSIX AIO
pub const AIO: Interest = Interest(mio::Interest::AIO);
}

cfg_aio! {
/// Interest for POSIX AIO lio_listio events
pub const LIO: Interest = Interest(mio::Interest::LIO);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
Expand Down
1 change: 1 addition & 0 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

#[derive(Debug)]
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
Expand Down
11 changes: 11 additions & 0 deletions tokio/src/io/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ impl Ready {
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
let mut ready = Ready::EMPTY;

#[cfg(all(target_os = "freebsd", feature = "aio"))]
{
if event.is_aio() {
ready |= Ready::READABLE;
}

if event.is_lio() {
ready |= Ready::READABLE;
}
}

if event.is_readable() {
ready |= Ready::READABLE;
}
Expand Down
5 changes: 5 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ cfg_io_driver_impl! {
pub use driver::{Interest, Ready};
}

cfg_aio! {
mod poll_aio;
pub use poll_aio::{AioSource, PollAio, PollAioEvent};
}

mod poll_evented;

#[cfg(not(loom))]
Expand Down
171 changes: 171 additions & 0 deletions tokio/src/io/poll_aio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use crate::io::driver::{Handle, Interest, ReadyEvent, Registration};
use mio::event::Source;
use mio::Registry;
use mio::Token;
use std::fmt;
use std::io;
use std::ops::{Deref, DerefMut};
use std::os::unix::io::AsRawFd;
use std::os::unix::prelude::RawFd;
use std::task::{Context, Poll};

/// Like [`mio::event::Source`], but for POSIX AIO only.
///
/// Tokio's consumer must pass an implementor of this trait to create a
/// [`PollAio`] object.
pub trait AioSource {
/// Register this AIO event source with Tokio's reactor
fn register(&mut self, kq: RawFd, token: usize);

/// Deregister this AIO event source with Tokio's reactor
fn deregister(&mut self);
}

/// Wrap the user's AioSource in order to implement mio::event::Source, which
/// is what the rest of the crate wants.
struct MioSource<T>(T);

impl<T: AioSource> Source for MioSource<T> {
fn register(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}

fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
self.0.deregister();
Ok(())
}

fn reregister(
&mut self,
registry: &Registry,
token: Token,
interests: mio::Interest,
) -> io::Result<()> {
assert!(interests.is_aio() || interests.is_lio());
self.0.register(registry.as_raw_fd(), usize::from(token));
Ok(())
}
}

/// Associates a POSIX AIO control block with the reactor that drives it.
///
/// `PollAio`'s wrapped type must implement [`AioSource`] to be driven
/// by the reactor.
///
/// The wrapped source may be accessed through the `PollAio` via the `Deref` and
/// `DerefMut` traits.
///
/// ## Clearing readiness
///
/// If [`PollAio::poll`] returns ready, but the consumer determines that the
/// Source is not completely ready and must return to the Pending state,
/// [`PollAio::clear_ready`] may be used. This can be useful with
/// [`lio_listio`], which may generate a kevent when only a portion of the
/// operations have completed.
///
/// ## Platforms
///
/// Only FreeBSD implements POSIX AIO with kqueue notification, so
/// `PollAio` is only available for that operating system.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
// Note: Unlike every other kqueue event source, POSIX AIO registers events not
// via kevent(2) but when the aiocb is submitted to the kernel via aio_read,
// aio_write, etc. It needs the kqueue's file descriptor to do that. So
// AsyncFd can't be used for POSIX AIO.
//
// Note that PollAio doesn't implement Drop. There's no need. Unlike other
// kqueue sources, there is nothing to deregister.
pub struct PollAio<E: AioSource> {
io: MioSource<E>,
registration: Registration,
}

// ===== impl PollAio =====

impl<E: AioSource> PollAio<E> {
/// Indicates to Tokio that the source is no longer ready. The internal
/// readiness flag will be cleared, and tokio will wait for the next
/// edge-triggered readiness notification from the OS.
///
/// It is critical that this function not be called unless your code
/// _actually observes_ that the source is _not_ ready. The OS must
/// deliver a subsequent notification, or this source will block
/// forever.
pub fn clear_ready(&self, ev: PollAioEvent) {
self.registration.clear_readiness(ev.0)
}

/// Destroy the [`PollAio`] and return its inner Source
pub fn into_inner(self) -> E {
self.io.0
}

/// Creates a new `PollAio` suitable for use with POSIX AIO functions.
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
pub fn new_for_aio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::AIO)
}

/// Creates a new `PollAio` suitable for use with [`lio_listio`].
///
/// It will be associated with the default reactor. The runtime is usually
/// set implicitly when this function is called from a future driven by a
/// tokio runtime, otherwise runtime can be set explicitly with
/// [`Runtime::enter`](crate::runtime::Runtime::enter) function.
///
/// [`lio_listio`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html
pub fn new_for_lio(io: E) -> io::Result<Self> {
Self::new_with_interest(io, Interest::LIO)
}

fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
let mut io = MioSource(io);
let handle = Handle::current();
let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
Ok(Self { io, registration })
}

/// Polls for readiness. Either AIO or LIO counts.
pub fn poll<'a>(&'a self, cx: &mut Context<'_>) -> Poll<io::Result<PollAioEvent>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We avoid inherent fns named just poll as they would conflict w/ Future. Also, why is this an inherent fn? If it were implemented as a future, then it could be used with .await.

Perhaps, the inherent fn could be named poll_ready (or something) and an equivalent async fn is added async fn ready(&self) -> io::Result<AioEvent>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Under the principle that "futures should do nothing until polled", I elected to submit the operation to the kernel (aio_write, etc) as part of poll. But that means that it needs to be in the external crate instead of in Tokio. Hence the inherent function. I'll rename it to poll_ready.

let ev = ready!(self.registration.poll_read_ready(cx))?;
Poll::Ready(Ok(PollAioEvent(ev)))
}
}

impl<E: AioSource> Deref for PollAio<E> {
type Target = E;

fn deref(&self) -> &E {
&self.io.0
}
}

impl<E: AioSource> DerefMut for PollAio<E> {
fn deref_mut(&mut self) -> &mut E {
&mut self.io.0
}
}

impl<E: AioSource + fmt::Debug> fmt::Debug for PollAio<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollAio").field("io", &self.io.0).finish()
}
}

/// Opaque data returned by [`PollAio::poll`].
///
/// It can be fed back to [`PollAio::clear_ready`].
#[derive(Debug)]
pub struct PollAioEvent(ReadyEvent);
1 change: 1 addition & 0 deletions tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@
//! - `sync`: Enables all `tokio::sync` types.
//! - `signal`: Enables all `tokio::signal` types.
//! - `fs`: Enables `tokio::fs` types.
//! - `aio`: Enables `tokio::io::PollAio`, for working with POSIX AIO.
//! - `test-util`: Enables testing based infrastructure for the Tokio runtime.
//!
//! _Note: `AsyncRead` and `AsyncWrite` traits do not require any features and are
Expand Down
15 changes: 15 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ macro_rules! cfg_atomic_waker_impl {
}
}

macro_rules! cfg_aio {
($($item:item)*) => {
$(
#[cfg(all(target_os = "freebsd", feature = "aio"))]
#[cfg_attr(docsrs,
doc(cfg(all(target_os = "freebsd", feature = "aio")))
)]
$item
)*
}
}

macro_rules! cfg_fs {
($($item:item)*) => {
$(
Expand All @@ -65,6 +77,7 @@ macro_rules! cfg_io_driver {
($($item:item)*) => {
$(
#[cfg(any(
all(target_os = "freebsd", feature = "aio"),
feature = "net",
feature = "process",
all(unix, feature = "signal"),
Expand All @@ -83,6 +96,7 @@ macro_rules! cfg_io_driver_impl {
( $( $item:item )* ) => {
$(
#[cfg(any(
all(target_os = "freebsd", feature = "aio"),
feature = "net",
feature = "process",
all(unix, feature = "signal"),
Expand All @@ -96,6 +110,7 @@ macro_rules! cfg_not_io_driver {
($($item:item)*) => {
$(
#[cfg(not(any(
all(target_os = "freebsd", feature = "aio"),
feature = "net",
feature = "process",
all(unix, feature = "signal"),
Expand Down
Loading