diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 14b1dd93b5e67c..21913191b6da9e 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -35,6 +35,12 @@ use { tokio_util::sync::CancellationToken, }; +/// RTT after which we start BDP scaling +const REFERENCE_RTT_MS: u64 = 50; + +/// Above this RTT we stop scaling for BDP +const MAX_RTT_MS: u64 = 350; + #[derive(Clone)] pub struct SwQosConfig { pub max_streams_per_ms: u64, @@ -114,8 +120,12 @@ impl SwQos { } } -fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { - match peer_type { +fn compute_max_allowed_uni_streams_with_rtt( + rtt_millis: u64, + peer_type: ConnectionPeerType, + total_stake: u64, +) -> usize { + let streams = match peer_type { ConnectionPeerType::Staked(peer_stake) => { // No checked math for f64 type. So let's explicitly check for 0 here if total_stake == 0 || peer_stake > total_stake { @@ -138,7 +148,10 @@ fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u } } ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - } + }; + // scale amount of streams based on RTT if RTT is larger than REFERENCE_RTT_MS + // multiply first then divide to avoid rounding errors. + (streams * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) as usize) / REFERENCE_RTT_MS as usize } impl SwQos { @@ -156,7 +169,10 @@ impl SwQos { ), ConnectionHandlerError, > { - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + // get current RTT and limit it to MAX_RTT_MS + let rtt_millis = connection.rtt().as_millis() as u64; + if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams_with_rtt( + rtt_millis, conn_context.peer_type(), conn_context.total_stake, ) as u64) @@ -488,6 +504,10 @@ impl QosController for SwQos { pub mod test { use super::*; + fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { + compute_max_allowed_uni_streams_with_rtt(REFERENCE_RTT_MS, peer_type, total_stake) + } + #[test] fn test_max_allowed_uni_streams() { assert_eq!( @@ -514,4 +534,26 @@ pub mod test { QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } + + #[test] + fn test_max_allowed_uni_streams_with_rtt() { + assert_eq!( + compute_max_allowed_uni_streams_with_rtt( + REFERENCE_RTT_MS / 2, + ConnectionPeerType::Unstaked, + 10000 + ), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + "Max streams should not be less than normal for low RTT" + ); + assert_eq!( + compute_max_allowed_uni_streams_with_rtt( + REFERENCE_RTT_MS + REFERENCE_RTT_MS / 2, + ConnectionPeerType::Unstaked, + 10000 + ), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS / 2, + "Max streams should scale with BDP in high-RTT connections" + ); + } }