Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed Jun 13, 2023
1 parent d12e56c commit da711aa
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 6 deletions.
8 changes: 2 additions & 6 deletions quic/s2n-quic-platform/src/io/tokio/task/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,11 @@ impl tx::Socket<Message> for UdpSocket {
entries: &mut [Message],
events: &mut tx::Events,
) -> io::Result<()> {
let mut index = 0;
while let Some(entry) = entries.get_mut(index) {
for entry in entries {
let target = (*entry.remote_address()).into();
let payload = entry.payload_mut();
match self.poll_send_to(cx, payload, target) {
Poll::Ready(Ok(_)) => {
index += 1;
if events.on_complete(1).is_break() {
return Ok(());
}
Expand Down Expand Up @@ -93,8 +91,7 @@ impl rx::Socket<Message> for UdpSocket {
entries: &mut [Message],
events: &mut rx::Events,
) -> io::Result<()> {
let mut index = 0;
while let Some(entry) = entries.get_mut(index) {
for entry in entries {
let payload = entry.payload_mut();
let mut buf = io::ReadBuf::new(payload);
match self.poll_recv_from(cx, &mut buf) {
Expand All @@ -105,7 +102,6 @@ impl rx::Socket<Message> for UdpSocket {
}
entry.set_remote_address(&(addr.into()));

index += 1;
if events.on_complete(1).is_break() {
return Ok(());
}
Expand Down
26 changes: 26 additions & 0 deletions quic/s2n-quic-platform/src/io/tokio/task/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,25 @@ impl<S: AsRawFd, M: UnixMessage> tx::Socket<M> for AsyncFd<S> {
entries: &mut [M],
events: &mut tx::Events,
) -> io::Result<()> {
// Call the syscall for the socket
//
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just
// assume the socket is ready in the general case and then fall back to querying
// socket readiness if it's not. This can avoid some things like having to construct
// a `std::io::Error` with `WouldBlock` and dereferencing the registration.
M::send(self.get_ref().as_raw_fd(), entries, events);

// yield back if we weren't blocked
if !events.is_blocked() {
return Ok(());
}

// * First iteration we need to clear socket readiness since the `send` call returned a
// `WouldBlock`.
// * Second iteration we need to register the waker, assuming the socket readiness was
// cleared.
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try
// again.
for i in 0..2 {
match self.poll_write_ready(cx) {
Poll::Ready(guard) => {
Expand Down Expand Up @@ -92,17 +105,30 @@ impl<S: AsRawFd, M: UnixMessage> rx::Socket<M> for AsyncFd<S> {
entries: &mut [M],
events: &mut rx::Events,
) -> io::Result<()> {
// Call the syscall for the socket
//
// NOTE: we usually wrap this in a `AsyncFdReadyGuard::try_io`. However, here we just
// assume the socket is ready in the general case and then fall back to querying
// socket readiness if it's not. This can avoid some things like having to construct
// a `std::io::Error` with `WouldBlock` and dereferencing the registration.
M::recv(
self.get_ref().as_raw_fd(),
SocketType::NonBlocking,
entries,
events,
);

// yield back if we weren't blocked
if !events.is_blocked() {
return Ok(());
}

// * First iteration we need to clear socket readiness since the `recv` call returned a
// `WouldBlock`.
// * Second iteration we need to register the waker, assuming the socket readiness was
// cleared.
// * If we got a `Ready` anyway, then clear the blocked status and have the caller try
// again.
for i in 0..2 {
match self.poll_read_ready(cx) {
Poll::Ready(guard) => {
Expand Down

0 comments on commit da711aa

Please sign in to comment.