Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 151 additions & 14 deletions quinn-proto/src/cid_queue.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::ops::Range;
use std::{fmt::Debug, ops::Range};

use crate::{ConnectionId, ResetToken, frame::NewConnectionId};

/// DataType stored in CidQueue buffer
type CidData = (ConnectionId, Option<ResetToken>);
#[derive(Debug, Clone, Copy)]
struct CidData(ConnectionId, Option<ResetToken>);

/// Sliding window of active Connection IDs
///
/// May contain gaps due to packet loss or reordering
/// This represents a circular buffer that can contain gaps due to packet loss or reordering.
/// The buffer has three regions:
/// - Exactly one active CID at `self.buffer[self.cursor]`.
/// - Zero to `Self::LEN - 1` reserved CIDs from `self.cursor` up to `self.cursor_reserved`.
/// - More "available"/"ready" CIDs after `self.cursor_reserved`.
///
/// You grow the range of reserved CIDs by calling [`CidQueue::next_reserved`], which takes one
/// of the available ones and returns the CID that was reserved.
/// You add available/ready CIDs by calling [`CidQueue::insert`].
#[derive(Debug)]
pub(crate) struct CidQueue {
/// Ring buffer indexed by `self.cursor`
Expand All @@ -18,16 +27,20 @@ pub(crate) struct CidQueue {
///
/// The sequence number of the active CID; must be the smallest among CIDs in `buffer`.
offset: u64,
/// Circular index for the last reserved CID, i.e. a CID that is
/// not active, but was used for probing packets on a different remote address.
cursor_reserved: usize,
}

impl CidQueue {
pub(crate) fn new(cid: ConnectionId) -> Self {
let mut buffer = [None; Self::LEN];
buffer[0] = Some((cid, None));
buffer[0] = Some(CidData(cid, None));
Self {
buffer,
cursor: 0,
offset: 0,
cursor_reserved: 0,
}
}

Expand All @@ -40,9 +53,8 @@ impl CidQueue {
cid: NewConnectionId,
) -> Result<Option<(Range<u64>, ResetToken)>, InsertError> {
// Position of new CID wrt. the current active CID
let index = match cid.sequence.checked_sub(self.offset) {
None => return Err(InsertError::Retired),
Some(x) => x,
let Some(index) = cid.sequence.checked_sub(self.offset) else {
return Err(InsertError::Retired);
};

let retired_count = cid.retire_prior_to.saturating_sub(self.offset);
Expand All @@ -57,7 +69,7 @@ impl CidQueue {

// Record the new CID
let index = ((self.cursor as u64 + index) % Self::LEN as u64) as usize;
self.buffer[index] = Some((cid.id, Some(cid.reset_token)));
self.buffer[index] = Some(CidData(cid.id, Some(cid.reset_token)));

if retired_count == 0 {
return Ok(None);
Expand All @@ -67,11 +79,12 @@ impl CidQueue {
// retire_prior_to, and inform the caller that all prior CIDs have been retired, and of
// the new CID's reset token.
self.cursor = ((self.cursor as u64 + retired_count) % Self::LEN as u64) as usize;
let (i, (_, token)) = self
.iter()
let (i, CidData(_, token)) = self
.iter_from_active()
.next()
.expect("it is impossible to retire a CID without supplying a new one");
self.cursor = (self.cursor + i) % Self::LEN;
self.cursor_reserved = self.cursor;
let orig_offset = self.offset;
self.offset = cid.retire_prior_to + i as u64;
// We don't immediately retire CIDs in the range (orig_offset +
Expand All @@ -89,27 +102,58 @@ impl CidQueue {
/// Switch to next active CID if possible, return
/// 1) the corresponding ResetToken and 2) a non-empty range preceding it to retire
pub(crate) fn next(&mut self) -> Option<(ResetToken, Range<u64>)> {
let (i, cid_data) = self.iter().nth(1)?;
self.buffer[self.cursor] = None;
let (i, cid_data) = self.iter_from_reserved().nth(1)?;
for i in 0..=self.reserved_len() {
self.buffer[self.cursor + i] = None;
}

let orig_offset = self.offset;
self.offset += i as u64;
self.cursor = (self.cursor + i) % Self::LEN;
self.cursor_reserved = self.cursor;
Some((cid_data.1.unwrap(), orig_offset..self.offset))
}

/// Returns a CID from the available ones and marks it as reserved.
///
/// If there's no more CIDs in the ready set, this will return None.
/// CIDs marked as reserved will be skipped when the active one advances.
pub(crate) fn next_reserved(&mut self) -> Option<ConnectionId> {
let (i, cid_data) = self.iter_from_reserved().nth(1)?;

self.cursor_reserved = (self.cursor_reserved + i) % Self::LEN;
Some(cid_data.0)
}

/// Iterate CIDs in CidQueue that are not `None`, including the active CID
fn iter(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
fn iter_from_active(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(0..Self::LEN).filter_map(move |step| {
let index = (self.cursor + step) % Self::LEN;
self.buffer[index].map(|cid_data| (step, cid_data))
})
}

/// Iterate CIDs in CidQueue that are not `None`, including the active CID
fn iter_from_reserved(&self) -> impl Iterator<Item = (usize, CidData)> + '_ {
(0..(Self::LEN - self.reserved_len())).filter_map(move |step| {
let index = (self.cursor_reserved + step) % Self::LEN;
self.buffer[index].map(|cid_data| (step, cid_data))
})
}

/// The length of the internal buffer's section of CIDs that are marked as reserved.
fn reserved_len(&self) -> usize {
if self.cursor_reserved >= self.cursor {
self.cursor_reserved - self.cursor
} else {
self.cursor_reserved + Self::LEN - self.cursor
}
}

/// Replace the initial CID
pub(crate) fn update_initial_cid(&mut self, cid: ConnectionId) {
debug_assert_eq!(self.offset, 0);
self.buffer[self.cursor] = Some((cid, None));
self.buffer[self.cursor] = Some(CidData(cid, None));
}

/// Return active remote CID itself
Expand Down Expand Up @@ -167,6 +211,7 @@ mod tests {
}
assert!(q.next().is_none());
}

#[test]
fn next_sparse() {
let mut q = CidQueue::new(initial_cid());
Expand Down Expand Up @@ -301,4 +346,96 @@ mod tests {
assert_eq!(q.active(), initial_cid());
assert_eq!(q.active_seq(), 0);
}

#[test]
fn reserved_smoke() {
let mut q = CidQueue::new(initial_cid());
assert_eq!(q.next_reserved(), None);

let one = cid(1, 0);
q.insert(one).unwrap();
assert_eq!(q.next_reserved(), Some(one.id));

let two = cid(2, 2);
let (retired_range, reset_token) = q.insert(two).unwrap().unwrap();
assert_eq!(reset_token, two.reset_token);
assert_eq!(retired_range, 0..2);

assert_eq!(q.next_reserved(), None);

let four = cid(4, 2);
q.insert(four).unwrap();
println!("{q:?}");
assert_eq!(q.next_reserved(), Some(four.id));
assert_eq!(q.active(), two.id);

assert_eq!(q.next(), None);
}

#[test]
fn reserve_multiple() {
let mut q = CidQueue::new(initial_cid());
let one = cid(1, 0);
let two = cid(2, 0);
q.insert(one).unwrap();
q.insert(two).unwrap();
assert_eq!(q.next_reserved(), Some(one.id));
assert_eq!(q.next_reserved(), Some(two.id));
assert_eq!(q.next_reserved(), None);
}

#[test]
fn reserve_multiple_sparse() {
let mut q = CidQueue::new(initial_cid());
let two = cid(2, 0);
let four = cid(4, 0);
q.insert(two).unwrap();
q.insert(four).unwrap();
assert_eq!(q.next_reserved(), Some(two.id));
assert_eq!(q.next_reserved(), Some(four.id));
assert_eq!(q.next_reserved(), None);
}

#[test]
fn reserve_many_next_clears() {
let mut q = CidQueue::new(initial_cid());
for i in 1..CidQueue::LEN {
q.insert(cid(i as u64, 0)).unwrap();
}

for _ in 0..CidQueue::LEN - 2 {
assert!(q.next_reserved().is_some());
}

q.next();
assert_eq!(q.next(), None);
}

#[test]
fn reserve_many_next_reserved_none() {
let mut q = CidQueue::new(initial_cid());
for i in 1..CidQueue::LEN {
q.insert(cid(i as u64, 0)).unwrap();
}

for _ in 0..CidQueue::LEN - 1 {
assert!(q.next_reserved().is_some());
}

assert_eq!(q.next_reserved(), None);
}

#[test]
fn one_active_all_else_reserved_next_none() {
let mut q = CidQueue::new(initial_cid());
for i in 1..CidQueue::LEN {
q.insert(cid(i as u64, 0)).unwrap();
}

for _ in 0..CidQueue::LEN - 1 {
assert!(q.next_reserved().is_some());
}

assert_eq!(q.next(), None);
}
}
Loading