diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 32abebd90b0112..74a1b1089d587f 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -61,6 +61,10 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped"; pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2; pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed"; +pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3; +pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = + b"exceed_max_stream_count"; + const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; @@ -408,6 +412,7 @@ pub fn get_connection_stake( #[derive(Debug)] pub(crate) enum ConnectionHandlerError { ConnectionAddError, + MaxStreamError, } pub(crate) fn update_open_connections_stat( diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 6408070062cf2c..cb7ba04f0d97d2 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -6,7 +6,8 @@ use { get_connection_stake, update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED, - CONNECTION_CLOSE_REASON_DISALLOWED, + CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED, + CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, }, stream_throttle::{ throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA, @@ -47,13 +48,6 @@ pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128; pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512; pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000; -/// Below this RTT, we apply the legacy logic (no BDP scaling) -/// Above this RTT, we increase the RX window and number of streams -/// as RTT increases to preserve reasonable bandwidth. -const REFERENCE_RTT_MS: u64 = 50; - -/// Above this RTT we stop scaling for BDP -const MAX_RTT_MS: u64 = 350; #[derive(Clone)] pub struct SwQosConfig { @@ -146,12 +140,8 @@ impl SwQos { } } -fn compute_max_allowed_uni_streams( - rtt_millis: u64, - peer_type: ConnectionPeerType, - total_stake: u64, -) -> u32 { - let streams = match peer_type { +fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { + match peer_type { ConnectionPeerType::Staked(peer_stake) => { // No checked math for f64 type. So let's explicitly check for 0 here if total_stake == 0 || peer_stake > total_stake { @@ -174,10 +164,7 @@ fn compute_max_allowed_uni_streams( } } ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - }; - let streams = - streams as u64 * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS; - streams.min(u32::MAX as u64) as u32 + } } impl SwQos { @@ -195,51 +182,58 @@ impl SwQos { ), ConnectionHandlerError, > { - // get current RTT and limit it to MAX_RTT_MS - let rtt_millis = connection.rtt().as_millis() as u64; - let max_uni_streams = VarInt::from_u32(compute_max_allowed_uni_streams( - rtt_millis, + if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( conn_context.peer_type(), conn_context.total_stake, - )); - - let remote_addr = connection.remote_address(); - - debug!( - "Peer type {:?}, total stake {}, max streams {} from peer {}", - conn_context.peer_type(), - conn_context.total_stake, - max_uni_streams.into_inner(), - remote_addr, - ); - - let max_connections_per_peer = match conn_context.peer_type() { - ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer, - ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer, - }; - if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l - .try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), - remote_addr.port(), - client_connection_tracker, - Some(connection.clone()), - conn_context.peer_type(), - conn_context.last_update.clone(), - max_connections_per_peer, - || Arc::new(ConnectionStreamCounter::new()), - ) + ) as u64) { - update_open_connections_stat(&self.stats, &connection_table_l); - drop(connection_table_l); + let remote_addr = connection.remote_address(); - connection.set_max_concurrent_uni_streams(max_uni_streams); + debug!( + "Peer type {:?}, total stake {}, max streams {} from peer {}", + conn_context.peer_type(), + conn_context.total_stake, + max_uni_streams.into_inner(), + remote_addr, + ); - Ok((last_update, cancel_connection, stream_counter)) + let max_connections_per_peer = match conn_context.peer_type() { + ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer, + ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer, + }; + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), + remote_addr.port(), + client_connection_tracker, + Some(connection.clone()), + conn_context.peer_type(), + conn_context.last_update.clone(), + max_connections_per_peer, + || Arc::new(ConnectionStreamCounter::new()), + ) + { + update_open_connections_stat(&self.stats, &connection_table_l); + drop(connection_table_l); + + connection.set_max_concurrent_uni_streams(max_uni_streams); + + Ok((last_update, cancel_connection, stream_counter)) + } else { + self.stats + .connection_add_failed + .fetch_add(1, Ordering::Relaxed); + Err(ConnectionHandlerError::ConnectionAddError) + } } else { + connection.close( + CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(), + CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, + ); self.stats - .connection_add_failed + .connection_add_failed_invalid_stream_count .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::ConnectionAddError) + Err(ConnectionHandlerError::MaxStreamError) } } @@ -534,35 +528,27 @@ pub mod test { #[test] fn test_max_allowed_uni_streams() { assert_eq!( - compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32 + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); assert_eq!( - compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Staked(10), 0), - QUIC_MIN_STAKED_CONCURRENT_STREAMS as u32 + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), + QUIC_MIN_STAKED_CONCURRENT_STREAMS ); let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; assert_eq!( - compute_max_allowed_uni_streams( - REFERENCE_RTT_MS, - ConnectionPeerType::Staked(1000), - 10000 - ), - QUIC_MAX_STAKED_CONCURRENT_STREAMS as u32, + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), + QUIC_MAX_STAKED_CONCURRENT_STREAMS, ); assert_eq!( - compute_max_allowed_uni_streams( - REFERENCE_RTT_MS, - ConnectionPeerType::Staked(100), - 10000 - ), + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) as u32 + .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) ); assert_eq!( - compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32 + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS ); } }