Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ impl Datagrams<'_> {
Ok(())
}

/// Marks a sender as blocked.
/// Triggers a `datagram_unblocked` notification once sending becomes possible again.
pub fn set_send_blocked(&mut self) {
self.conn.datagrams.send_blocked = true;
}

/// Compute the maximum size of datagrams that may passed to `send_datagram`
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
Expand Down Expand Up @@ -87,6 +93,11 @@ impl Datagrams<'_> {
self.conn.datagrams.recv()
}

/// Recv Bytes currently stored in the read buffer
pub fn recv_buffered(&self) -> usize {
self.conn.datagrams.recv_buffered
}

/// Bytes available in the outgoing datagram buffer
///
/// When greater than zero, [`send`](Self::send)ing a datagram of at most this size is
Expand Down
182 changes: 182 additions & 0 deletions quinn/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,14 @@ impl Connection {
}
}

/// Resolves as soon as a readable datagram is buffered
pub fn datagram_readable(&self) -> DatagramReadable<'_> {
DatagramReadable {
conn: &self.0,
notify: self.0.shared.datagram_received.notified(),
}
}

/// Receive an application datagram
pub fn read_datagram(&self) -> ReadDatagram<'_> {
ReadDatagram {
Expand All @@ -353,6 +361,19 @@ impl Connection {
}
}

/// Attempts to receive an application datagram
///
/// If there are no readable datagrams, this will return [TryReceiveDatagramError::WouldBlock]
pub fn try_read_datagram(&self) -> Result<Option<Bytes>, ConnectionError> {
let mut state = self.0.state.lock("try_read_datagram");

if let Some(ref e) = state.error {
return Err(e.clone());
}

Ok(state.inner.datagrams().recv())
}

/// Wait for the connection to be closed for any reason
///
/// Despite the return type's name, closed connections are often not an error condition at the
Expand Down Expand Up @@ -422,6 +443,15 @@ impl Connection {
conn.close(error_code, Bytes::copy_from_slice(reason), &self.0.shared);
}

/// Creates a future, resolving as soon as a Datagram with the given size in byte can be sent
pub fn datagram_sendable(&self, size: usize) -> DatagramSendable<'_> {
DatagramSendable {
conn: &self.0,
required_space: size,
notify: self.0.shared.datagrams_unblocked.notified(),
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
Expand Down Expand Up @@ -450,6 +480,33 @@ impl Connection {
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Application datagrams are a low-level primitive. They may be lost or delivered out of order,
/// and `data` must both fit inside a single QUIC packet and be smaller than the maximum
/// dictated by the peer.
///
/// If the send buffer doesn't have enough available space, this will return [TrySendDatagramError::WouldBlock]
pub fn try_send_datagram(&self, data: Bytes) -> Result<(), TrySendDatagramError> {
let conn = &mut *self.0.state.lock("try_send_datagram");
if let Some(ref x) = conn.error {
return Err(SendDatagramError::ConnectionLost(x.clone()).into());
}
use proto::SendDatagramError::*;
match conn.inner.datagrams().send(data, false) {
Ok(()) => {
conn.wake();
Ok(())
}
Err(e) => Err(match e {
Blocked(bytes) => TrySendDatagramError::WouldBlock(bytes),
UnsupportedByPeer => SendDatagramError::UnsupportedByPeer.into(),
Disabled => SendDatagramError::Disabled.into(),
TooLarge => SendDatagramError::TooLarge.into(),
}),
}
}

/// Transmit `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
Expand Down Expand Up @@ -821,6 +878,46 @@ impl Future for ReadDatagram<'_> {
}
}

pin_project! {
/// Future produced by [`Connection::datagram_readable`]
pub struct DatagramReadable<'a> {
conn: &'a ConnectionRef,
#[pin]
notify: Notified<'a>,
}
}

impl Future for DatagramReadable<'_> {
type Output = Result<(), ConnectionError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("ReadDatagram::poll");

// Check for buffered datagrams before checking `state.error` so that already-received
// datagrams, which are necessarily finite, can be drained from a closed connection.

if state.inner.datagrams().recv_buffered() > 0 {
return Poll::Ready(Ok(()));
}

