From 10d60dbf76eb2f812fe296fd7b3d1aff67a4838a Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Thu, 6 Nov 2025 16:51:06 +0000 Subject: [PATCH] scale RX window and max_streams with BDP --- streamer/src/nonblocking/quic.rs | 5 -- streamer/src/nonblocking/swqos.rs | 132 +++++++++++++++++------------- 2 files changed, 73 insertions(+), 64 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 74a1b1089d587f..32abebd90b0112 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -61,10 +61,6 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped"; pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2; pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed"; -pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3; -pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = - b"exceed_max_stream_count"; - const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; @@ -412,7 +408,6 @@ pub fn get_connection_stake( #[derive(Debug)] pub(crate) enum ConnectionHandlerError { ConnectionAddError, - MaxStreamError, } pub(crate) fn update_open_connections_stat( diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index cb7ba04f0d97d2..6408070062cf2c 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -6,8 +6,7 @@ use { get_connection_stake, update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED, - CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED, - CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, + CONNECTION_CLOSE_REASON_DISALLOWED, }, stream_throttle::{ throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA, @@ -48,6 +47,13 @@ pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512; pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000; +/// Below this RTT, we apply the legacy logic (no BDP scaling) +/// Above this RTT, we increase the RX window and number of streams +/// as RTT increases to preserve reasonable bandwidth. +const REFERENCE_RTT_MS: u64 = 50; + +/// Above this RTT we stop scaling for BDP +const MAX_RTT_MS: u64 = 350; #[derive(Clone)] pub struct SwQosConfig { @@ -140,8 +146,12 @@ impl SwQos { } } -fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { - match peer_type { +fn compute_max_allowed_uni_streams( + rtt_millis: u64, + peer_type: ConnectionPeerType, + total_stake: u64, +) -> u32 { + let streams = match peer_type { ConnectionPeerType::Staked(peer_stake) => { // No checked math for f64 type. So let's explicitly check for 0 here if total_stake == 0 || peer_stake > total_stake { @@ -164,7 +174,10 @@ fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u } } ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - } + }; + let streams = + streams as u64 * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS; + streams.min(u32::MAX as u64) as u32 } impl SwQos { @@ -182,58 +195,51 @@ impl SwQos { ), ConnectionHandlerError, > { - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + // get current RTT and limit it to MAX_RTT_MS + let rtt_millis = connection.rtt().as_millis() as u64; + let max_uni_streams = VarInt::from_u32(compute_max_allowed_uni_streams( + rtt_millis, conn_context.peer_type(), conn_context.total_stake, - ) as u64) - { - let remote_addr = connection.remote_address(); + )); - debug!( - "Peer type {:?}, total stake {}, max streams {} from peer {}", + let remote_addr = connection.remote_address(); + + debug!( + "Peer type {:?}, total stake {}, max streams {} from peer {}", + conn_context.peer_type(), + conn_context.total_stake, + max_uni_streams.into_inner(), + remote_addr, + ); + + let max_connections_per_peer = match conn_context.peer_type() { + ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer, + ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer, + }; + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), + remote_addr.port(), + client_connection_tracker, + Some(connection.clone()), conn_context.peer_type(), - conn_context.total_stake, - max_uni_streams.into_inner(), - remote_addr, - ); + conn_context.last_update.clone(), + max_connections_per_peer, + || Arc::new(ConnectionStreamCounter::new()), + ) + { + update_open_connections_stat(&self.stats, &connection_table_l); + drop(connection_table_l); - let max_connections_per_peer = match conn_context.peer_type() { - ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer, - ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer, - }; - if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l - .try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), - remote_addr.port(), - client_connection_tracker, - Some(connection.clone()), - conn_context.peer_type(), - conn_context.last_update.clone(), - max_connections_per_peer, - || Arc::new(ConnectionStreamCounter::new()), - ) - { - update_open_connections_stat(&self.stats, &connection_table_l); - drop(connection_table_l); - - connection.set_max_concurrent_uni_streams(max_uni_streams); - - Ok((last_update, cancel_connection, stream_counter)) - } else { - self.stats - .connection_add_failed - .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::ConnectionAddError) - } + connection.set_max_concurrent_uni_streams(max_uni_streams); + + Ok((last_update, cancel_connection, stream_counter)) } else { - connection.close( - CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(), - CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, - ); self.stats - .connection_add_failed_invalid_stream_count + .connection_add_failed .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::MaxStreamError) + Err(ConnectionHandlerError::ConnectionAddError) } } @@ -528,27 +534,35 @@ pub mod test { #[test] fn test_max_allowed_uni_streams() { assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 0), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32 ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), - QUIC_MIN_STAKED_CONCURRENT_STREAMS + compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Staked(10), 0), + QUIC_MIN_STAKED_CONCURRENT_STREAMS as u32 ); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), - QUIC_MAX_STAKED_CONCURRENT_STREAMS, + compute_max_allowed_uni_streams( + REFERENCE_RTT_MS, + ConnectionPeerType::Staked(1000), + 10000 + ), + QUIC_MAX_STAKED_CONCURRENT_STREAMS as u32, ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), + compute_max_allowed_uni_streams( + REFERENCE_RTT_MS, + ConnectionPeerType::Staked(100), + 10000 + ), ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) + .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) as u32 ); assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 10000), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32 ); } }