Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: add Ready::ERROR and report error readiness #5781

Merged
merged 9 commits into from
Aug 16, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 168 additions & 16 deletions tokio/src/io/interest.rs
Original file line number Diff line number Diff line change
@@ -5,49 +5,70 @@ use crate::io::ready::Ready;
use std::fmt;
use std::ops;

// These must be unique.
// same as mio
const READABLE: usize = 0b0001;
const WRITABLE: usize = 0b0010;
// The following are not available on all platforms.
#[cfg(target_os = "freebsd")]
const AIO: usize = 0b0100;
#[cfg(target_os = "freebsd")]
const LIO: usize = 0b1000;
#[cfg(any(target_os = "linux", target_os = "android"))]
const PRIORITY: usize = 0b0001_0000;
// error is available on all platforms, but behavior is platform-specific
// mio does not have this interest
const ERROR: usize = 0b0010_0000;

/// Readiness event interest.
///
/// Specifies the readiness events the caller is interested in when awaiting on
/// I/O resource readiness states.
#[cfg_attr(docsrs, doc(cfg(feature = "net")))]
#[derive(Clone, Copy, Eq, PartialEq)]
pub struct Interest(mio::Interest);
pub struct Interest(usize);

impl Interest {
// The non-FreeBSD definitions in this block are active only when
// building documentation.
cfg_aio! {
/// Interest for POSIX AIO.
#[cfg(target_os = "freebsd")]
pub const AIO: Interest = Interest(mio::Interest::AIO);
pub const AIO: Interest = Interest(AIO);

/// Interest for POSIX AIO.
#[cfg(not(target_os = "freebsd"))]
pub const AIO: Interest = Interest(mio::Interest::READABLE);
pub const AIO: Interest = Interest(READABLE);

/// Interest for POSIX AIO lio_listio events.
#[cfg(target_os = "freebsd")]
pub const LIO: Interest = Interest(mio::Interest::LIO);
pub const LIO: Interest = Interest(LIO);

/// Interest for POSIX AIO lio_listio events.
#[cfg(not(target_os = "freebsd"))]
pub const LIO: Interest = Interest(mio::Interest::READABLE);
pub const LIO: Interest = Interest(READABLE);
}

/// Interest in all readable events.
///
/// Readable interest includes read-closed events.
pub const READABLE: Interest = Interest(mio::Interest::READABLE);
pub const READABLE: Interest = Interest(READABLE);

/// Interest in all writable events.
///
/// Writable interest includes write-closed events.
pub const WRITABLE: Interest = Interest(mio::Interest::WRITABLE);
pub const WRITABLE: Interest = Interest(WRITABLE);

/// Interest in error events.
///
/// Passes error interest to the underlying OS selector.
/// Behavior is platform-specific, read your platform's documentation.
pub const ERROR: Interest = Interest(ERROR);

/// Returns a `Interest` set representing priority completion interests.
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const PRIORITY: Interest = Interest(mio::Interest::PRIORITY);
pub const PRIORITY: Interest = Interest(PRIORITY);

/// Returns true if the value includes readable interest.
///
@@ -63,7 +84,7 @@ impl Interest {
/// assert!(both.is_readable());
/// ```
pub const fn is_readable(self) -> bool {
self.0.is_readable()
self.0 & READABLE != 0
}

/// Returns true if the value includes writable interest.
@@ -80,7 +101,34 @@ impl Interest {
/// assert!(both.is_writable());
/// ```
pub const fn is_writable(self) -> bool {
self.0.is_writable()
self.0 & WRITABLE != 0
}

/// Returns true if the value includes error interest.
///
/// # Examples
///
/// ```
/// use tokio::io::Interest;
///
/// assert!(Interest::ERROR.is_error());
/// assert!(!Interest::WRITABLE.is_error());
///
/// let combined = Interest::READABLE | Interest::ERROR;
/// assert!(combined.is_error());
/// ```
pub const fn is_error(self) -> bool {
self.0 & ERROR != 0
}

#[cfg(target_os = "freebsd")]
const fn is_aio(self) -> bool {
self.0 & AIO != 0
}

#[cfg(target_os = "freebsd")]
const fn is_lio(self) -> bool {
self.0 & LIO != 0
}

/// Returns true if the value includes priority interest.
@@ -99,7 +147,7 @@ impl Interest {
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const fn is_priority(self) -> bool {
self.0.is_priority()
self.0 & PRIORITY != 0
}

/// Add together two `Interest` values.
@@ -116,12 +164,60 @@ impl Interest {
/// assert!(BOTH.is_readable());
/// assert!(BOTH.is_writable());
pub const fn add(self, other: Interest) -> Interest {
Interest(self.0.add(other.0))
Self(self.0 | other.0)
}

// This function must be crate-private to avoid exposing a `mio` dependency.
pub(crate) const fn to_mio(self) -> mio::Interest {
self.0
pub(crate) fn to_mio(self) -> mio::Interest {
fn mio_add(wrapped: &mut Option<mio::Interest>, add: mio::Interest) {
match wrapped {
Some(inner) => *inner |= add,
None => *wrapped = Some(add),
}
}

// mio does not allow and empty interest, so use None for empty
let mut mio = None;

if self.is_readable() {
mio_add(&mut mio, mio::Interest::READABLE);
}

if self.is_writable() {
mio_add(&mut mio, mio::Interest::WRITABLE);
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if self.is_priority() {
mio_add(&mut mio, mio::Interest::PRIORITY);
}

#[cfg(target_os = "freebsd")]
if self.is_aio() {
mio_add(&mut mio, mio::Interest::AIO);
}

#[cfg(target_os = "freebsd")]
if self.is_lio() {
mio_add(&mut mio, mio::Interest::LIO);
}

if self.is_error() {
// There is no error interest in mio, because error events are always reported.
// But mio interests cannot be empty and an interest is needed just for the registeration.
//
// read readiness is filtered out in `Interest::mask` or `Ready::from_interest` if
// the read interest was not specified by the user.
mio_add(&mut mio, mio::Interest::READABLE);
}

// the default `mio::Interest::READABLE` should never be used in practice. Either
//
// - at least one tokio interest with a mio counterpart was used
// - only the error tokio interest was specified
//
// in both cases, `mio` is Some already
mio.unwrap_or(mio::Interest::READABLE)
}

pub(crate) fn mask(self) -> Ready {
@@ -130,6 +226,7 @@ impl Interest {
Interest::WRITABLE => Ready::WRITABLE | Ready::WRITE_CLOSED,
#[cfg(any(target_os = "linux", target_os = "android"))]
Interest::PRIORITY => Ready::PRIORITY | Ready::READ_CLOSED,
Interest::ERROR => Ready::ERROR,
_ => Ready::EMPTY,
}
}
@@ -147,12 +244,67 @@ impl ops::BitOr for Interest {
impl ops::BitOrAssign for Interest {
#[inline]
fn bitor_assign(&mut self, other: Self) {
self.0 = (*self | other).0;
*self = *self | other
}
}

impl fmt::Debug for Interest {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(fmt)
let mut separator = false;

if self.is_readable() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "READABLE")?;
separator = true;
}

if self.is_writable() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "WRITABLE")?;
separator = true;
}

#[cfg(any(target_os = "linux", target_os = "android"))]
if self.is_priority() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "PRIORITY")?;
separator = true;
}

#[cfg(target_os = "freebsd")]
if self.is_aio() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "AIO")?;
separator = true;
}

#[cfg(target_os = "freebsd")]
if self.is_lio() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "LIO")?;
separator = true;
}

