diff --git a/quic/s2n-quic-core/src/crypto/tls/testing.rs b/quic/s2n-quic-core/src/crypto/tls/testing.rs index 91060f5a2a..63238d0bcf 100644 --- a/quic/s2n-quic-core/src/crypto/tls/testing.rs +++ b/quic/s2n-quic-core/src/crypto/tls/testing.rs @@ -8,7 +8,7 @@ use crate::{ crypto::{ header_crypto::{LONG_HEADER_MASK, SHORT_HEADER_MASK}, scatter, tls, - tls::ApplicationParameters, + tls::{ApplicationParameters, CipherSuite, TlsExportError, TlsSession}, CryptoSuite, HeaderKey, Key, }, endpoint, transport, @@ -110,6 +110,21 @@ impl CryptoSuite for Session { type RetryKey = crate::crypto::key::testing::Key; } +impl TlsSession for Session { + fn tls_exporter( + &self, + _label: &[u8], + _context: &[u8], + _output: &mut [u8], + ) -> Result<(), TlsExportError> { + Ok(()) + } + + fn cipher_suite(&self) -> CipherSuite { + CipherSuite::TLS_AES_128_GCM_SHA256 + } +} + #[derive(Debug)] pub struct TlsEndpoint where diff --git a/quic/s2n-quic-core/src/dc.rs b/quic/s2n-quic-core/src/dc.rs index d60110a1a7..4866c1d2cb 100644 --- a/quic/s2n-quic-core/src/dc.rs +++ b/quic/s2n-quic-core/src/dc.rs @@ -6,7 +6,7 @@ use crate::{ event::{api::SocketAddress, IntoEvent as _}, inet, path::MaxMtu, - transport::parameters::InitialFlowControlLimits, + transport::parameters::{DcSupportedVersions, InitialFlowControlLimits}, varint::VarInt, }; use core::time::Duration; @@ -14,16 +14,22 @@ use core::time::Duration; mod disabled; mod traits; +#[cfg(any(test, feature = "testing"))] +pub mod testing; + pub use disabled::*; pub use traits::*; +pub type Version = u32; + // dc versions supported by this code, in order of preference (SUPPORTED_VERSIONS[0] is most preferred) -const SUPPORTED_VERSIONS: &[u32] = &[0x0]; +pub const SUPPORTED_VERSIONS: [Version; 1] = [0x0]; /// Called on the server to select the dc version to use (if any) /// /// The server's version preference takes precedence -pub fn select_version(client_supported_versions: &[u32]) -> Option { +pub fn select_version(client_supported_versions: DcSupportedVersions) -> Option { + let client_supported_versions = client_supported_versions.into_iter().as_slice(); SUPPORTED_VERSIONS .iter() .find(|&supported_version| client_supported_versions.contains(supported_version)) @@ -48,7 +54,7 @@ impl<'a> ConnectionInfo<'a> { #[doc(hidden)] pub fn new( remote_address: &'a inet::SocketAddress, - dc_version: u32, + dc_version: Version, application_params: ApplicationParams, ) -> Self { Self { diff --git a/quic/s2n-quic-core/src/dc/disabled.rs b/quic/s2n-quic-core/src/dc/disabled.rs index 042b1d7b1a..e3498a9680 100644 --- a/quic/s2n-quic-core/src/dc/disabled.rs +++ b/quic/s2n-quic-core/src/dc/disabled.rs @@ -4,8 +4,9 @@ use crate::{ crypto::tls::TlsSession, dc::{ConnectionInfo, Endpoint, Path}, - stateless_reset::Token, + stateless_reset, }; +use alloc::vec::Vec; #[derive(Debug, Default)] pub struct Disabled(()); @@ -22,18 +23,14 @@ impl Endpoint for Disabled { // The Disabled Endpoint returns `None`, so this is not used impl Path for () { - fn on_path_secrets_ready(&mut self, _session: &impl TlsSession) { + fn on_path_secrets_ready(&mut self, _session: &impl TlsSession) -> Vec { unimplemented!() } fn on_peer_stateless_reset_tokens<'a>( &mut self, - _stateless_reset_tokens: impl Iterator, + _stateless_reset_tokens: impl Iterator, ) { unimplemented!() } - - fn stateless_reset_tokens(&mut self) -> &[Token] { - unimplemented!() - } } diff --git a/quic/s2n-quic-core/src/dc/testing.rs b/quic/s2n-quic-core/src/dc/testing.rs new file mode 100644 index 0000000000..98aec289d1 --- /dev/null +++ b/quic/s2n-quic-core/src/dc/testing.rs @@ -0,0 +1,51 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{crypto::tls::TlsSession, dc, dc::ConnectionInfo, stateless_reset}; + +pub struct MockDcEndpoint { + stateless_reset_tokens: Vec, +} + +impl MockDcEndpoint { + pub fn new(tokens: &[stateless_reset::Token]) -> Self { + Self { + stateless_reset_tokens: tokens.to_vec(), + } + } +} + +#[derive(Default)] +pub struct MockDcPath { + pub on_path_secrets_ready_count: u8, + pub on_peer_stateless_reset_tokens_count: u8, + pub stateless_reset_tokens: Vec, + pub peer_stateless_reset_tokens: Vec, +} + +impl dc::Endpoint for MockDcEndpoint { + type Path = MockDcPath; + + fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option { + Some(MockDcPath { + stateless_reset_tokens: self.stateless_reset_tokens.clone(), + ..Default::default() + }) + } +} + +impl dc::Path for MockDcPath { + fn on_path_secrets_ready(&mut self, _session: &impl TlsSession) -> Vec { + self.on_path_secrets_ready_count += 1; + self.stateless_reset_tokens.clone() + } + + fn on_peer_stateless_reset_tokens<'a>( + &mut self, + stateless_reset_tokens: impl Iterator, + ) { + self.on_peer_stateless_reset_tokens_count += 1; + self.peer_stateless_reset_tokens + .extend(stateless_reset_tokens); + } +} diff --git a/quic/s2n-quic-core/src/dc/traits.rs b/quic/s2n-quic-core/src/dc/traits.rs index 307300f077..5902455f57 100644 --- a/quic/s2n-quic-core/src/dc/traits.rs +++ b/quic/s2n-quic-core/src/dc/traits.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{crypto::tls::TlsSession, dc, stateless_reset}; +use alloc::vec::Vec; /// The `dc::Endpoint` trait provides a way to support dc functionality pub trait Endpoint: 'static + Send { @@ -21,24 +22,25 @@ pub trait Endpoint: 'static + Send { /// A dc path pub trait Path: 'static + Send { /// Called when path secrets are ready to be derived from the given `TlsSession` - fn on_path_secrets_ready(&mut self, session: &impl TlsSession); + /// + /// Returns the stateless reset tokens to include in a `DC_STATELESS_RESET_TOKENS` + /// frame sent to the peer. + fn on_path_secrets_ready(&mut self, session: &impl TlsSession) -> Vec; /// Called when a `DC_STATELESS_RESET_TOKENS` frame has been received from the peer fn on_peer_stateless_reset_tokens<'a>( &mut self, stateless_reset_tokens: impl Iterator, ); - - /// Returns the stateless reset tokens to include in a `DC_STATELESS_RESET_TOKENS` - /// frame sent to the peer. - fn stateless_reset_tokens(&mut self) -> &[stateless_reset::Token]; } impl Path for Option

{ #[inline] - fn on_path_secrets_ready(&mut self, session: &impl TlsSession) { + fn on_path_secrets_ready(&mut self, session: &impl TlsSession) -> Vec { if let Some(path) = self { path.on_path_secrets_ready(session) + } else { + Vec::default() } } @@ -51,13 +53,4 @@ impl Path for Option

{ path.on_peer_stateless_reset_tokens(stateless_reset_tokens) } } - - #[inline] - fn stateless_reset_tokens(&mut self) -> &[stateless_reset::Token] { - if let Some(path) = self { - path.stateless_reset_tokens() - } else { - &[] - } - } } diff --git a/quic/s2n-quic-core/src/event/generated.rs b/quic/s2n-quic-core/src/event/generated.rs index 7ebfb4c51a..dd5fbb3a4b 100644 --- a/quic/s2n-quic-core/src/event/generated.rs +++ b/quic/s2n-quic-core/src/event/generated.rs @@ -589,6 +589,16 @@ pub mod api { } #[derive(Clone, Debug)] #[non_exhaustive] + pub enum DcState { + #[non_exhaustive] + VersionNegotiated { version: u32 }, + #[non_exhaustive] + PathSecretsReady {}, + #[non_exhaustive] + Complete {}, + } + #[derive(Clone, Debug)] + #[non_exhaustive] #[doc = " Application level protocol"] pub struct ApplicationProtocolInformation<'a> { pub chosen_application_protocol: &'a [u8], @@ -1030,6 +1040,15 @@ pub mod api { } #[derive(Clone, Debug)] #[non_exhaustive] + #[doc = " The DC state has changed"] + pub struct DcStateChanged { + pub state: DcState, + } + impl Event for DcStateChanged { + const NAME: &'static str = "transport:dc_state_changed"; + } + #[derive(Clone, Debug)] + #[non_exhaustive] #[doc = " QUIC version"] pub struct VersionInformation<'a> { pub server_versions: &'a [u32], @@ -2233,6 +2252,17 @@ pub mod tracing { tracing :: event ! (target : "bbr_state_changed" , parent : id , tracing :: Level :: DEBUG , path_id = tracing :: field :: debug (path_id) , state = tracing :: field :: debug (state)); } #[inline] + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + _meta: &api::ConnectionMeta, + event: &api::DcStateChanged, + ) { + let id = context.id(); + let api::DcStateChanged { state } = event; + tracing :: event ! (target : "dc_state_changed" , parent : id , tracing :: Level :: DEBUG , state = tracing :: field :: debug (state)); + } + #[inline] fn on_version_information( &mut self, meta: &api::EndpointMeta, @@ -3474,6 +3504,25 @@ pub mod builder { } } #[derive(Clone, Debug)] + pub enum DcState { + VersionNegotiated { version: u32 }, + PathSecretsReady, + Complete, + } + impl IntoEvent for DcState { + #[inline] + fn into_event(self) -> api::DcState { + use api::DcState::*; + match self { + Self::VersionNegotiated { version } => VersionNegotiated { + version: version.into_event(), + }, + Self::PathSecretsReady => PathSecretsReady {}, + Self::Complete => Complete {}, + } + } + } + #[derive(Clone, Debug)] #[doc = " Application level protocol"] pub struct ApplicationProtocolInformation<'a> { pub chosen_application_protocol: &'a [u8], @@ -4253,6 +4302,20 @@ pub mod builder { } } #[derive(Clone, Debug)] + #[doc = " The DC state has changed"] + pub struct DcStateChanged { + pub state: DcState, + } + impl IntoEvent for DcStateChanged { + #[inline] + fn into_event(self) -> api::DcStateChanged { + let DcStateChanged { state } = self; + api::DcStateChanged { + state: state.into_event(), + } + } + } + #[derive(Clone, Debug)] #[doc = " QUIC version"] pub struct VersionInformation<'a> { pub server_versions: &'a [u32], @@ -5214,6 +5277,18 @@ mod traits { let _ = meta; let _ = event; } + #[doc = "Called when the `DcStateChanged` event is triggered"] + #[inline] + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + event: &DcStateChanged, + ) { + let _ = context; + let _ = meta; + let _ = event; + } #[doc = "Called when the `VersionInformation` event is triggered"] #[inline] fn on_version_information(&mut self, meta: &EndpointMeta, event: &VersionInformation) { @@ -5846,6 +5921,16 @@ mod traits { (self.1).on_bbr_state_changed(&mut context.1, meta, event); } #[inline] + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &ConnectionMeta, + event: &DcStateChanged, + ) { + (self.0).on_dc_state_changed(&mut context.0, meta, event); + (self.1).on_dc_state_changed(&mut context.1, meta, event); + } + #[inline] fn on_version_information(&mut self, meta: &EndpointMeta, event: &VersionInformation) { (self.0).on_version_information(meta, event); (self.1).on_version_information(meta, event); @@ -6230,6 +6315,8 @@ mod traits { fn on_pacing_rate_updated(&mut self, event: builder::PacingRateUpdated); #[doc = "Publishes a `BbrStateChanged` event to the publisher's subscriber"] fn on_bbr_state_changed(&mut self, event: builder::BbrStateChanged); + #[doc = "Publishes a `DcStateChanged` event to the publisher's subscriber"] + fn on_dc_state_changed(&mut self, event: builder::DcStateChanged); #[doc = r" Returns the QUIC version negotiated for the current connection, if any"] fn quic_version(&self) -> u32; #[doc = r" Returns the [`Subject`] for the current publisher"] @@ -6652,6 +6739,15 @@ mod traits { self.subscriber.on_event(&self.meta, &event); } #[inline] + fn on_dc_state_changed(&mut self, event: builder::DcStateChanged) { + let event = event.into_event(); + self.subscriber + .on_dc_state_changed(self.context, &self.meta, &event); + self.subscriber + .on_connection_event(self.context, &self.meta, &event); + self.subscriber.on_event(&self.meta, &event); + } + #[inline] fn quic_version(&self) -> u32 { self.quic_version } @@ -6710,6 +6806,7 @@ pub mod testing { pub delivery_rate_sampled: u32, pub pacing_rate_updated: u32, pub bbr_state_changed: u32, + pub dc_state_changed: u32, pub version_information: u32, pub endpoint_packet_sent: u32, pub endpoint_packet_received: u32, @@ -6790,6 +6887,7 @@ pub mod testing { delivery_rate_sampled: 0, pacing_rate_updated: 0, bbr_state_changed: 0, + dc_state_changed: 0, version_information: 0, endpoint_packet_sent: 0, endpoint_packet_received: 0, @@ -7278,6 +7376,17 @@ pub mod testing { self.output.push(format!("{meta:?} {event:?}")); } } + fn on_dc_state_changed( + &mut self, + _context: &mut Self::ConnectionContext, + meta: &api::ConnectionMeta, + event: &api::DcStateChanged, + ) { + self.dc_state_changed += 1; + if self.location.is_some() { + self.output.push(format!("{meta:?} {event:?}")); + } + } fn on_version_information( &mut self, meta: &api::EndpointMeta, @@ -7421,6 +7530,7 @@ pub mod testing { pub delivery_rate_sampled: u32, pub pacing_rate_updated: u32, pub bbr_state_changed: u32, + pub dc_state_changed: u32, pub version_information: u32, pub endpoint_packet_sent: u32, pub endpoint_packet_received: u32, @@ -7491,6 +7601,7 @@ pub mod testing { delivery_rate_sampled: 0, pacing_rate_updated: 0, bbr_state_changed: 0, + dc_state_changed: 0, version_information: 0, endpoint_packet_sent: 0, endpoint_packet_received: 0, @@ -7888,6 +7999,13 @@ pub mod testing { self.output.push(format!("{event:?}")); } } + fn on_dc_state_changed(&mut self, event: builder::DcStateChanged) { + self.dc_state_changed += 1; + let event = event.into_event(); + if self.location.is_some() { + self.output.push(format!("{event:?}")); + } + } fn quic_version(&self) -> u32 { 1 } diff --git a/quic/s2n-quic-core/src/lib.rs b/quic/s2n-quic-core/src/lib.rs index b3fc390d6d..c0f811aa97 100644 --- a/quic/s2n-quic-core/src/lib.rs +++ b/quic/s2n-quic-core/src/lib.rs @@ -22,6 +22,7 @@ pub mod counter; pub mod crypto; pub mod ct; pub mod datagram; +#[cfg(feature = "alloc")] pub mod dc; pub mod endpoint; pub mod event; diff --git a/quic/s2n-quic-events/events/common.rs b/quic/s2n-quic-events/events/common.rs index fc5f0e0d60..82d4c22c59 100644 --- a/quic/s2n-quic-events/events/common.rs +++ b/quic/s2n-quic-events/events/common.rs @@ -972,3 +972,9 @@ enum BbrState { ProbeBwUp, ProbeRtt, } + +enum DcState { + VersionNegotiated { version: u32 }, + PathSecretsReady, + Complete, +} diff --git a/quic/s2n-quic-events/events/connection.rs b/quic/s2n-quic-events/events/connection.rs index 423cb31416..98a06f426f 100644 --- a/quic/s2n-quic-events/events/connection.rs +++ b/quic/s2n-quic-events/events/connection.rs @@ -337,3 +337,9 @@ struct BbrStateChanged { path_id: u64, state: BbrState, } + +#[event("transport:dc_state_changed")] +/// The DC state has changed +struct DcStateChanged { + state: DcState, +} diff --git a/quic/s2n-quic-transport/src/dc.rs b/quic/s2n-quic-transport/src/dc.rs new file mode 100644 index 0000000000..ec4f6c5418 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc.rs @@ -0,0 +1,6 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod manager; + +pub use manager::Manager; diff --git a/quic/s2n-quic-transport/src/dc/manager.rs b/quic/s2n-quic-transport/src/dc/manager.rs new file mode 100644 index 0000000000..59db0a1afe --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager.rs @@ -0,0 +1,239 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + contexts::WriteContext, + endpoint, + sync::{flag, flag::Writer}, +}; +use s2n_quic_core::{ + ack, + crypto::tls, + dc, + dc::{Endpoint, Path}, + ensure, event, + event::builder::{DcState, DcStateChanged}, + frame::DcStatelessResetTokens, + packet::number::PacketNumber, + state::{event, is}, + stateless_reset, transmission, + transmission::interest::Query, +}; + +/// Manages transmission and receipt of `DC_STATELESS_RESET_TOKENS` and +/// notifications to the dc::Endpoint Path +pub struct Manager { + path: Option<<::DcEndpoint as Endpoint>::Path>, + version: Option, + state: State, + stateless_reset_token_sync: Flag, +} + +type Flag = flag::Flag; + +#[derive(Clone, Debug, PartialEq, Eq)] +enum State { + InitServer, + InitClient, + ServerPathSecretsReady, + ClientPathSecretsReady, + ServerTokensSent, + Complete, +} + +impl State { + is!(is_init, InitServer | InitClient); + is!( + is_path_secrets_ready, + ServerPathSecretsReady | ClientPathSecretsReady + ); + is!(is_server_tokens_sent, ServerTokensSent); + is!(is_complete, Complete); + + event! { + on_path_secrets_ready( + InitServer => ServerPathSecretsReady, + InitClient => ClientPathSecretsReady + ); + on_peer_stateless_reset_tokens( + ClientPathSecretsReady => Complete, + ServerPathSecretsReady => ServerTokensSent + ); + on_stateless_reset_tokens_acked(ServerTokensSent => Complete); + } +} + +impl Manager { + /// Constructs a new `dc::Manager` with the optional given path + /// + /// If path is `None`, the `dc::Manager` will be disabled + pub fn new( + path: Option<<::DcEndpoint as Endpoint>::Path>, + version: dc::Version, + publisher: &mut Pub, + ) -> Self { + ensure!(path.is_some(), Self::disabled()); + + publisher.on_dc_state_changed(DcStateChanged { + state: DcState::VersionNegotiated { version }, + }); + let state = if Config::ENDPOINT_TYPE.is_server() { + State::InitServer + } else { + State::InitClient + }; + + Self { + path, + version: Some(version), + state, + stateless_reset_token_sync: Flag::default(), + } + } + + /// Returns a disabled `dc::Manager` + pub fn disabled() -> Self { + Self { + path: None, + version: None, + state: State::Complete, + stateless_reset_token_sync: Flag::default(), + } + } + + /// The dc version that was negotiated, if any + /// + /// Returns `None` if no version was negotiated or the `dc::Endpoint` did + /// not initialize a path for the connection + pub fn version(&self) -> Option { + self.version + } + + /// Called when the TLS session has indicated path secrets are ready + /// to be derived for the dc path + /// + /// Initiates sending of the `DC_STATELESS_RESET_TOKENS` frame on the client + pub fn on_path_secrets_ready( + &mut self, + session: &impl tls::TlsSession, + publisher: &mut Pub, + ) { + ensure!(self.state.on_path_secrets_ready().is_ok()); + + let tokens = self.path.on_path_secrets_ready(session); + let flag = Flag::new(DcStatelessResetTokenWriter::new(tokens)); + self.stateless_reset_token_sync = flag; + + if Config::ENDPOINT_TYPE.is_client() { + // Start sending the `DC_STATELESS_RESET_TOKENS` frame on the client + // The server will wait until it has received tokens from the client + // before sending its own, though typically this will happen immediately + // after path secrets are ready + self.stateless_reset_token_sync.send(); + } + + publisher.on_dc_state_changed(DcStateChanged { + state: DcState::PathSecretsReady, + }) + } + + /// Called when a `DC_STATELESS_RESET_TOKENS` frame is received from the peer + /// + /// On the client, this completes the dc path handshake + /// On the server, this initiates sending the server's `DC_STATELESS_RESET_TOKENS` frame + pub fn on_peer_dc_stateless_reset_tokens<'a, Pub: event::ConnectionPublisher>( + &mut self, + stateless_reset_tokens: impl Iterator, + publisher: &mut Pub, + ) { + ensure!(self.state.on_peer_stateless_reset_tokens().is_ok()); + + self.path + .on_peer_stateless_reset_tokens(stateless_reset_tokens); + if Config::ENDPOINT_TYPE.is_server() { + self.stateless_reset_token_sync.send(); + } else { + publisher.on_dc_state_changed(DcStateChanged { + state: DcState::Complete, + }); + } + } + + /// Called when a range of packets have been acknowledged + pub fn on_packet_ack( + &mut self, + ack_set: &A, + publisher: &mut Pub, + ) { + ensure!(self.stateless_reset_token_sync.on_packet_ack(ack_set)); + ensure!(self.state.on_stateless_reset_tokens_acked().is_ok()); + + debug_assert!(Config::ENDPOINT_TYPE.is_server()); + publisher.on_dc_state_changed(DcStateChanged { + state: DcState::Complete, + }); + } + + /// Called when a range of packets has been lost + pub fn on_packet_loss(&mut self, ack_set: &A) { + self.stateless_reset_token_sync.on_packet_loss(ack_set); + } + + #[cfg(any(test, feature = "testing"))] + pub fn path(&self) -> &<::DcEndpoint as Endpoint>::Path { + self.path.as_ref().expect("path should be specified") + } +} + +impl transmission::Provider for Manager { + fn on_transmit(&mut self, context: &mut W) { + let _ = self.stateless_reset_token_sync.on_transmit(context); + } +} + +impl transmission::interest::Provider for Manager { + fn transmission_interest(&self, query: &mut Q) -> transmission::interest::Result { + let result = self.stateless_reset_token_sync.transmission_interest(query); + #[cfg(debug_assertions)] + if result.is_err() { + if Config::ENDPOINT_TYPE.is_server() { + // The server should only have transmission interest in the server tokens sent state + assert!(self.state.is_server_tokens_sent()); + } else { + // The client should only have transmission interest in the path secrets are ready state + assert!(self.state.is_path_secrets_ready()); + } + } + result + } +} + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +struct DcStatelessResetTokenWriter { + tokens: Vec, +} + +impl DcStatelessResetTokenWriter { + fn new(tokens: Vec) -> Self { + Self { tokens } + } +} + +impl Writer for DcStatelessResetTokenWriter { + fn write_frame(&mut self, context: &mut W) -> Option { + match DcStatelessResetTokens::new(self.tokens.as_slice()) { + Ok(frame) => context.write_frame(&frame), + Err(error) => { + debug_assert!( + false, + "The dc provider produced invalid stateless reset tokens: {}", + error + ); + None + } + } + } +} + +#[cfg(test)] +mod tests; diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__disabled.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__disabled.snap new file mode 100644 index 0000000000..6e3aa896a0 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__disabled.snap @@ -0,0 +1,5 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- + diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__new.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__new.snap new file mode 100644 index 0000000000..999f047b57 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__new.snap @@ -0,0 +1,6 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: VersionNegotiated { version: 1 } } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_client.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_client.snap new file mode 100644 index 0000000000..ce5dbd2707 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_client.snap @@ -0,0 +1,6 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_server.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_server.snap new file mode 100644 index 0000000000..b3c2c92849 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_ack_server.snap @@ -0,0 +1,7 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } +DcStateChanged { state: Complete } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_loss.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_loss.snap new file mode 100644 index 0000000000..ce5dbd2707 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_packet_loss.snap @@ -0,0 +1,6 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_path_secrets_ready.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_path_secrets_ready.snap new file mode 100644 index 0000000000..29cba608b6 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_path_secrets_ready.snap @@ -0,0 +1,8 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_client.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_client.snap new file mode 100644 index 0000000000..b3c2c92849 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_client.snap @@ -0,0 +1,7 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } +DcStateChanged { state: Complete } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_server.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_server.snap new file mode 100644 index 0000000000..ce5dbd2707 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/quic__s2n-quic-transport__src__dc__manager__tests__events__on_peer_dc_stateless_reset_tokens_server.snap @@ -0,0 +1,6 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "" +--- +DcStateChanged { state: VersionNegotiated { version: 1 } } +DcStateChanged { state: PathSecretsReady } diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__dot_test.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__dot_test.snap new file mode 100644 index 0000000000..5591683069 --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__dot_test.snap @@ -0,0 +1,17 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "State::dot()" +--- +digraph { + ClientPathSecretsReady; + Complete; + InitClient; + InitServer; + ServerPathSecretsReady; + ServerTokensSent; + InitServer -> ServerPathSecretsReady [label = "on_path_secrets_ready"]; + InitClient -> ClientPathSecretsReady [label = "on_path_secrets_ready"]; + ClientPathSecretsReady -> Complete [label = "on_peer_stateless_reset_tokens"]; + ServerPathSecretsReady -> ServerTokensSent [label = "on_peer_stateless_reset_tokens"]; + ServerTokensSent -> Complete [label = "on_stateless_reset_tokens_acked"]; +} diff --git a/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__snapshots.snap b/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__snapshots.snap new file mode 100644 index 0000000000..ca4a3aba5a --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/snapshots/s2n_quic_transport__dc__manager__tests__snapshots.snap @@ -0,0 +1,110 @@ +--- +source: quic/s2n-quic-transport/src/dc/manager/tests.rs +expression: "State::test_transitions()" +--- +{ + ClientPathSecretsReady: { + on_path_secrets_ready: Err( + InvalidTransition { + current: ClientPathSecretsReady, + event: "on_path_secrets_ready", + }, + ), + on_peer_stateless_reset_tokens: Ok( + Complete, + ), + on_stateless_reset_tokens_acked: Err( + InvalidTransition { + current: ClientPathSecretsReady, + event: "on_stateless_reset_tokens_acked", + }, + ), + }, + Complete: { + on_path_secrets_ready: Err( + InvalidTransition { + current: Complete, + event: "on_path_secrets_ready", + }, + ), + on_peer_stateless_reset_tokens: Err( + InvalidTransition { + current: Complete, + event: "on_peer_stateless_reset_tokens", + }, + ), + on_stateless_reset_tokens_acked: Err( + NoOp { + current: Complete, + }, + ), + }, + InitClient: { + on_path_secrets_ready: Ok( + ClientPathSecretsReady, + ), + on_peer_stateless_reset_tokens: Err( + InvalidTransition { + current: InitClient, + event: "on_peer_stateless_reset_tokens", + }, + ), + on_stateless_reset_tokens_acked: Err( + InvalidTransition { + current: InitClient, + event: "on_stateless_reset_tokens_acked", + }, + ), + }, + InitServer: { + on_path_secrets_ready: Ok( + ServerPathSecretsReady, + ), + on_peer_stateless_reset_tokens: Err( + InvalidTransition { + current: InitServer, + event: "on_peer_stateless_reset_tokens", + }, + ), + on_stateless_reset_tokens_acked: Err( + InvalidTransition { + current: InitServer, + event: "on_stateless_reset_tokens_acked", + }, + ), + }, + ServerPathSecretsReady: { + on_path_secrets_ready: Err( + InvalidTransition { + current: ServerPathSecretsReady, + event: "on_path_secrets_ready", + }, + ), + on_peer_stateless_reset_tokens: Ok( + ServerTokensSent, + ), + on_stateless_reset_tokens_acked: Err( + InvalidTransition { + current: ServerPathSecretsReady, + event: "on_stateless_reset_tokens_acked", + }, + ), + }, + ServerTokensSent: { + on_path_secrets_ready: Err( + InvalidTransition { + current: ServerTokensSent, + event: "on_path_secrets_ready", + }, + ), + on_peer_stateless_reset_tokens: Err( + InvalidTransition { + current: ServerTokensSent, + event: "on_peer_stateless_reset_tokens", + }, + ), + on_stateless_reset_tokens_acked: Ok( + Complete, + ), + }, +} diff --git a/quic/s2n-quic-transport/src/dc/manager/tests.rs b/quic/s2n-quic-transport/src/dc/manager/tests.rs new file mode 100644 index 0000000000..10a19b97ab --- /dev/null +++ b/quic/s2n-quic-transport/src/dc/manager/tests.rs @@ -0,0 +1,272 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use crate::{ + contexts::testing::MockWriteContext, + endpoint::testing::{Client, Server}, +}; +use insta::{assert_debug_snapshot, assert_snapshot}; +use s2n_quic_core::{ + crypto::tls::testing::Session, + dc::testing::MockDcPath, + event::testing::Publisher, + frame::Frame, + packet::number::{PacketNumberRange, PacketNumberSpace}, + stateless_reset::token::testing::{TEST_TOKEN_1, TEST_TOKEN_2, TEST_TOKEN_3}, + time::clock::testing::now, + transmission::{interest::Provider as _, writer::testing::OutgoingFrameBuffer, Provider as _}, + varint::VarInt, +}; + +#[test] +fn new() { + let mut publisher = Publisher::snapshot(); + let manager: Manager = Manager::new(Some(MockDcPath::default()), 1, &mut publisher); + + assert!(matches!(manager.state, State::InitServer)); + assert_eq!(Some(1), manager.version); + + let manager: Manager = Manager::new(Some(MockDcPath::default()), 1, &mut publisher); + + assert!(matches!(manager.state, State::InitClient)); + assert_eq!(Some(1), manager.version); +} + +#[test] +fn disabled() { + let mut publisher = Publisher::snapshot(); + let manager: Manager = Manager::disabled(); + assert_eq!(None, manager.version()); + assert!(manager.state.is_complete()); + assert!(!manager.has_transmission_interest()); + + let mut manager: Manager = Manager::new(None, 1, &mut publisher); + let ack_set = &PacketNumberRange::new(pn(1), pn(2)); + let mut frame_buffer = OutgoingFrameBuffer::new(); + let mut context = MockWriteContext::new( + now(), + &mut frame_buffer, + transmission::Constraint::None, + transmission::Mode::Normal, + endpoint::Type::Client, + ); + + assert_eq!(None, manager.version()); + assert!(manager.state.is_complete()); + assert!(!manager.has_transmission_interest()); + + // verify calling all the methods doesn't panic + manager.on_peer_dc_stateless_reset_tokens([TEST_TOKEN_1].iter(), &mut publisher); + manager.on_path_secrets_ready(&Session, &mut publisher); + manager.on_packet_ack(ack_set, &mut publisher); + manager.on_packet_loss(ack_set); + manager.on_transmit(&mut context); + + // state is still complete + assert!(manager.state.is_complete()); +} + +#[test] +fn on_path_secrets_ready() { + let mut publisher = Publisher::snapshot(); + let path = MockDcPath::default(); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + + manager.on_path_secrets_ready(&Session, &mut publisher); + + assert_eq!(1, manager.path().on_path_secrets_ready_count); + assert!(manager.state.is_path_secrets_ready()); + // Server doesn't transmit until it receives tokens from the client + assert!(!manager.has_transmission_interest()); + + let path = MockDcPath::default(); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + + manager.on_path_secrets_ready(&Session, &mut publisher); + + assert_eq!(1, manager.path().on_path_secrets_ready_count); + assert!(manager.state.is_path_secrets_ready()); + // Client starts transmitting as soon as path secrets are ready + assert!(manager.has_transmission_interest()); +} + +#[test] +fn on_peer_dc_stateless_reset_tokens_server() { + let mut publisher = Publisher::snapshot(); + let path = MockDcPath::default(); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + + on_peer_dc_stateless_reset_tokens(&mut manager, &mut publisher); +} + +#[test] +fn on_peer_dc_stateless_reset_tokens_client() { + let mut publisher = Publisher::snapshot(); + let path = MockDcPath::default(); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + + on_peer_dc_stateless_reset_tokens(&mut manager, &mut publisher); +} + +fn on_peer_dc_stateless_reset_tokens( + manager: &mut Manager, + publisher: &mut Publisher, +) where + Config: endpoint::Config, + Endpoint: dc::Endpoint, +{ + let tokens = [TEST_TOKEN_1, TEST_TOKEN_2, TEST_TOKEN_3]; + + manager.on_peer_dc_stateless_reset_tokens(tokens.iter(), publisher); + + // peer tokens were delivered too early + assert_eq!(0, manager.path().on_peer_stateless_reset_tokens_count); + assert!(manager.path().peer_stateless_reset_tokens.is_empty()); + + // Now path secrets are ready, so the peer tokens are received + manager.on_path_secrets_ready(&Session, publisher); + + manager.on_peer_dc_stateless_reset_tokens(tokens.iter(), publisher); + + assert_eq!(1, manager.path().on_peer_stateless_reset_tokens_count); + assert_eq!( + tokens.as_slice(), + manager.path().peer_stateless_reset_tokens.as_slice() + ); + + if Config::ENDPOINT_TYPE.is_server() { + assert!(manager.state.is_server_tokens_sent()); + } else { + assert!(manager.state.is_complete()); + } + + // Receiving the peer tokens again doesn't call the provider again + manager.on_peer_dc_stateless_reset_tokens(tokens.iter(), publisher); + assert_eq!(1, manager.path().on_peer_stateless_reset_tokens_count); +} + +#[test] +fn on_packet_ack_client() { + let mut publisher = Publisher::snapshot(); + let mut path = MockDcPath::default(); + let tokens = [TEST_TOKEN_1, TEST_TOKEN_2]; + path.stateless_reset_tokens.extend(tokens); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + on_packet_ack(&mut manager, tokens.as_slice(), &mut publisher); + + // Client completes when it has received stateless reset tokens from the peer + assert!(!manager.state.is_complete()); +} + +#[test] +fn on_packet_ack_server() { + let mut publisher = Publisher::snapshot(); + let mut path = MockDcPath::default(); + let tokens = [TEST_TOKEN_1, TEST_TOKEN_2]; + path.stateless_reset_tokens.extend(tokens); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + on_packet_ack(&mut manager, tokens.as_slice(), &mut publisher); + + // Server completes when its stateless reset tokens are acked + assert!(manager.state.is_complete()); +} + +fn on_packet_ack( + manager: &mut Manager, + tokens: &[stateless_reset::Token], + publisher: &mut Publisher, +) where + Config: endpoint::Config, + Endpoint: dc::Endpoint, +{ + let expected_frame = + Frame::DcStatelessResetTokens(DcStatelessResetTokens::new(tokens).unwrap()); + + let mut frame_buffer = OutgoingFrameBuffer::new(); + let mut context = MockWriteContext::new( + now(), + &mut frame_buffer, + transmission::Constraint::None, + transmission::Mode::Normal, + endpoint::Type::Client, + ); + let pn = context.packet_number(); + + manager.on_path_secrets_ready(&Session, publisher); + + if Config::ENDPOINT_TYPE.is_server() { + // Receive tokens on the server to trigger sending + manager.on_peer_dc_stateless_reset_tokens([TEST_TOKEN_3].iter(), publisher); + } + assert!(manager.has_transmission_interest()); + + manager.on_transmit(&mut context); + // We no longer have transmission interest, but DC_STATELESS_RESET_TOKENS will still + // be transmitted passively until one is acked + assert!(!manager.has_transmission_interest()); + assert_eq!( + expected_frame, + context.frame_buffer.pop_front().unwrap().as_frame() + ); + + manager.on_transmit(&mut context); + + // Same DC_STATELESS_RESET_TOKENS frame is written + assert_eq!( + expected_frame, + context.frame_buffer.pop_front().unwrap().as_frame() + ); + + // Ack the first one + manager.on_packet_ack(&PacketNumberRange::new(pn, pn), publisher); + + assert!(!manager.has_transmission_interest()); +} + +#[test] +fn on_packet_loss() { + let mut publisher = Publisher::snapshot(); + let mut path = MockDcPath::default(); + let tokens = [TEST_TOKEN_1, TEST_TOKEN_2]; + path.stateless_reset_tokens.extend(tokens); + let mut manager: Manager = Manager::new(Some(path), 1, &mut publisher); + let mut frame_buffer = OutgoingFrameBuffer::new(); + let mut context = MockWriteContext::new( + now(), + &mut frame_buffer, + transmission::Constraint::None, + transmission::Mode::Normal, + endpoint::Type::Client, + ); + let pn = context.packet_number(); + + manager.on_path_secrets_ready(&Session, &mut publisher); + assert!(manager.has_transmission_interest()); + + manager.on_transmit(&mut context); + assert!(!manager.has_transmission_interest()); + + // DC_STATELESS_RESET_TOKENS frame was lost + manager.on_packet_loss(&PacketNumberRange::new(pn, pn)); + + // so now we have transmission interest again + assert!(manager.has_transmission_interest()); +} + +#[test] +#[cfg_attr(miri, ignore)] +fn snapshots() { + assert_debug_snapshot!(State::test_transitions()); +} + +#[test] +#[cfg_attr(miri, ignore)] +fn dot_test() { + assert_snapshot!(State::dot()); +} + +/// Creates an application space packet number with the given value +fn pn(nr: usize) -> PacketNumber { + PacketNumberSpace::ApplicationData.new_packet_number(VarInt::new(nr as u64).unwrap()) +} diff --git a/quic/s2n-quic-transport/src/endpoint/mod.rs b/quic/s2n-quic-transport/src/endpoint/mod.rs index a7848864ee..dfd975be46 100644 --- a/quic/s2n-quic-transport/src/endpoint/mod.rs +++ b/quic/s2n-quic-transport/src/endpoint/mod.rs @@ -29,6 +29,8 @@ use s2n_quic_core::{ }, crypto::{tls, tls::Endpoint as _, CryptoSuite, InitialKey}, datagram::{Endpoint as DatagramEndpoint, PreConnectionInfo}, + dc, + dc::Endpoint as _, endpoint::{limits::Outcome, Limiter as _}, event::{ self, supervisor, ConnectionPublisher, EndpointPublisher as _, IntoEvent, Subscriber as _, @@ -42,7 +44,7 @@ use s2n_quic_core::{ stateless_reset::token::{Generator as _, LEN as StatelessResetTokenLen}, time::{Clock, Timestamp}, token::{self, Format}, - transport::parameters::ClientTransportParameters, + transport::parameters::{ClientTransportParameters, DcSupportedVersions}, }; pub mod close; @@ -1063,6 +1065,11 @@ impl Endpoint { .try_into() .unwrap(); + if Cfg::DcEndpoint::ENABLED { + transport_parameters.dc_supported_versions = + DcSupportedVersions::for_client(dc::SUPPORTED_VERSIONS); + } + //= https://www.rfc-editor.org/rfc/rfc9000#section-7.2 //# The Destination Connection ID field from the first Initial packet //# sent by a client is used to determine packet protection keys for @@ -1154,7 +1161,7 @@ pub mod testing { type PathMigrationValidator = path::migration::allow_all::Validator; type PacketInterceptor = s2n_quic_core::packet::interceptor::Disabled; type DatagramEndpoint = s2n_quic_core::datagram::Disabled; - type DcEndpoint = s2n_quic_core::dc::Disabled; + type DcEndpoint = s2n_quic_core::dc::testing::MockDcEndpoint; fn context(&mut self) -> super::Context { todo!() @@ -1185,7 +1192,7 @@ pub mod testing { type PathMigrationValidator = path::migration::allow_all::Validator; type PacketInterceptor = s2n_quic_core::packet::interceptor::Disabled; type DatagramEndpoint = s2n_quic_core::datagram::Disabled; - type DcEndpoint = s2n_quic_core::dc::Disabled; + type DcEndpoint = s2n_quic_core::dc::testing::MockDcEndpoint; fn context(&mut self) -> super::Context { todo!() diff --git a/quic/s2n-quic-transport/src/lib.rs b/quic/s2n-quic-transport/src/lib.rs index 35bea9a4e0..40cb9e6d1e 100644 --- a/quic/s2n-quic-transport/src/lib.rs +++ b/quic/s2n-quic-transport/src/lib.rs @@ -9,6 +9,7 @@ extern crate alloc; mod ack; mod contexts; +mod dc; mod processed_packet; mod space; mod sync; diff --git a/quic/s2n-quic-transport/src/space/application.rs b/quic/s2n-quic-transport/src/space/application.rs index f94e7a44d1..06461184bc 100644 --- a/quic/s2n-quic-transport/src/space/application.rs +++ b/quic/s2n-quic-transport/src/space/application.rs @@ -4,7 +4,7 @@ use crate::{ ack::AckManager, connection::{self, ConnectionTransmissionContext, ProcessingError}, - endpoint, path, + dc, endpoint, path, path::{path_event, Path}, processed_packet::ProcessedPacket, recovery, @@ -24,12 +24,13 @@ use s2n_codec::EncoderBuffer; use s2n_quic_core::{ counter::{Counter, Saturating}, crypto::{application::KeySet, limited, tls, CryptoSuite}, + dc::Endpoint as _, event::{self, ConnectionPublisher as _, IntoEvent}, frame::{ ack::AckRanges, crypto::CryptoRef, datagram::DatagramRef, stream::StreamRef, Ack, - ConnectionClose, DataBlocked, HandshakeDone, MaxData, MaxStreamData, MaxStreams, - NewConnectionId, NewToken, PathChallenge, PathResponse, ResetStream, RetireConnectionId, - StopSending, StreamDataBlocked, StreamsBlocked, + ConnectionClose, DataBlocked, DcStatelessResetTokens, HandshakeDone, MaxData, + MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse, + ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked, }, inet::DatagramInfo, packet::{ @@ -75,6 +76,7 @@ pub struct ApplicationSpace { processed_packet_numbers: SlidingWindow, recovery_manager: recovery::Manager, pub datagram_manager: datagram::Manager, + pub dc_manager: dc::Manager, /// Counter used for detecting an Optimistic Ack attack skip_counter: Option>, /// Keeps track of if the TLS session still exists. If it does, we buffer @@ -106,6 +108,7 @@ impl ApplicationSpace { keep_alive: KeepAlive, max_mtu: MaxMtu, datagram_manager: datagram::Manager, + dc_manager: dc::Manager, ) -> Self { let key_set = KeySet::new(key, Self::key_limits(max_mtu)); @@ -122,6 +125,7 @@ impl ApplicationSpace { processed_packet_numbers: SlidingWindow::default(), recovery_manager: recovery::Manager::new(PacketNumberSpace::ApplicationData), datagram_manager, + dc_manager, skip_counter: None, buffer_crypto_frames: Config::ENDPOINT_TYPE.is_client(), } @@ -223,6 +227,7 @@ impl ApplicationSpace { &mut self.recovery_manager, &mut self.crypto_stream, &mut self.datagram_manager, + &mut self.dc_manager, ), timestamp: context.timestamp, transmission_constraint, @@ -574,6 +579,7 @@ impl ApplicationSpace { path_id, path_manager, tx_packet_numbers: &mut self.tx_packet_numbers, + dc_manager: &mut self.dc_manager, }, ) } @@ -724,6 +730,7 @@ impl transmission::interest::Provider for ApplicationS self.recovery_manager.transmission_interest(query)?; self.stream_manager.transmission_interest(query)?; self.datagram_manager.transmission_interest(query)?; + self.dc_manager.transmission_interest(query)?; Ok(()) } } @@ -744,6 +751,7 @@ struct RecoveryContext<'a, Config: endpoint::Config> { path_id: path::Id, path_manager: &'a mut path::Manager, tx_packet_numbers: &'a mut TxPacketNumbers, + dc_manager: &'a mut dc::Manager, } impl<'a, Config: endpoint::Config> recovery::Context for RecoveryContext<'a, Config> { @@ -801,6 +809,8 @@ impl<'a, Config: endpoint::Config> recovery::Context for RecoveryContext ) { self.handshake_status .on_packet_ack(packet_number_range, publisher); + self.dc_manager + .on_packet_ack(packet_number_range, publisher); self.crypto_stream.on_packet_ack(packet_number_range); self.ping.on_packet_ack(packet_number_range); self.stream_manager.on_packet_ack(packet_number_range); @@ -822,6 +832,7 @@ impl<'a, Config: endpoint::Config> recovery::Context for RecoveryContext self.crypto_stream.on_packet_loss(packet_number_range); self.handshake_status .on_packet_loss(packet_number_range, publisher); + self.dc_manager.on_packet_loss(packet_number_range); self.ping.on_packet_loss(packet_number_range); self.stream_manager.on_packet_loss(packet_number_range); self.local_id_registry.on_packet_loss(packet_number_range); @@ -1128,6 +1139,23 @@ impl PacketSpace for ApplicationSpace Ok(()) } + fn handle_dc_stateless_reset_tokens_frame( + &mut self, + frame: DcStatelessResetTokens, + publisher: &mut Pub, + ) -> Result<(), transport::Error> { + if Config::DcEndpoint::ENABLED { + self.dc_manager + .on_peer_dc_stateless_reset_tokens(frame.into_iter(), publisher); + } else { + return Err(transport::Error::PROTOCOL_VIOLATION + .with_reason("Invalid frame") + .with_frame_type(frame.tag())); + } + + Ok(()) + } + fn on_processed_packet( &mut self, processed_packet: ProcessedPacket, diff --git a/quic/s2n-quic-transport/src/space/mod.rs b/quic/s2n-quic-transport/src/space/mod.rs index 45e3a48656..511d2e9f39 100644 --- a/quic/s2n-quic-transport/src/space/mod.rs +++ b/quic/s2n-quic-transport/src/space/mod.rs @@ -24,9 +24,9 @@ use s2n_quic_core::{ event::{self, IntoEvent}, frame::{ ack::AckRanges, crypto::CryptoRef, datagram::DatagramRef, stream::StreamRef, Ack, - ConnectionClose, DataBlocked, HandshakeDone, MaxData, MaxStreamData, MaxStreams, - NewConnectionId, NewToken, PathChallenge, PathResponse, ResetStream, RetireConnectionId, - StopSending, StreamDataBlocked, StreamsBlocked, + ConnectionClose, DataBlocked, DcStatelessResetTokens, HandshakeDone, MaxData, + MaxStreamData, MaxStreams, NewConnectionId, NewToken, PathChallenge, PathResponse, + ResetStream, RetireConnectionId, StopSending, StreamDataBlocked, StreamsBlocked, }, inet::DatagramInfo, packet::number::{PacketNumber, PacketNumberSpace}, @@ -789,6 +789,16 @@ pub trait PacketSpace: Sized { .with_frame_type(frame.tag().into())) } + fn handle_dc_stateless_reset_tokens_frame( + &mut self, + frame: DcStatelessResetTokens, + _publisher: &mut Pub, + ) -> Result<(), transport::Error> { + Err(transport::Error::PROTOCOL_VIOLATION + .with_reason(Self::INVALID_FRAME_ERROR) + .with_frame_type(frame.tag())) + } + default_frame_handler!(handle_data_blocked_frame, DataBlocked); default_frame_handler!(handle_max_data_frame, MaxData); default_frame_handler!(handle_max_stream_data_frame, MaxStreamData); @@ -1048,10 +1058,8 @@ pub trait PacketSpace: Sized { } Frame::DcStatelessResetTokens(frame) => { let on_error = on_frame_processed!(frame); - Err(on_error( - transport::Error::PROTOCOL_VIOLATION.with_reason("invalid frame"), - ))? - // TODO: Process DcStatelessResetTokens in dc provider + self.handle_dc_stateless_reset_tokens_frame(frame, publisher) + .map_err(on_error)?; } } diff --git a/quic/s2n-quic-transport/src/space/session_context.rs b/quic/s2n-quic-transport/src/space/session_context.rs index 3826100d27..07b9da81bf 100644 --- a/quic/s2n-quic-transport/src/space/session_context.rs +++ b/quic/s2n-quic-transport/src/space/session_context.rs @@ -22,6 +22,8 @@ use s2n_quic_core::{ crypto::{tls, tls::ApplicationParameters, CryptoSuite, Key}, ct::ConstantTimeEq, datagram::{ConnectionInfo, Endpoint}, + dc, + dc::Endpoint as _, event, event::IntoEvent, packet::number::PacketNumberSpace, @@ -30,8 +32,8 @@ use s2n_quic_core::{ self, parameters::{ ActiveConnectionIdLimit, ClientTransportParameters, DatagramLimits, - InitialFlowControlLimits, InitialSourceConnectionId, MaxAckDelay, - ServerTransportParameters, + DcSupportedVersions, InitialFlowControlLimits, InitialSourceConnectionId, MaxAckDelay, + ServerTransportParameters, TransportParameter as _, }, Error, }, @@ -72,6 +74,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> ActiveConnectionIdLimit, DatagramLimits, MaxAckDelay, + Option, ), transport::Error, > { @@ -185,11 +188,24 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> let active_connection_id_limit = peer_parameters.active_connection_id_limit; let datagram_limits = peer_parameters.datagram_limits(); + let dc_version = if Config::DcEndpoint::ENABLED { + peer_parameters + .dc_supported_versions + .selected_version() + .map_err(|_| { + transport::Error::TRANSPORT_PARAMETER_ERROR + .with_reason("invalid dc supported versions") + })? + } else { + None + }; + Ok(( initial_flow_control_limits, active_connection_id_limit, datagram_limits, peer_parameters.max_ack_delay, + dc_version, )) } @@ -203,6 +219,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> ActiveConnectionIdLimit, DatagramLimits, MaxAckDelay, + Option, ), transport::Error, > { @@ -243,11 +260,18 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> let active_connection_id_limit = peer_parameters.active_connection_id_limit; let datagram_limits = peer_parameters.datagram_limits(); + let dc_version = if Config::DcEndpoint::ENABLED { + dc::select_version(peer_parameters.dc_supported_versions) + } else { + None + }; + Ok(( initial_flow_control_limits, active_connection_id_limit, datagram_limits, peer_parameters.max_ack_delay, + dc_version, )) } @@ -375,11 +399,16 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> // Parse transport parameters let param_decoder = DecoderBuffer::new(application_parameters.transport_parameters); - let (peer_flow_control_limits, active_connection_id_limit, datagram_limits, max_ack_delay) = - match Config::ENDPOINT_TYPE { - endpoint::Type::Client => self.on_server_params(param_decoder)?, - endpoint::Type::Server => self.on_client_params(param_decoder)?, - }; + let ( + peer_flow_control_limits, + active_connection_id_limit, + datagram_limits, + max_ack_delay, + dc_version, + ) = match Config::ENDPOINT_TYPE { + endpoint::Type::Client => self.on_server_params(param_decoder)?, + endpoint::Type::Server => self.on_client_params(param_decoder)?, + }; self.local_id_registry .set_active_connection_id_limit(active_connection_id_limit.as_u64()); @@ -411,7 +440,19 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> datagram_limits.max_datagram_payload, ); - // TODO: call self.dc.new_path(..) + let max_mtu = self.path_manager.max_mtu(); + + let dc_manager = if let Some(dc_version) = dc_version { + let application_params = + dc::ApplicationParams::new(max_mtu, &peer_flow_control_limits, self.limits); + let remote_address = self.path_manager.active_path().remote_address().0; + let conn_info = + dc::ConnectionInfo::new(&remote_address, dc_version, application_params); + let dc_path = self.dc.new_path(&conn_info); + crate::dc::Manager::new(dc_path, dc_version, self.publisher) + } else { + crate::dc::Manager::disabled() + }; self.path_manager .active_path_mut() @@ -419,7 +460,6 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> .on_max_ack_delay(max_ack_delay); let cipher_suite = key.cipher_suite().into_event(); - let max_mtu = self.path_manager.max_mtu(); *self.application = Some(Box::new(ApplicationSpace::new( key, header_key, @@ -429,6 +469,7 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> keep_alive, max_mtu, datagram_manager, + dc_manager, ))); self.publisher.on_key_update(event::builder::KeyUpdate { key_type: event::builder::KeyType::OneRtt { generation: 0 }, @@ -466,6 +507,12 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> &mut self, session: &impl tls::TlsSession, ) -> Result<(), transport::Error> { + self.application + .as_mut() + .expect("application keys should be ready before the tls exporter") + .dc_manager + .on_path_secrets_ready(session, self.publisher); + self.publisher .on_tls_exporter_ready(event::builder::TlsExporterReady { session: s2n_quic_core::event::TlsSession::new(session), @@ -610,13 +657,31 @@ impl<'a, Config: endpoint::Config, Pub: event::ConnectionPublisher> fn on_client_application_params( &mut self, - _client_params: ApplicationParameters, - _server_params: &mut Vec, + client_params: ApplicationParameters, + server_params: &mut Vec, ) -> Result<(), Error> { debug_assert!(Config::ENDPOINT_TYPE.is_server()); - // TODO: Append `DcSupportedVersion` based on dc negotiation - // DcSupportedVersions::for_server(1).append_to_buffer(server_params) + if Config::DcEndpoint::ENABLED { + let param_decoder = DecoderBuffer::new(client_params.transport_parameters); + let (client_params, remaining) = ClientTransportParameters::decode(param_decoder) + .map_err(|_| { + //= https://www.rfc-editor.org/rfc/rfc9000#section-7.4 + //# An endpoint SHOULD treat receipt of + //# duplicate transport parameters as a connection error of type + //# TRANSPORT_PARAMETER_ERROR. + transport::Error::TRANSPORT_PARAMETER_ERROR + .with_reason("Invalid transport parameters") + })?; + + debug_assert_eq!(remaining.len(), 0); + + if let Some(selected_version) = dc::select_version(client_params.dc_supported_versions) + { + DcSupportedVersions::for_server(selected_version).append_to_buffer(server_params) + } + } + Ok(()) } } diff --git a/quic/s2n-quic-transport/src/sync/flag.rs b/quic/s2n-quic-transport/src/sync/flag.rs index ec9d146982..96410339f9 100644 --- a/quic/s2n-quic-transport/src/sync/flag.rs +++ b/quic/s2n-quic-transport/src/sync/flag.rs @@ -3,10 +3,12 @@ //! Sends a "flag" frame towards the peer //! -//! This is intended to be used by frames, like PING and HANDSHAKE_DONE, that don't have any +//! This can be used by frames, like PING and HANDSHAKE_DONE, that don't have any //! content other than the frame tag itself. At the cost of a single byte per packet, it will passively //! transmit the flag in any outgoing packets until the peer ACKs the frame. This is to increase -//! the likelihood the peer receives the flag, even in a high-loss environment. +//! the likelihood the peer receives the flag, even in a high-loss environment. It may also be used +//! by frames that do have content, such as DC_STATELESS_RESET_TOKENS, that similarly require aggressive +//! transmission to increase the likelihood the peer receives the frame. use crate::{ contexts::{OnTransmitError, WriteContext}, @@ -65,6 +67,14 @@ impl Default for DeliveryState { } impl Flag { + /// Constructs a flag with the given `writer` + pub fn new(writer: W) -> Self { + Self { + writer, + ..Default::default() + } + } + /// Returns `true` if the flag hasn't been sent pub fn is_idle(&self) -> bool { matches!(self.delivery, DeliveryState::Idle) diff --git a/quic/s2n-quic-transport/src/transmission/application.rs b/quic/s2n-quic-transport/src/transmission/application.rs index a7338824ef..b6cc2abc70 100644 --- a/quic/s2n-quic-transport/src/transmission/application.rs +++ b/quic/s2n-quic-transport/src/transmission/application.rs @@ -5,7 +5,7 @@ use crate::{ ack::AckManager, connection, contexts::WriteContext, - endpoint, path, + dc, endpoint, path, path::mtu, recovery, space::{datagram, CryptoStream, HandshakeStatus}, @@ -38,6 +38,7 @@ impl<'a, Config: endpoint::Config> Payload<'a, Config> { recovery_manager: &'a mut recovery::Manager, crypto_stream: &'a mut CryptoStream, datagram_manager: &'a mut datagram::Manager, + dc_manager: &'a mut dc::Manager, ) -> Self { if transmission_mode != Mode::PathValidationOnly { debug_assert_eq!(path_id, path_manager.active_path_id()); @@ -55,6 +56,7 @@ impl<'a, Config: endpoint::Config> Payload<'a, Config> { recovery_manager, crypto_stream, datagram_manager, + dc_manager, prioritize_datagrams: false, }) } @@ -108,6 +110,7 @@ pub struct Normal<'a, Config: endpoint::Config> { recovery_manager: &'a mut recovery::Manager, crypto_stream: &'a mut CryptoStream, datagram_manager: &'a mut datagram::Manager, + dc_manager: &'a mut dc::Manager, prioritize_datagrams: bool, } @@ -172,6 +175,10 @@ impl<'a, Config: endpoint::Config> Normal<'a, Config> { // soon as possible self.handshake_status.on_transmit(context); + // send DC_STATELESS_RESET_FRAMES frames next, if needed, to ensure the dc handshake can + // complete as soon as possible + self.dc_manager.on_transmit(context); + let _ = self.crypto_stream.tx.on_transmit((), context); //= https://www.rfc-editor.org/rfc/rfc9000#section-8.2 @@ -203,6 +210,7 @@ impl<'a, Config: endpoint::Config> transmission::interest::Provider for Normal<' .active_path() .transmission_interest(query)?; self.ping.transmission_interest(query)?; + self.dc_manager.transmission_interest(query)?; Ok(()) } } diff --git a/quic/s2n-quic/src/client/builder.rs b/quic/s2n-quic/src/client/builder.rs index 21c0343612..8360a1106a 100644 --- a/quic/s2n-quic/src/client/builder.rs +++ b/quic/s2n-quic/src/client/builder.rs @@ -269,7 +269,7 @@ impl Builder { ClientProviders ); - #[cfg(feature = "unstable-provider-dc")] + #[cfg(any(test, feature = "unstable-provider-dc"))] impl_provider_method!( /// Sets the dc provider for the [`Client`] with_dc, diff --git a/quic/s2n-quic/src/server/builder.rs b/quic/s2n-quic/src/server/builder.rs index ede1ebcfaa..03394e0e5c 100644 --- a/quic/s2n-quic/src/server/builder.rs +++ b/quic/s2n-quic/src/server/builder.rs @@ -319,7 +319,7 @@ impl Builder { ServerProviders ); - #[cfg(feature = "unstable-provider-dc")] + #[cfg(any(test, feature = "unstable-provider-dc"))] impl_provider_method!( /// Sets the dc provider for the [`Server`] with_dc, diff --git a/quic/s2n-quic/src/tests.rs b/quic/s2n-quic/src/tests.rs index b12243622a..14e90fea98 100644 --- a/quic/s2n-quic/src/tests.rs +++ b/quic/s2n-quic/src/tests.rs @@ -46,6 +46,8 @@ mod skip_packets; #[cfg(not(target_os = "windows"))] mod client_handshake_confirm; #[cfg(not(target_os = "windows"))] +mod dc; +#[cfg(not(target_os = "windows"))] mod mtls; mod exporter; diff --git a/quic/s2n-quic/src/tests/dc.rs b/quic/s2n-quic/src/tests/dc.rs new file mode 100644 index 0000000000..2630ac9f23 --- /dev/null +++ b/quic/s2n-quic/src/tests/dc.rs @@ -0,0 +1,224 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::*; +use crate::{client, client::ClientProviders, server, server::ServerProviders}; +use s2n_quic_core::{ + dc::testing::MockDcEndpoint, + event::{api::DcState, Timestamp}, + stateless_reset::token::testing::{TEST_TOKEN_1, TEST_TOKEN_2}, +}; + +// Client Server +// +// Initial[0]: CRYPTO[CH (DC_SUPPORTED_VERSIONS[3,2,1]] -> +// +// # dc_state_changed: state=VersionNegotiated +// Initial[0]: CRYPTO[SH] ACK[0] +// Handshake[0]: CRYPTO[EE (DC_SUPPORTED_VERSIONS[3], CERT, CV, FIN] +// +// # dc_state_changed: state=VersionNegotiated +// # handshake_status_updated: status=Complete +// # dc_state_changed: state=PathSecretsReady +// Initial[1]: ACK[0] +// Handshake[0]: CRYPTO[FIN], ACK[0] -> +// 1-RTT[0]: DC_STATELESS_RESET_TOKENS[..] +// +// # dc_state_changed: state=PathSecretsReady +// # handshake_status_updated: status=Complete +// # handshake_status_updated: status=Confirmed +// # key_space_discarded: space=Handshake +// <- 1-RTT[1]: HANDSHAKE_DONE, ACK[0], DC_STATELESS_RESET_TOKENS[..] +// +// # handshake_status_updated: status=HandshakeDoneAcked +// # handshake_status_updated: status=Confirmed +// # key_space_discarded: space=Handshake +// # dc_state_changed: state=Complete +// 1-RTT[1]: ACK[1] -> +// # handshake_status_updated: status=HandshakeDoneAcked +// # dc_state_changed: state=Complete +#[test] +fn dc_handshake_self_test() { + let server = Server::builder().with_tls(SERVER_CERTS).unwrap(); + let client = Client::builder().with_tls(certificates::CERT_PEM).unwrap(); + + self_test(server, client); +} + +// Client Server +// +// Initial[0]: CRYPTO[CH (DC_SUPPORTED_VERSIONS[3,2,1]] -> +// +// # dc_state_changed: state=VersionNegotiated +// Initial[0]: CRYPTO[SH] ACK[0] +// Handshake[0]: CRYPTO[EE (DC_SUPPORTED_VERSIONS[3], CERT, CV, FIN] +// +// # dc_state_changed: state=VersionNegotiated +// # handshake_status_updated: status=Complete +// # dc_state_changed: state=PathSecretsReady +// Initial[1]: ACK[0] +// Handshake[0]: CRYPTO[CERT, CV, FIN], ACK[0] -> +// 1-RTT[0]: DC_STATELESS_RESET_TOKENS[..] +// +// # dc_state_changed: state=PathSecretsReady +// # handshake_status_updated: status=Complete +// # handshake_status_updated: status=Confirmed +// # key_space_discarded: space=Handshake +// <- 1-RTT[1]: HANDSHAKE_DONE, ACK[0], DC_STATELESS_RESET_TOKENS[..] +// +// # handshake_status_updated: status=HandshakeDoneAcked +// # handshake_status_updated: status=Confirmed +// # key_space_discarded: space=Handshake +// # dc_state_changed: state=Complete +// 1-RTT[1]: ACK[1] -> +// # handshake_status_updated: status=HandshakeDoneAcked +// # dc_state_changed: state=Complete +#[test] +fn dc_mtls_handshake_self_test() { + let server_tls = build_server_mtls_provider(certificates::MTLS_CA_CERT).unwrap(); + let server = Server::builder().with_tls(server_tls).unwrap(); + + let client_tls = build_client_mtls_provider(certificates::MTLS_CA_CERT).unwrap(); + let client = Client::builder().with_tls(client_tls).unwrap(); + + self_test(server, client); +} + +fn self_test( + server: server::Builder, + client: client::Builder, +) { + let model = Model::default(); + let rtt = Duration::from_millis(100); + model.set_delay(rtt / 2); + const LEN: usize = 1000; + + let server_subscriber = DcStateChanged::new(); + let server_events = server_subscriber.clone(); + let client_subscriber = DcStateChanged::new(); + let client_events = client_subscriber.clone(); + let server_tokens = [TEST_TOKEN_1]; + let client_tokens = [TEST_TOKEN_2]; + + test(model, |handle| { + let mut server = server + .with_io(handle.builder().build()?)? + .with_event((tracing_events(), server_subscriber))? + .with_random(Random::with_seed(456))? + .with_dc(MockDcEndpoint::new(&server_tokens))? + .start()?; + + let addr = server.local_addr()?; + spawn(async move { + let mut conn = server.accept().await.unwrap(); + let mut stream = conn.open_bidirectional_stream().await.unwrap(); + stream.send(vec![42; LEN].into()).await.unwrap(); + stream.flush().await.unwrap(); + }); + + let client = client + .with_io(handle.builder().build().unwrap())? + .with_event((tracing_events(), client_subscriber))? + .with_random(Random::with_seed(456))? + .with_dc(MockDcEndpoint::new(&client_tokens))? + .start()?; + + primary::spawn(async move { + let connect = Connect::new(addr).with_server_name("localhost"); + let mut conn = client.connect(connect).await.unwrap(); + let mut stream = conn.accept_bidirectional_stream().await.unwrap().unwrap(); + + let mut recv_len = 0; + while let Some(chunk) = stream.receive().await.unwrap() { + recv_len += chunk.len(); + } + assert_eq!(LEN, recv_len); + }); + + Ok(addr) + }) + .unwrap(); + + let server_events = server_events.events().lock().unwrap().clone(); + let client_events = client_events.events().lock().unwrap().clone(); + + // 3 state transitions (VersionNegotiated -> PathSecretsReady -> Complete) + assert_eq!(3, server_events.len()); + assert_eq!(3, client_events.len()); + + for events in [server_events.clone(), client_events.clone()] { + if let DcState::VersionNegotiated { version, .. } = events[0].state { + assert_eq!(version, s2n_quic_core::dc::SUPPORTED_VERSIONS[0]); + } else { + panic!("VersionNegotiated should be the first dc state"); + } + + assert!(matches!(events[1].state, DcState::PathSecretsReady { .. })); + assert!(matches!(events[2].state, DcState::Complete { .. })); + } + + // Server path secrets are ready in 1.5 RTTs measured from the start of the test, since it takes + // .5 RTT for the Initial from the client to reach the server + assert_eq!( + // remove floating point division error + Duration::from_millis(rtt.mul_f32(1.5).as_millis() as u64), + server_events[1].timestamp.duration_since_start() + ); + assert_eq!(rtt, client_events[1].timestamp.duration_since_start()); + + // Server completes in 2.5 RTTs measured from the start of the test, since it takes .5 RTT + // for the Initial from the client to reach the server + assert_eq!( + rtt.mul_f32(2.5), + server_events[2].timestamp.duration_since_start() + ); + assert_eq!(rtt * 2, client_events[2].timestamp.duration_since_start()); +} + +#[derive(Clone)] +struct DcStateChangedEvent { + timestamp: Timestamp, + state: DcState, +} + +#[derive(Clone, Default)] +struct DcStateChanged { + pub events: Arc>>, +} +impl DcStateChanged { + pub fn new() -> Self { + Self::default() + } + + pub fn events(&self) -> Arc>> { + self.events.clone() + } +} + +impl events::Subscriber for DcStateChanged { + type ConnectionContext = DcStateChanged; + + fn create_connection_context( + &mut self, + _meta: &events::ConnectionMeta, + _info: &events::ConnectionInfo, + ) -> Self::ConnectionContext { + self.clone() + } + + fn on_dc_state_changed( + &mut self, + context: &mut Self::ConnectionContext, + meta: &events::ConnectionMeta, + event: &events::DcStateChanged, + ) { + let store = |event: &events::DcStateChanged, storage: &mut Vec| { + storage.push(DcStateChangedEvent { + timestamp: meta.timestamp, + state: event.state.clone(), + }); + }; + let mut buffer = context.events.lock().unwrap(); + store(event, &mut buffer); + } +}