-
Notifications
You must be signed in to change notification settings - Fork 119
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(s2n-quic-platform): wire up tokio sockets to ring
- Loading branch information
Showing
12 changed files
with
378 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// depending on the platform, some of these implementations aren't used | ||
#![allow(dead_code)] | ||
|
||
mod simple; | ||
#[cfg(unix)] | ||
mod unix; | ||
|
||
cfg_if::cfg_if! { | ||
if #[cfg(s2n_quic_platform_socket_mmsg)] { | ||
pub use mmsg::{rx, tx}; | ||
} else if #[cfg(s2n_quic_platform_socket_msg)] { | ||
pub use msg::{rx, tx}; | ||
} else { | ||
pub use simple::{rx, tx}; | ||
} | ||
} | ||
|
||
macro_rules! libc_msg { | ||
($message:ident, $cfg:ident) => { | ||
#[cfg($cfg)] | ||
mod $message { | ||
use super::unix; | ||
use crate::{features::Gso, message::$message::Message, socket::ring}; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
producer: ring::Producer<Message>, | ||
) -> std::io::Result<()> { | ||
unix::rx(socket, producer).await | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
consumer: ring::Consumer<Message>, | ||
gso: Gso, | ||
) -> std::io::Result<()> { | ||
unix::tx(socket, consumer, gso).await | ||
} | ||
} | ||
}; | ||
} | ||
|
||
libc_msg!(msg, s2n_quic_platform_socket_msg); | ||
libc_msg!(mmsg, s2n_quic_platform_socket_mmsg); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::{ | ||
features::Gso, | ||
message::{simple::Message, Message as _}, | ||
socket::{ | ||
ring, task, | ||
task::{rx, tx}, | ||
}, | ||
syscall::SocketEvents, | ||
}; | ||
use core::task::{Context, Poll}; | ||
use tokio::{io, net::UdpSocket}; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
producer: ring::Producer<Message>, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = UdpSocket::from_std(socket).unwrap(); | ||
let result = task::Receiver::new(producer, socket).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>>( | ||
socket: S, | ||
consumer: ring::Consumer<Message>, | ||
gso: Gso, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = UdpSocket::from_std(socket).unwrap(); | ||
let result = task::Sender::new(consumer, socket, gso).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl tx::Socket<Message> for UdpSocket { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn send( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [Message], | ||
events: &mut tx::Events, | ||
) -> io::Result<()> { | ||
let mut index = 0; | ||
while let Some(entry) = entries.get_mut(index) { | ||
let target = (*entry.remote_address()).into(); | ||
let payload = entry.payload_mut(); | ||
match self.poll_send_to(cx, payload, target) { | ||
Poll::Ready(Ok(_)) => { | ||
index += 1; | ||
if events.on_complete(1).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Ready(Err(err)) => { | ||
if events.on_error(err).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Pending => { | ||
events.blocked(); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl rx::Socket<Message> for UdpSocket { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn recv( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [Message], | ||
events: &mut rx::Events, | ||
) -> io::Result<()> { | ||
let mut index = 0; | ||
while let Some(entry) = entries.get_mut(index) { | ||
let payload = entry.payload_mut(); | ||
let mut buf = io::ReadBuf::new(payload); | ||
match self.poll_recv_from(cx, &mut buf) { | ||
Poll::Ready(Ok(addr)) => { | ||
unsafe { | ||
let len = buf.filled().len(); | ||
entry.set_payload_len(len); | ||
} | ||
entry.set_remote_address(&(addr.into())); | ||
|
||
index += 1; | ||
if events.on_complete(1).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Ready(Err(err)) => { | ||
if events.on_error(err).is_break() { | ||
return Ok(()); | ||
} | ||
} | ||
Poll::Pending => { | ||
events.blocked(); | ||
break; | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,124 @@ | ||
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::{ | ||
features::Gso, | ||
socket::{ | ||
ring, | ||
task::{rx, tx}, | ||
}, | ||
syscall::{SocketType, UnixMessage}, | ||
}; | ||
use core::task::{Context, Poll}; | ||
use std::{io, os::unix::io::AsRawFd}; | ||
use tokio::io::unix::AsyncFd; | ||
|
||
pub async fn rx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>( | ||
socket: S, | ||
producer: ring::Producer<M>, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = AsyncFd::new(socket).unwrap(); | ||
let result = rx::Receiver::new(producer, socket).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
pub async fn tx<S: Into<std::net::UdpSocket>, M: UnixMessage + Unpin>( | ||
socket: S, | ||
consumer: ring::Consumer<M>, | ||
gso: Gso, | ||
) -> io::Result<()> { | ||
let socket = socket.into(); | ||
socket.set_nonblocking(true).unwrap(); | ||
|
||
let socket = AsyncFd::new(socket).unwrap(); | ||
let result = tx::Sender::new(consumer, socket, gso).await; | ||
if let Some(err) = result { | ||
Err(err) | ||
} else { | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl<S: AsRawFd, M: UnixMessage> tx::Socket<M> for AsyncFd<S> { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn send( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [M], | ||
events: &mut tx::Events, | ||
) -> io::Result<()> { | ||
M::send(self.get_ref().as_raw_fd(), entries, events); | ||
|
||
if !events.is_blocked() { | ||
return Ok(()); | ||
} | ||
|
||
for i in 0..2 { | ||
match self.poll_write_ready(cx) { | ||
Poll::Ready(guard) => { | ||
let mut guard = guard?; | ||
if i == 0 { | ||
guard.clear_ready(); | ||
} else { | ||
events.take_blocked(); | ||
} | ||
} | ||
Poll::Pending => { | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl<S: AsRawFd, M: UnixMessage> rx::Socket<M> for AsyncFd<S> { | ||
type Error = io::Error; | ||
|
||
#[inline] | ||
fn recv( | ||
&mut self, | ||
cx: &mut Context, | ||
entries: &mut [M], | ||
events: &mut rx::Events, | ||
) -> io::Result<()> { | ||
M::recv( | ||
self.get_ref().as_raw_fd(), | ||
SocketType::NonBlocking, | ||
entries, | ||
events, | ||
); | ||
|
||
if !events.is_blocked() { | ||
return Ok(()); | ||
} | ||
|
||
for i in 0..2 { | ||
match self.poll_read_ready(cx) { | ||
Poll::Ready(guard) => { | ||
let mut guard = guard?; | ||
if i == 0 { | ||
guard.clear_ready(); | ||
} else { | ||
events.take_blocked(); | ||
} | ||
} | ||
Poll::Pending => { | ||
return Ok(()); | ||
} | ||
} | ||
} | ||
|
||
Ok(()) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.