if self.is_error() {
if separator {
write!(fmt, " | ")?;
}
write!(fmt, "ERROR")?;
separator = true;
}

let _ = separator;

Ok(())
}
}
35 changes: 32 additions & 3 deletions tokio/src/io/ready.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ const READ_CLOSED: usize = 0b0_0100;
const WRITE_CLOSED: usize = 0b0_1000;
#[cfg(any(target_os = "linux", target_os = "android"))]
const PRIORITY: usize = 0b1_0000;
const ERROR: usize = 0b10_0000;

/// Describes the readiness state of an I/O resources.
///
@@ -40,13 +41,17 @@ impl Ready {
#[cfg_attr(docsrs, doc(cfg(any(target_os = "linux", target_os = "android"))))]
pub const PRIORITY: Ready = Ready(PRIORITY);

/// Returns a `Ready` representing error readiness.
pub const ERROR: Ready = Ready(ERROR);

/// Returns a `Ready` representing readiness for all operations.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | PRIORITY);
pub const ALL: Ready =
Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR | PRIORITY);

/// Returns a `Ready` representing readiness for all operations.
#[cfg(not(any(target_os = "linux", target_os = "android")))]
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED);
pub const ALL: Ready = Ready(READABLE | WRITABLE | READ_CLOSED | WRITE_CLOSED | ERROR);

