feat(s2n-quic-platform): wire up tokio sockets to ring #1790

Merged
merged 2 commits into from
Jun 15, 2023
1 change: 1 addition & 0 deletions quic/s2n-quic-platform/src/io/tokio.rs
@@ -19,6 +19,7 @@ use tokio::{net::UdpSocket, runtime::Handle};

mod builder;
mod clock;
+mod task;
#[cfg(test)]
mod tests;

47 changes: 47 additions & 0 deletions quic/s2n-quic-platform/src/io/tokio/task.rs
@@ -0,0 +1,47 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

// depending on the platform, some of these implementations aren't used
#![allow(dead_code)]

mod simple;
#[cfg(unix)]
mod unix;

cfg_if::cfg_if! {
if #[cfg(s2n_quic_platform_socket_mmsg)] {
pub use mmsg::{rx, tx};
} else if #[cfg(s2n_quic_platform_socket_msg)] {
pub use msg::{rx, tx};
} else {
pub use simple::{rx, tx};
}
}

macro_rules! libc_msg {
($message:ident, $cfg:ident) => {
#[cfg($cfg)]
mod $message {
use super::unix;
use crate::{features::Gso, message::$message::Message, socket::ring};

pub async fn rx<S: Into<std::net::UdpSocket>>(
socket: S,
producer: ring::Producer<Message>,
) -> std::io::Result<()> {
unix::rx(socket, producer).await
}

pub async fn tx<S: Into<std::net::UdpSocket>>(
socket: S,
consumer: ring::Consumer<Message>,
gso: Gso,
) -> std::io::Result<()> {
unix::tx(socket, consumer, gso).await
}
}
};
}

libc_msg!(msg, s2n_quic_platform_socket_msg);
libc_msg!(mmsg, s2n_quic_platform_socket_mmsg);
123 changes: 123 additions & 0 deletions quic/s2n-quic-platform/src/io/tokio/task/simple.rs
@@ -0,0 +1,123 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
features::Gso,
message::{simple::Message, Message as _},
socket::{
ring, task,
task::{rx, tx},
},
syscall::SocketEvents,
};
use core::task::{Context, Poll};
use tokio::{io, net::UdpSocket};

pub async fn rx<S: Into<std::net::UdpSocket>>(
socket: S,
producer: ring::Producer<Message>,
) -> io::Result<()> {
let socket = socket.into();
socket.set_nonblocking(true).unwrap();

let socket = UdpSocket::from_std(socket).unwrap();
let result = task::Receiver::new(producer, socket).await;
if let Some(err) = result {
Err(err)
} else {
Ok(())
}
}

pub async fn tx<S: Into<std::net::UdpSocket>>(
socket: S,
consumer: ring::Consumer<Message>,
gso: Gso,
) -> io::Result<()> {
let socket = socket.into();
socket.set_nonblocking(true).unwrap();

let socket = UdpSocket::from_std(socket).unwrap();
let result = task::Sender::new(consumer, socket, gso).await;
if let Some(err) = result {
Err(err)
} else {
Ok(())
}
}

impl tx::Socket<Message> for UdpSocket {
type Error = io::Error;

#[inline]
fn send(
&mut self,
cx: &mut Context,
entries: &mut [Message],
events: &mut tx::Events,
) -> io::Result<()> {
for entry in entries {
let target = (*entry.remote_address()).into();
let payload = entry.payload_mut();
match self.poll_send_to(cx, payload, target) {
Poll::Ready(Ok(_)) => {
if events.on_complete(1).is_break() {
return Ok(());
}
}
Poll::Ready(Err(err)) => {
if events.on_error(err).is_break() {
Collaborator:

So if I'm reading on_error right and it's not updated later in this PR somewhere, if we keep hitting errors we're going to spin here rather than exiting? Not entirely sure what kind of error that could be...

Contributor Author:

Yeah so the general case for TX errors is discarding the packet and moving on to the next one. The only reason we'd break is if we got a WouldBlock or Interrupted. Outside of these, the socket could return errors for various reasons like the address not being reachable or MTU being too large (for platforms that don't have a way to disable kernel-level MTU probing). In these cases, the packet transmission will always fail so we just discard it and let the connection assume it was lost.

Collaborator:

OK, I think the latest version (with the for loop) fixes the issue I was worried about, or at least does within this function, where we'd get an Err, but not increment the pointer into entries and so infinitely spin in this loop trying to send (and failing).

return Ok(());
}
}
Poll::Pending => {
events.blocked();
break;
Collaborator:

This specific code makes me wonder if an interface like events.finished(count: usize, res: Poll<Result<T, E>>) would reduce duplication and make things clearer in terms of why we increment self.pending in on_error, for example.

}
}
}

Ok(())
}
}
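
The TX error policy described in the review thread above (discard the failing packet and move on, stopping only on `WouldBlock` or `Interrupted`) can be sketched in isolation. This is a minimal illustration using only the standard library, not the s2n-quic implementation: `Events`, `send_all`, and `demo` are hypothetical stand-ins, and the behaviour of the real `tx::Events::on_error` is assumed from the author's description.

```rust
use std::io::{self, ErrorKind};
use std::ops::ControlFlow;

/// Hypothetical stand-in for `tx::Events`; the real type lives in s2n-quic-platform.
#[derive(Debug, Default)]
pub struct Events {
    pub sent: usize,
    pub discarded: usize,
    pub blocked: bool,
}

impl Events {
    pub fn on_complete(&mut self, count: usize) -> ControlFlow<()> {
        self.sent += count;
        ControlFlow::Continue(())
    }

    /// Assumed policy: `WouldBlock` and `Interrupted` stop the batch so the packet
    /// can be retried later; any other error discards the packet and keeps going.
    pub fn on_error(&mut self, err: io::Error) -> ControlFlow<()> {
        match err.kind() {
            ErrorKind::WouldBlock => {
                self.blocked = true;
                ControlFlow::Break(())
            }
            ErrorKind::Interrupted => ControlFlow::Break(()),
            _ => {
                self.discarded += 1;
                ControlFlow::Continue(())
            }
        }
    }
}

/// Walks the batch with a `for` loop, so a failing packet is always stepped past
/// and the same entry is never retried forever.
pub fn send_all<F>(packets: &[&str], mut send_one: F) -> Events
where
    F: FnMut(&str) -> io::Result<()>,
{
    let mut events = Events::default();
    for &packet in packets {
        let flow = match send_one(packet) {
            Ok(()) => events.on_complete(1),
            Err(err) => events.on_error(err),
        };
        if flow.is_break() {
            break;
        }
    }
    events
}

/// Example run: the second packet is unreachable, the fourth hits a full socket buffer.
pub fn demo() -> Events {
    send_all(&["a", "unreachable", "b", "full", "c"], |packet| match packet {
        "unreachable" => Err(io::Error::new(ErrorKind::Other, "address not reachable")),
        "full" => Err(io::Error::from(ErrorKind::WouldBlock)),
        _ => Ok(()),
    })
}
```

In `demo`, the unreachable packet is counted as discarded and the loop continues, while the `WouldBlock` stops the batch before the last packet is attempted.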

impl rx::Socket<Message> for UdpSocket {
type Error = io::Error;

#[inline]
fn recv(
&mut self,
cx: &mut Context,
entries: &mut [Message],
events: &mut rx::Events,
) -> io::Result<()> {
for entry in entries {
let payload = entry.payload_mut();
let mut buf = io::ReadBuf::new(payload);
match self.poll_recv_from(cx, &mut buf) {
Poll::Ready(Ok(addr)) => {
unsafe {
let len = buf.filled().len();
entry.set_payload_len(len);
}
entry.set_remote_address(&(addr.into()));

if events.on_complete(1).is_break() {
return Ok(());
}
}
Poll::Ready(Err(err)) => {
if events.on_error(err).is_break() {
return Ok(());
}
}
Poll::Pending => {
events.blocked();
break;
}
}
}

Ok(())
}
}
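
The reviewer's suggested `events.finished(count, res)` interface might look roughly like the following. It is only a sketch of the idea: the PR does not add this method, and the field names and the error policy here are assumptions chosen for illustration.

```rust
use std::io::{self, ErrorKind};
use std::ops::ControlFlow;
use std::task::Poll;

/// Hypothetical events tracker; not the s2n-quic `tx::Events` or `rx::Events` type.
#[derive(Debug, Default)]
pub struct Events {
    pub completed: usize,
    pub errors: usize,
    pub blocked: bool,
}

impl Events {
    /// Single entry point that folds the three outcomes of a poll-based socket
    /// call into one place, instead of separate `on_complete`, `on_error`, and
    /// `blocked` calls at every call site.
    pub fn finished<T>(&mut self, count: usize, res: Poll<io::Result<T>>) -> ControlFlow<()> {
        match res {
            Poll::Ready(Ok(_)) => {
                self.completed += count;
                ControlFlow::Continue(())
            }
            Poll::Ready(Err(err)) => match err.kind() {
                // Assumed policy: retryable errors end the batch.
                ErrorKind::WouldBlock => {
                    self.blocked = true;
                    ControlFlow::Break(())
                }
                ErrorKind::Interrupted => ControlFlow::Break(()),
                // Anything else: drop the entry and continue with the next one.
                _ => {
                    self.errors += count;
                    ControlFlow::Continue(())
                }
            },
            Poll::Pending => {
                self.blocked = true;
                ControlFlow::Break(())
            }
        }
    }
}

/// Feeds a list of pre-computed poll outcomes through `finished`.
pub fn drive<T>(outcomes: Vec<Poll<io::Result<T>>>) -> Events {
    let mut events = Events::default();
    for outcome in outcomes {
        if events.finished(1, outcome).is_break() {
            break;
        }
    }
    events
}

/// Example run: one hard error in the middle, then the socket goes `Pending`.
pub fn demo() -> Events {
    drive(vec![
        Poll::Ready(Ok(10usize)),
        Poll::Ready(Err(io::Error::new(ErrorKind::Other, "mtu too large"))),
        Poll::Ready(Ok(20)),
        Poll::Pending,
        Poll::Ready(Ok(30)),
    ])
}
```

With an interface like this, both the `send` and `recv` loops would reduce to one `finished(1, poll_result)` call per entry.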
150 changes: 150 additions & 0 deletions quic/s2n-quic-platform/src/io/tokio/task/unix.rs
@@ -0,0 +1,150 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
features::Gso,
socket::{
ring,
task::{rx, tx},
},
syscall::{SocketType, UnixMessage},
};
use core::task::{Context, Poll};
use std::{io, os::unix::io::AsRawFd};
use tokio::io::unix::AsyncFd;

pub async fn rx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>(
socket: S,
producer: ring::Producer<M>,
) -> io::Result<()> {
let socket = socket.into();
socket.set_nonblocking(true).unwrap();

let socket = AsyncFd::new(socket).unwrap();
let result = rx::Receiver::new(producer, socket).await;
if let Some(err) = result {
Err(err)
} else {
Ok(())
}
}

pub async fn tx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>(
socket: S,
consumer: ring::Consumer<M>,
gso: Gso,
) -> io::Result<()> {
let socket = socket.into();
socket.set_nonblocking(true).unwrap();

let socket = AsyncFd::new(socket).unwrap();
let result = tx::Sender::new(consumer, socket, gso).await;
if let Some(err) = result {
Err(err)
} else {
Ok(())
}
}

impl<S: AsRawFd, M: UnixMessage> tx::Socket<M> for AsyncFd<S> {
type Error = io::Error;

#[inline]
fn send(
&mut self,
cx: &mut Context,
entries: &mut [M],
events: &mut tx::Events,
) -> io::Result<()> {
// Call the syscall for the socket
//
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just
// assume the socket is ready in the general case and then fall back to querying
// socket readiness if it's not. This can avoid some things like having to construct
// a `std::io::Error` with `WouldBlock` and dereferencing the registration.
M::send(self.get_ref().as_raw_fd(), entries, events);

// yield back if we weren't blocked
if !events.is_blocked() {
return Ok(());
}

// * First iteration we need to clear socket readiness since the `send` call returned a
// `WouldBlock`.
// * Second iteration we need to register the waker, assuming the socket readiness was
// cleared.
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try
// again.
for i in 0..2 {
camshaft marked this conversation as resolved.
match self.poll_write_ready(cx) {
Collaborator:

I'm having trouble making sure there's not any race conditions with regards to readiness here -- typically I would expect the M::send to be called after we get the guard here.

But I also don't have a ready-made description of a problem situation. I guess my main question is why we structure the code like this vs. "normally" with try_io on the guard or similar. Is the idea to avoid calling into Tokio if we're not going to go to sleep? If so, I think it would be helpful to annotate with a comment. (Maybe the same one as above).

Contributor Author:

Added a comment; let me know if that makes sense

Collaborator:

Yes, I think this makes sense. I can't say I'm fully convinced that retrieving the guard after we call into I/O works in all cases. Tokio definitely seems to want you to go "guard, then IO, then mark as complete if WouldBlock", but I'm not sure why and it may not matter. Semantically this seems fine to me at a high level.

Poll::Ready(guard) => {
let mut guard = guard?;
if i == 0 {
guard.clear_ready();
} else {
events.take_blocked();
}
}
Poll::Pending => {
return Ok(());
}
}
}

Ok(())
}
}

impl<S: AsRawFd, M: UnixMessage> rx::Socket<M> for AsyncFd<S> {
type Error = io::Error;

#[inline]
fn recv(
&mut self,
cx: &mut Context,
entries: &mut [M],
events: &mut rx::Events,
) -> io::Result<()> {
// Call the syscall for the socket
//
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just
// assume the socket is ready in the general case and then fall back to querying
// socket readiness if it's not. This can avoid some things like having to construct
// a `std::io::Error` with `WouldBlock` and dereferencing the registration.
M::recv(
self.get_ref().as_raw_fd(),
SocketType::NonBlocking,
entries,
events,
);

// yield back if we weren't blocked
if !events.is_blocked() {
return Ok(());
}

// * First iteration we need to clear socket readiness since the `recv` call returned a
// `WouldBlock`.
// * Second iteration we need to register the waker, assuming the socket readiness was
// cleared.
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try
// again.
for i in 0..2 {
match self.poll_read_ready(cx) {
Poll::Ready(guard) => {
let mut guard = guard?;
if i == 0 {
guard.clear_ready();
} else {
events.take_blocked();
}
}
Poll::Pending => {
return Ok(());
}
}
}

Ok(())
}
}
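
The "syscall first, then query readiness" flow discussed in the thread above can be modelled without Tokio. The sketch below is a simplified model under stated assumptions: `FakeReadiness`, `Outcome`, and `send_optimistic` are hypothetical names, and the fake readiness cache only approximates how `AsyncFd` tracks readiness (Tokio's real bookkeeping is more involved).

```rust
use std::task::Poll;

/// Hypothetical stand-in for the readiness state cached per registered socket.
#[derive(Debug, Default)]
pub struct FakeReadiness {
    pub cached_ready: bool,
    pub waker_registered: bool,
    /// Simulates an OS readiness event that arrives right after the cache is cleared.
    pub event_after_clear: bool,
}

impl FakeReadiness {
    /// Returns `Ready` if the cache says so, otherwise registers the waker.
    pub fn poll_ready(&mut self) -> Poll<()> {
        if self.cached_ready {
            Poll::Ready(())
        } else {
            self.waker_registered = true;
            Poll::Pending
        }
    }

    /// Drops the stale cached readiness; a racing event may set it again.
    pub fn clear_ready(&mut self) {
        self.cached_ready = self.event_after_clear;
        self.event_after_clear = false;
    }
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The syscall made progress; no readiness query was needed.
    Progress,
    /// The waker is registered and the task can go to sleep.
    Sleeping,
    /// The socket became ready again in the meantime; the caller should retry.
    RetryNow,
}

pub fn send_optimistic(readiness: &mut FakeReadiness, syscall_would_block: bool) -> Outcome {
    // Optimistic path: assume the socket is ready and just issue the syscall.
    if !syscall_would_block {
        return Outcome::Progress;
    }

    for i in 0..2 {
        match readiness.poll_ready() {
            // First pass: the cached readiness is stale because the syscall blocked.
            Poll::Ready(()) if i == 0 => readiness.clear_ready(),
            // Second pass and still ready: a new event raced in, so retry.
            Poll::Ready(()) => {}
            // Waker registered: nothing more to do until the socket wakes the task.
            Poll::Pending => return Outcome::Sleeping,
        }
    }

    Outcome::RetryNow
}
```

The trade-off in this model is that the common case (socket ready) never touches the readiness state at all, at the cost of an extra poll when the syscall does block.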
4 changes: 2 additions & 2 deletions quic/s2n-quic-platform/src/message/simple.rs
@@ -25,8 +25,8 @@ impl Message {
ExplicitCongestionNotification::default()
}

-pub(crate) fn remote_address(&self) -> Option<SocketAddress> {
-Some(self.address)
+pub(crate) fn remote_address(&self) -> &SocketAddress {
+&self.address
}

pub(crate) fn set_remote_address(&mut self, remote_address: &SocketAddress) {
36 changes: 17 additions & 19 deletions quic/s2n-quic-platform/src/socket/std.rs
@@ -98,28 +98,26 @@ impl<B: Buffer> Queue<B> {
let mut entries = self.0.occupied_mut();

for entry in entries.as_mut() {
-if let Some(remote_address) = entry.remote_address() {
-match socket.send_to(entry.payload_mut(), &remote_address) {
-Ok(_) => {
-count += 1;
+let remote_address = *entry.remote_address();
+match socket.send_to(entry.payload_mut(), &remote_address) {
+Ok(_) => {
+count += 1;

-publisher.on_platform_tx(event::builder::PlatformTx { count: 1 });
-}
-Err(err) if count > 0 && err.would_block() => {
-break;
-}
-Err(err) if err.was_interrupted() || err.permission_denied() => {
-break;
-}
-Err(err) => {
-entries.finish(count);
+publisher.on_platform_tx(event::builder::PlatformTx { count: 1 });
+}
+Err(err) if count > 0 && err.would_block() => {
+break;
+}
+Err(err) if err.was_interrupted() || err.permission_denied() => {
+break;
+}
+Err(err) => {
+entries.finish(count);

-publisher.on_platform_tx_error(event::builder::PlatformTxError {
-errno: errno().0,
-});
+publisher
+.on_platform_tx_error(event::builder::PlatformTxError { errno: errno().0 });

-return Err(err);
-}
+return Err(err);
}
}
}
2 changes: 1 addition & 1 deletion quic/s2n-quic-platform/src/socket/task.rs
@@ -1,7 +1,7 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

-pub mod events;
+mod events;
pub mod rx;
pub mod tx;
