-
-
Notifications
You must be signed in to change notification settings - Fork 495
feat(datagrams): add sendable/readable futures and try_send/try_recv methods #2366
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
3bc56de
6b9bc15
9e03a85
e122d8c
4b007a1
7e7ce32
147df23
787caf8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -345,6 +345,14 @@ impl Connection { | |
| } | ||
| } | ||
|
|
||
| /// Creates a future, resolving as soon as a readable datagram is buffered | ||
|
||
| pub fn datagram_readable(&self) -> DatagramReadable<'_> { | ||
| DatagramReadable { | ||
| conn: &self.0, | ||
| notify: self.0.shared.datagram_received.notified(), | ||
| } | ||
| } | ||
|
|
||
| /// Receive an application datagram | ||
| pub fn read_datagram(&self) -> ReadDatagram<'_> { | ||
| ReadDatagram { | ||
|
|
@@ -353,6 +361,23 @@ impl Connection { | |
| } | ||
| } | ||
|
|
||
| /// Attempts to receive an application datagram | ||
| /// | ||
| /// If there are no readable datagrams, this will return [TryReceiveDatagramError::WouldBlock] | ||
| pub fn try_read_datagram(&self) -> Result<Bytes, TryReceiveDatagramError> { | ||
|
||
| let mut state = self.0.state.lock("try_read_datagram"); | ||
|
|
||
| if let Some(ref e) = state.error { | ||
| return Err(e.clone().into()); | ||
| } | ||
|
|
||
| state | ||
| .inner | ||
| .datagrams() | ||
| .recv() | ||
| .ok_or(TryReceiveDatagramError::WouldBlock) | ||
| } | ||
|
|
||
| /// Wait for the connection to be closed for any reason | ||
| /// | ||
| /// Despite the return type's name, closed connections are often not an error condition at the | ||
|
|
@@ -422,6 +447,15 @@ impl Connection { | |
| conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared); | ||
| } | ||
|
|
||
| /// Creates a future, resolving as soon as a Datagram with the given size in byte can be sent | ||
| pub fn datagram_sendable(&self, size: usize) -> DatagramSendable<'_> { | ||
| DatagramSendable { | ||
| conn: &self.0, | ||
| required_space: size, | ||
| notify: self.0.shared.datagrams_unblocked.notified(), | ||
| } | ||
| } | ||
|
|
||
| /// Transmit `data` as an unreliable, unordered application datagram | ||
| /// | ||
| /// Application datagrams are a low-level primitive. They may be lost or delivered out of order, | ||
|
|
@@ -450,6 +484,33 @@ impl Connection { | |
| } | ||
| } | ||
|
|
||
| /// Transmit `data` as an unreliable, unordered application datagram | ||
| /// | ||
| /// Application datagrams are a low-level primitive. They may be lost or delivered out of order, | ||
| /// and `data` must both fit inside a single QUIC packet and be smaller than the maximum | ||
| /// dictated by the peer. | ||
| /// | ||
| /// If the send buffer doesn't have enough available space, this will return [TrySendDatagramError::WouldBlock] | ||
| pub fn try_send_datagram(&self, data: Bytes) -> Result<(), TrySendDatagramError> { | ||
| let conn = &mut *self.0.state.lock("try_send_datagram"); | ||
| if let Some(ref x) = conn.error { | ||
| return Err(SendDatagramError::ConnectionLost(x.clone()).into()); | ||
| } | ||
| use proto::SendDatagramError::*; | ||
| match conn.inner.datagrams().send(data, false) { | ||
| Ok(()) => { | ||
| conn.wake(); | ||
| Ok(()) | ||
| } | ||
| Err(e) => Err(match e { | ||
| Blocked(bytes) => TrySendDatagramError::WouldBlock(bytes), | ||
| UnsupportedByPeer => SendDatagramError::UnsupportedByPeer.into(), | ||
| Disabled => SendDatagramError::Disabled.into(), | ||
| TooLarge => SendDatagramError::TooLarge.into(), | ||
| }), | ||
| } | ||
| } | ||
|
|
||
| /// Transmit `data` as an unreliable, unordered application datagram | ||
| /// | ||
| /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion | ||
|
|
@@ -821,6 +882,46 @@ impl Future for ReadDatagram<'_> { | |
| } | ||
| } | ||
|
|
||
| pin_project! { | ||
| /// Future produced by [`Connection::datagram_readable`] | ||
| pub struct DatagramReadable<'a> { | ||
| conn: &'a ConnectionRef, | ||
| #[pin] | ||
| notify: Notified<'a>, | ||
| } | ||
| } | ||
|
|
||
| impl Future for DatagramReadable<'_> { | ||
| type Output = Result<(), ConnectionError>; | ||
| fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| let mut this = self.project(); | ||
| let mut state = this.conn.state.lock("ReadDatagram::poll"); | ||
|
|
||
| // Check for buffered datagrams before checking `state.error` so that already-received | ||
| // datagrams, which are necessarily finite, can be drained from a closed connection. | ||
|
|
||
| if state.inner.datagrams().recv_buffered() > 0 { | ||
| return Poll::Ready(Ok(())); | ||
| } | ||
|
|
||
| if let Some(ref e) = state.error { | ||
| return Poll::Ready(Err(e.clone())); | ||
| } | ||
|
|
||
| loop { | ||
| // Poll next datagram received notification | ||
| match this.notify.as_mut().poll(ctx) { | ||
| // `state` lock ensures we didn't race with readiness | ||
| Poll::Pending => return Poll::Pending, | ||
| // Replace already used Notify from previous poll | ||
| Poll::Ready(()) => this | ||
| .notify | ||
| .set(this.conn.shared.datagram_received.notified()), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pin_project! { | ||
| /// Future produced by [`Connection::send_datagram_wait`] | ||
| pub struct SendDatagram<'a> { | ||
|
|
@@ -854,6 +955,7 @@ impl Future for SendDatagram<'_> { | |
| this.data.replace(data); | ||
| loop { | ||
| match this.notify.as_mut().poll(ctx) { | ||
| // `state` lock ensures we didn't race with readiness | ||
| Poll::Pending => return Poll::Pending, | ||
| // Spurious wakeup, get a new future | ||
| Poll::Ready(()) => this | ||
|
|
@@ -870,6 +972,59 @@ impl Future for SendDatagram<'_> { | |
| } | ||
| } | ||
|
|
||
| pin_project! { | ||
| /// Future produced by [`Connection::sendable_datagram`] | ||
| pub struct DatagramSendable<'a> { | ||
| conn: &'a ConnectionRef, | ||
| required_space: usize, | ||
| #[pin] | ||
| notify: Notified<'a>, | ||
| } | ||
| } | ||
|
|
||
| impl Future for DatagramSendable<'_> { | ||
| type Output = Result<(), SendDatagramError>; | ||
| fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| let mut this = self.project(); | ||
| let mut state = this.conn.state.lock("DatagramSendable::poll"); | ||
| if let Some(ref e) = state.error { | ||
| return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone()))); | ||
| } | ||
|
|
||
| // Check if peer support datagrams | ||
| let max = state | ||
| .inner | ||
| .datagrams() | ||
| .max_size() | ||
| .ok_or(SendDatagramError::UnsupportedByPeer)?; | ||
|
|
||
| if *this.required_space > max { | ||
| return Poll::Ready(Err(SendDatagramError::TooLarge)); | ||
| } | ||
|
|
||
| // Check if send can be satisfied | ||
| if state.inner.datagrams().send_buffer_space() > *this.required_space { | ||
| // We currently have enough space to satisfy the requirements | ||
| return Poll::Ready(Ok(())); | ||
| } | ||
|
|
||
| // Otherwise - set send_blocked and wait for the unblocked notification | ||
| state.inner.datagrams().set_send_blocked(); | ||
|
|
||
| loop { | ||
| // Poll next datagram unblocked | ||
| match this.notify.as_mut().poll(ctx) { | ||
| // `state` lock ensures we didn't race with readiness | ||
| Poll::Pending => return Poll::Pending, | ||
| // Replace already used Notify | ||
| Poll::Ready(()) => this | ||
| .notify | ||
| .set(this.conn.shared.datagrams_unblocked.notified()), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| pub(crate) struct ConnectionRef(Arc<ConnectionInner>); | ||
|
|
||
|
|
@@ -1313,6 +1468,37 @@ pub enum SendDatagramError { | |
| ConnectionLost(#[from] ConnectionError), | ||
| } | ||
|
|
||
| /// Errors that can arise when trying to send a datagram without blocking | ||
| #[derive(Debug, Error, Clone, Eq, PartialEq)] | ||
| pub enum TrySendDatagramError { | ||
| /// Send Would Block - contains the unsent Bytes | ||
| #[error("send would block")] | ||
| WouldBlock(Bytes), | ||
| /// Actual Error sending the Datagram | ||
| #[error(transparent)] | ||
| SendDatagramError(#[from] SendDatagramError), | ||
| } | ||
|
|
||
| /// Errors that can arise when trying to receive a datagram without blocking | ||
| #[derive(Debug, Error, Clone, PartialEq, Eq)] | ||
| pub enum TryReceiveDatagramError { | ||
| /// The operation would block | ||
| #[error("operation would block")] | ||
| WouldBlock, | ||
| /// A Connection error has occurred | ||
| #[error(transparent)] | ||
| ConnectionError(#[from] ConnectionError), | ||
| } | ||
|
|
||
| impl From<TryReceiveDatagramError> for io::Error { | ||
| fn from(err: TryReceiveDatagramError) -> Self { | ||
| match err { | ||
| TryReceiveDatagramError::ConnectionError(err) => err.into(), | ||
| TryReceiveDatagramError::WouldBlock => Self::new(io::ErrorKind::WouldBlock, err), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// The maximum amount of datagrams which will be produced in a single `drive_transmit` call | ||
| /// | ||
| /// This limits the amount of CPU resources consumed by datagram generation, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The send_blocked flag is not part of the public API, so this is not helpful documentation. Maybe try an imperative phrasing of what you wrote just below?