// Must remain crate-private to avoid adding a public dependency on Mio.
pub(crate) fn from_mio(event: &mio::event::Event) -> Ready {
@@ -79,6 +84,10 @@ impl Ready {
ready |= Ready::WRITE_CLOSED;
}

if event.is_error() {
ready |= Ready::ERROR;
}

#[cfg(any(target_os = "linux", target_os = "android"))]
{
if event.is_priority() {
@@ -182,6 +191,21 @@ impl Ready {
self.contains(Ready::PRIORITY)
}

/// Returns `true` if the value includes error `readiness`.
///
/// # Examples
///
/// ```
/// use tokio::io::Ready;
///
/// assert!(!Ready::EMPTY.is_error());
/// assert!(!Ready::WRITABLE.is_error());
/// assert!(Ready::ERROR.is_error());
/// ```
pub fn is_error(self) -> bool {
self.contains(Ready::ERROR)
}

/// Returns true if `self` is a superset of `other`.
///
/// `other` may represent more than one readiness operations, in which case
@@ -230,6 +254,10 @@ impl Ready {
ready |= Ready::READ_CLOSED;
}

if interest.is_error() {
ready |= Ready::ERROR;
}

ready
}

@@ -283,7 +311,8 @@ impl fmt::Debug for Ready {
fmt.field("is_readable", &self.is_readable())
.field("is_writable", &self.is_writable())
.field("is_read_closed", &self.is_read_closed())
.field("is_write_closed", &self.is_write_closed());
.field("is_write_closed", &self.is_write_closed())
.field("is_error", &self.is_error());

#[cfg(any(target_os = "linux", target_os = "android"))]
fmt.field("is_priority", &self.is_priority());
125 changes: 125 additions & 0 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
@@ -685,3 +685,128 @@ async fn clear_ready_matching_clears_ready_mut() {
guard.clear_ready_matching(Ready::WRITABLE);
assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_timestamping() {
use std::net::{Ipv4Addr, SocketAddr};

use tokio::io::{Interest, Ready};

let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

let socket = std::net::UdpSocket::bind(address_a).unwrap();

socket.set_nonblocking(true).unwrap();

// configure send timestamps
configure_timestamping_socket(&socket).unwrap();

socket.connect(address_b).unwrap();

let fd = AsyncFd::new(socket).unwrap();

tokio::select! {
_ = fd.ready(Interest::ERROR) => panic!(),
_ = tokio::time::sleep(Duration::from_millis(10)) => {}
}

let buf = b"hello there";
fd.get_ref().send(buf).unwrap();

// the send timestamp should now be in the error queue
let guard = fd.ready(Interest::ERROR).await.unwrap();
assert_eq!(guard.ready(), Ready::ERROR);
}

#[cfg(target_os = "linux")]
fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
// enable software timestamping, and specifically software send timestamping
let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;

let res = unsafe {
libc::setsockopt(
udp_socket.as_raw_fd(),
libc::SOL_SOCKET,
libc::SO_TIMESTAMP,
&options as *const _ as *const libc::c_void,
std::mem::size_of_val(&options) as libc::socklen_t,
)
};

if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}

#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
use std::net::{Ipv4Addr, SocketAddr};
use tokio::io::{Interest, Ready};

let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
let socket_fd = socket.as_raw_fd();

// Enable IP_RECVERR option to receive error messages
// https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
let recv_err: libc::c_int = 1;
unsafe {
let res = libc::setsockopt(
socket.as_raw_fd(),
libc::SOL_IP,
libc::IP_RECVERR,
&recv_err as *const _ as *const libc::c_void,
std::mem::size_of_val(&recv_err) as libc::socklen_t,
);
if res == -1 {
panic!("{:?}", std::io::Error::last_os_error());
}
}

// Spawn a separate thread for sending messages
tokio::spawn(async move {
// Set the destination address. This address is invalid in this context. the OS will notice
// that nobody is listening on port this port. Normally this is ignored (UDP is "fire and forget"),
// but because IP_RECVERR is enabled, the error will actually be reported to the sending socket
let mut dest_addr =
unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
dest_addr.sin_family = libc::AF_INET as _;
// based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
// below 1024 to guarantee that other tests don't select this port by accident when they
// use port 0 to select an ephemeral port.
dest_addr.sin_port = 512u16.to_be(); // Destination port

// Prepare the message data
let message = "Hello, Socket!";

// Prepare the message structure for sendmsg
let mut iov = libc::iovec {
iov_base: message.as_ptr() as *mut libc::c_void,
iov_len: message.len(),
};

// Prepare the destination address for the sendmsg call
let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
msg.msg_name = dest_sockaddr as *mut libc::c_void;
msg.msg_namelen = dest_addrlen;
msg.msg_iov = &mut iov;
msg.msg_iovlen = 1;

if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
Err(std::io::Error::last_os_error()).unwrap()
}
});

let fd = AsyncFd::new(socket).unwrap();

let guard = fd.ready(Interest::ERROR).await.unwrap();
assert_eq!(guard.ready(), Ready::ERROR);
}