if let Some(ref e) = state.error {
return Poll::Ready(Err(e.clone()));
}

loop {
// Poll next datagram received notification
match this.notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Replace already used Notify from previous poll
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagram_received.notified()),
}
}
}
}

pin_project! {
/// Future produced by [`Connection::send_datagram_wait`]
pub struct SendDatagram<'a> {
Expand Down Expand Up @@ -854,6 +951,7 @@ impl Future for SendDatagram<'_> {
this.data.replace(data);
loop {
match this.notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Spurious wakeup, get a new future
Poll::Ready(()) => this
Expand All @@ -870,6 +968,59 @@ impl Future for SendDatagram<'_> {
}
}

pin_project! {
/// Future produced by [`Connection::sendable_datagram`]
pub struct DatagramSendable<'a> {
conn: &'a ConnectionRef,
required_space: usize,
#[pin]
notify: Notified<'a>,
}
}

impl Future for DatagramSendable<'_> {
type Output = Result<(), SendDatagramError>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut state = this.conn.state.lock("DatagramSendable::poll");
if let Some(ref e) = state.error {
return Poll::Ready(Err(SendDatagramError::ConnectionLost(e.clone())));
}

// Check if peer support datagrams
let max = state
.inner
.datagrams()
.max_size()
.ok_or(SendDatagramError::UnsupportedByPeer)?;

if *this.required_space > max {
return Poll::Ready(Err(SendDatagramError::TooLarge));
}

// Check if send can be satisfied
if state.inner.datagrams().send_buffer_space() > *this.required_space {
// We currently have enough space to satisfy the requirements
return Poll::Ready(Ok(()));
}

// Otherwise - set send_blocked and wait for the unblocked notification
state.inner.datagrams().set_send_blocked();

loop {
// Poll next datagram unblocked
match this.notify.as_mut().poll(ctx) {
// `state` lock ensures we didn't race with readiness
Poll::Pending => return Poll::Pending,
// Replace already used Notify
Poll::Ready(()) => this
.notify
.set(this.conn.shared.datagrams_unblocked.notified()),
}
}
}
}

#[derive(Debug)]
pub(crate) struct ConnectionRef(Arc<ConnectionInner>);

Expand Down Expand Up @@ -1313,6 +1464,37 @@ pub enum SendDatagramError {
ConnectionLost(#[from] ConnectionError),
}

/// Errors that can arise when trying to send a datagram without blocking
#[derive(Debug, Error, Clone, Eq, PartialEq)]
pub enum TrySendDatagramError {
/// Send Would Block - contains the unsent Bytes
#[error("send would block")]
WouldBlock(Bytes),
/// Actual Error sending the Datagram
#[error(transparent)]
SendDatagramError(#[from] SendDatagramError),
}

/// Errors that can arise when trying to receive a datagram without blocking
#[derive(Debug, Error, Clone, PartialEq, Eq)]
pub enum TryReceiveDatagramError {
/// The operation would block
#[error("operation would block")]
WouldBlock,
/// A Connection error has occurred
#[error(transparent)]
ConnectionError(#[from] ConnectionError),
}

impl From<TryReceiveDatagramError> for io::Error {
fn from(err: TryReceiveDatagramError) -> Self {
match err {
TryReceiveDatagramError::ConnectionError(err) => err.into(),
TryReceiveDatagramError::WouldBlock => Self::new(io::ErrorKind::WouldBlock, err),
}
}
}

/// The maximum amount of datagrams which will be produced in a single `drive_transmit` call
///
/// This limits the amount of CPU resources consumed by datagram generation,
Expand Down
2 changes: 1 addition & 1 deletion quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub use udp;

pub use crate::connection::{
AcceptBi, AcceptUni, Connecting, Connection, OpenBi, OpenUni, ReadDatagram, SendDatagram,
SendDatagramError, ZeroRttAccepted,
SendDatagramError, TryReceiveDatagramError, TrySendDatagramError, ZeroRttAccepted,
};
pub use crate::endpoint::{Accept, Endpoint, EndpointStats};
pub use crate::incoming::{Incoming, IncomingFuture, RetryError};
Expand Down
Loading