From 8c815237c2457fd0a5bc5c54d5d57e3fc2351650 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Mon, 8 Sep 2025 14:53:41 +0000 Subject: [PATCH 1/3] Streamer/TPU: scale amount of bytes in flight with peer RTT (#7745) * use BDP to compute the rx window before SWQOS throttling is applied, helps high-latency senders (>50ms RTT) get reasonable TPS * set number of streamer streams based on BDP (for same reason) * For any RTT below 400ms, target up to 80 Mbps service rate per max-staked connection * add a workaround to keep giving higher bandwidth to close-by nodes * update RX window every 128 TXs in case someone tries to spoof it --- streamer/src/nonblocking/qos.rs | 3 + streamer/src/nonblocking/quic.rs | 110 ++++++++-- streamer/src/nonblocking/simple_qos.rs | 9 +- streamer/src/nonblocking/swqos.rs | 275 ++++++++----------------- streamer/src/streamer.rs | 22 +- 5 files changed, 195 insertions(+), 224 deletions(-) diff --git a/streamer/src/nonblocking/qos.rs b/streamer/src/nonblocking/qos.rs index 35e501e355f..a6940c5ea2e 100644 --- a/streamer/src/nonblocking/qos.rs +++ b/streamer/src/nonblocking/qos.rs @@ -30,6 +30,9 @@ pub(crate) trait QosController { context: &mut C, ) -> impl Future> + Send; + /// Calculate the intended bandwidth allocation for a given peer in kbps + fn get_max_bitrate_kbps(&self, context: &C) -> u64; + /// Called when a new stream is received on a connection fn on_new_stream(&self, context: &C) -> impl Future + Send; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 5f4f20720e1..bcfb3046745 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -12,7 +12,7 @@ use { crossbeam_channel::{bounded, Receiver, Sender, TryRecvError, TrySendError}, futures::{stream::FuturesUnordered, Future, StreamExt as _}, indexmap::map::{Entry, IndexMap}, - quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime}, + quinn::{Accept, Connecting, Connection, Endpoint, 
EndpointConfig, TokioRuntime, VarInt}, rand::{thread_rng, Rng}, smallvec::SmallVec, solana_keypair::Keypair, @@ -24,12 +24,10 @@ use { solana_tls_utils::get_pubkey_from_tls_certificate, solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ - array, - fmt, + array, fmt, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, pin::Pin, - // CAUTION: be careful not to introduce any awaits while holding an RwLock. sync::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, @@ -63,16 +61,55 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped"; pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2; pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed"; -pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3; -pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = - b"exceed_max_stream_count"; - const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5; const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; +/// Maximal allowed RTT for SWQOS calculations. +/// +/// The larger the RTT, the more RX buffer space we need to sustain the same +/// throughput. Since RTT can be spoofed, we do not want the clients to have +/// full control over it to avoid memory exhaustion. 400ms is more than any +/// reasonable RTT in the internet, and yet small enough to prevent memory +/// exhaustion attacks. +pub(crate) const MAX_ALLOWED_RTT_MS: u64 = 400; + +/// If the client RTT is less than this, we will allocate bandwidth as if +/// the client actually had this RTT. This is to prevent accidentally +/// throttling connections when RTT is very low, as RTT measurements +/// are in integer milliseconds, and 1ms vs 2ms is a 50% difference in the +/// bandwidth allocation. +/// +/// TODO: Currently this is set way higher than necessary to avoid errors. 
+/// This mimics the legacy behavior where very low-latency connections +/// would get more bandwidth. +pub(crate) const MIN_ALLOWED_RTT_MS: u64 = 50; + +/// Allow an absolute max of 2 MB RX window +/// +/// This puts an upper bound on the total amount of bytes we may have to buffer +/// for a given client, irrespective of stake and RTT. This serves as an upper +/// bound on memory consumption. +pub(crate) const MAX_ALLOWED_RX_WINDOW: u32 = 2000 * 1000; + +/// Expected mean size of a transaction for the purpose of +/// receive window <=> streams conversions; +/// essentially, num_rx_streams = rx_window/MEAN_TRANSACTION_SIZE. +/// Based on MNB statistics it should be ~600, +/// but we allow some margin to allow for smaller TXs. +pub(crate) const MEAN_TRANSACTION_SIZE: usize = 400; + +/// Maximal possible amount of streams to allocate per connection +/// +/// Each connection gets a certain quota of QUIC streams allocated for it. +/// Each transaction "in flight" from client to server occupies a stream. +/// The total amount needed depends on desired bandwidth, transaction size +/// and RTT. This constant puts an upper bound on number of streams to +/// limit memory consumption. +const MAX_ALLOWED_UNI_STREAMS: u64 = MAX_ALLOWED_RX_WINDOW as u64 / MEAN_TRANSACTION_SIZE as u64; + /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. 
@@ -382,7 +419,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option { pub fn get_connection_stake( connection: &Connection, staked_nodes: &RwLock, -) -> Option<(Pubkey, u64, u64, u64, u64)> { +) -> Option<(Pubkey, u64, u64, u64)> { let pubkey = get_remote_pubkey(connection)?; debug!("Peer public key is {pubkey:?}"); let staked_nodes = staked_nodes.read().unwrap(); @@ -391,14 +428,12 @@ pub fn get_connection_stake( staked_nodes.get_node_stake(&pubkey)?, staked_nodes.total_stake(), staked_nodes.max_stake(), - staked_nodes.min_stake(), )) } #[derive(Debug)] pub(crate) enum ConnectionHandlerError { ConnectionAddError, - MaxStreamError, } pub(crate) fn update_open_connections_stat( @@ -416,6 +451,17 @@ pub(crate) fn update_open_connections_stat( } } +/// Compute the RX window based on bandwidth-delay-product +fn compute_receive_window(max_receive_rate_kbps: u64, rtt: Duration) -> VarInt { + // truncate here is safe since u64 millis is an eternity + let millis = (rtt.as_millis() as u64).clamp(MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS); + // Compute the receive window in bytes as max_rx_rate * rtt / 8, + let receive_window = (max_receive_rate_kbps * millis) / 8; + // hard constraint the RX window to avoid excess memory use + let receive_window = receive_window.min(MAX_ALLOWED_RX_WINDOW as u64) as u32; + VarInt::from_u32(receive_window) +} + #[allow(clippy::too_many_arguments)] async fn setup_connection( connecting: Connecting, @@ -704,15 +750,27 @@ async fn handle_connection( { let peer_type = context.peer_type(); let remote_addr = connection.remote_address(); + connection.set_max_concurrent_bi_streams(VarInt::from_u32(0)); + let max_receive_rate_kbps = qos.get_max_bitrate_kbps(&context); + let initial_rx_window = compute_receive_window(max_receive_rate_kbps, connection.rtt()); + connection.set_receive_window(initial_rx_window); + let max_streams = VarInt::from_u64( + (initial_rx_window.into_inner() / MEAN_TRANSACTION_SIZE as u64) + .min(MAX_ALLOWED_UNI_STREAMS), 
+ ) + .expect("dividing VarInt by positive integer will never overflow VarInt"); + connection.set_max_concurrent_uni_streams(max_streams); debug!( - "quic new connection {} streams: {} connections: {}", - remote_addr, - stats.active_streams.load(Ordering::Relaxed), - stats.total_connections.load(Ordering::Relaxed), + "quic new connection {remote_addr}, max_receive_rate {max_receive_rate_kbps} Kbps \ + (receive_window {initial_rx_window} max_streams {max_streams:?} RTT {rtt}ms), streams: \ + {streams} connections: {connections}", + rtt = connection.rtt().as_millis(), + streams = stats.active_streams.load(Ordering::Relaxed), + connections = stats.total_connections.load(Ordering::Relaxed), ); stats.total_connections.fetch_add(1, Ordering::Relaxed); - 'conn: loop { + 'conn: for stream_number in 0u64.. { // Wait for new streams. If the peer is disconnected we get a cancellation signal and stop // the connection task. let mut stream = select! { @@ -770,7 +828,11 @@ async fn handle_connection( } // timeout elapsed Err(_) => { - debug!("Timeout in receiving on stream"); + debug!( + "Timeout in receiving on stream {} from {}", + stream.id(), + connection.remote_address() + ); stats .total_stream_read_timeouts .fetch_add(1, Ordering::Relaxed); @@ -808,6 +870,18 @@ async fn handle_connection( stats.active_streams.fetch_sub(1, Ordering::Relaxed); qos.on_stream_closed(&context); + if (stream_number % 128) == 0 { + let new_window = compute_receive_window(max_receive_rate_kbps, connection.rtt()); + trace!( + "Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} \ + and target bitrate {} kbps", + connection.rtt(), + max_receive_rate_kbps + ); + connection.set_receive_window(new_window); + // we do not update number of allowed streams here since + // it may cause extra allocations. 
+ } } let removed_connection_count = qos.remove_connection(&context, connection).await; @@ -2042,7 +2116,7 @@ pub mod test { let client_connection = make_client_endpoint(&server_address, None).await; - // unstaked connection can handle up to 100tps, so we should send in ~1s. + // unstaked connection can handle > 100tps, so we should send in <1s. let expected_num_txs = 100; let start_time = tokio::time::Instant::now(); for i in 0..expected_num_txs { diff --git a/streamer/src/nonblocking/simple_qos.rs b/streamer/src/nonblocking/simple_qos.rs index 4f171dd557e..a982cda94b9 100644 --- a/streamer/src/nonblocking/simple_qos.rs +++ b/streamer/src/nonblocking/simple_qos.rs @@ -5,7 +5,7 @@ use { quic::{ get_connection_stake, update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, - ConnectionTableType, + ConnectionTableType, MEAN_TRANSACTION_SIZE, }, stream_throttle::{ throttle_stream, ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL, @@ -146,7 +146,7 @@ impl QosController for SimpleQos { let (peer_type, remote_pubkey, _total_stake) = get_connection_stake(connection, &self.staked_nodes).map_or( (ConnectionPeerType::Unstaked, None, 0), - |(pubkey, stake, total_stake, _max_stake, _min_stake)| { + |(pubkey, stake, total_stake, _max_stake)| { (ConnectionPeerType::Staked(stake), Some(pubkey), total_stake) }, ); @@ -160,6 +160,11 @@ impl QosController for SimpleQos { } } + fn get_max_bitrate_kbps(&self, _context: &SimpleQosConnectionContext) -> u64 { + // choose max_bitrate with 4x margin + self.max_streams_per_second * 4 * MEAN_TRANSACTION_SIZE as u64 * 8 / 1000 + } + #[allow(clippy::manual_async_fn)] fn try_add_connection( &self, diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 59ec14bf577..587f35e4061 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -6,8 +6,7 @@ use { get_connection_stake, 
update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED, - CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED, - CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, + CONNECTION_CLOSE_REASON_DISALLOWED, }, stream_throttle::{ throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA, @@ -18,14 +17,7 @@ use { streamer::StakedNodes, }, percentage::Percentage, - quinn::{Connection, VarInt, VarIntBoundsExceeded}, - solana_packet::PACKET_DATA_SIZE, - solana_quic_definitions::{ - QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, - }, + quinn::Connection, solana_time_utils as timing, std::{ future::Future, @@ -38,6 +30,34 @@ use { tokio_util::sync::CancellationToken, }; +/// Target max bitrate for an unstaked connection +/// This is max raw network bitrate, actual TPS will be enforced +/// by the throttling logic. This directly impacts the amount of RAM +/// that a connection can require to support RX buffers. +/// +/// This needs to be constrained to ensure that we can admit a lot of unstaked +/// connections without consuming too much RAM. The 4Mbps target for unstaked connection +/// allows for about 1200 TPS with mean TX size of 400 bytes, and will still allow 100 TPS +/// at maximal planned TX size of 4096 bytes. +pub(crate) const TARGET_UNSTAKED_KBPS: u64 = 4000; + +/// Target max bitrate for a staked connection with maximum +/// stake amount through the cluster. Connections will get +/// bitrates interpolated between this and [`TARGET_UNSTAKED_KBPS`] +/// based on their actual stake amount relative to maximum. 
+/// This is only the max network bitrate, actual TPS will be enforced +/// by the throttling logic. +/// +/// We want to allocate more bandwidth to staked nodes, since we have a +/// fairly small number of staked nodes. We want to keep this as large +/// as is reasonable to ensure that the staked connections can submit as +/// many transactions as they have available. Of course we do not want +/// to make this infinitely large to avoid the TPU bandwidth exhaustion +/// if all staked nodes decide to use their allowance at the same time. +/// Current value chosen to allow about 25000 TPS, and limit the total +/// TPU bandwidth per identity to 80 Mbps. +pub(crate) const TARGET_MAX_STAKED_KBPS: u64 = TARGET_UNSTAKED_KBPS * 20; + #[derive(Clone)] pub struct SwQosConfig { pub max_streams_per_ms: u64, @@ -67,7 +87,6 @@ pub struct SwQos { pub struct SwQosConnectionContext { peer_type: ConnectionPeerType, max_stake: u64, - min_stake: u64, remote_pubkey: Option, total_stake: u64, in_staked_table: bool, @@ -119,76 +138,6 @@ impl SwQos { } } -/// Calculate the ratio for per connection receive window from a staked peer -fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 { - // Testing shows the maximum throughput from a connection is achieved at receive_window = - // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the - // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to - // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r' - // for stake 's' is, - // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find - // a and b. 
- - if stake > max_stake { - return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - } - - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; - if max_stake > min_stake { - let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; - let b = max_ratio as f64 - ((max_stake as f64) * a); - let ratio = (a * stake as f64) + b; - ratio.round() as u64 - } else { - QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO - } -} - -fn compute_recieve_window( - max_stake: u64, - min_stake: u64, - peer_type: ConnectionPeerType, -) -> Result { - match peer_type { - ConnectionPeerType::Unstaked => { - VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) - } - ConnectionPeerType::Staked(peer_stake) => { - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); - VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) - } - } -} - -fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { - match peer_type { - ConnectionPeerType::Staked(peer_stake) => { - // No checked math for f64 type. 
So let's explicitly check for 0 here - if total_stake == 0 || peer_stake > total_stake { - warn!( - "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \ - {total_stake:?}" - ); - - QUIC_MIN_STAKED_CONCURRENT_STREAMS - } else { - let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - (((peer_stake as f64 / total_stake as f64) * delta) as usize - + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .clamp( - QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ) - } - } - ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - } -} - impl SwQos { fn cache_new_connection( &self, @@ -204,62 +153,33 @@ impl SwQos { ), ConnectionHandlerError, > { - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + let remote_addr = connection.remote_address(); + debug!( + "Peer {remote_addr}: type {:?}, total stake {}", conn_context.peer_type(), conn_context.total_stake, - ) as u64) - { - let remote_addr = connection.remote_address(); - let receive_window = compute_recieve_window( - conn_context.max_stake, - conn_context.min_stake, - conn_context.peer_type(), - ); + ); - debug!( - "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), + remote_addr.port(), + client_connection_tracker, + Some(connection.clone()), conn_context.peer_type(), - conn_context.total_stake, - max_uni_streams.into_inner(), - receive_window, - remote_addr, - ); - - if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l - .try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey), - remote_addr.port(), - client_connection_tracker, - Some(connection.clone()), - conn_context.peer_type(), - conn_context.last_update.clone(), - 
self.max_connections_per_peer, - ) - { - update_open_connections_stat(&self.stats, &connection_table_l); - drop(connection_table_l); - - if let Ok(receive_window) = receive_window { - connection.set_receive_window(receive_window); - } - connection.set_max_concurrent_uni_streams(max_uni_streams); + conn_context.last_update.clone(), + self.max_connections_per_peer, + ) + { + update_open_connections_stat(&self.stats, &connection_table_l); + drop(connection_table_l); - Ok((last_update, cancel_connection, stream_counter)) - } else { - self.stats - .connection_add_failed - .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::ConnectionAddError) - } + Ok((last_update, cancel_connection, stream_counter)) } else { - connection.close( - CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(), - CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, - ); self.stats - .connection_add_failed_invalid_stream_count + .connection_add_failed .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::MaxStreamError) + Err(ConnectionHandlerError::ConnectionAddError) } } @@ -330,7 +250,6 @@ impl QosController for SwQos { SwQosConnectionContext { peer_type: ConnectionPeerType::Unstaked, max_stake: 0, - min_stake: 0, total_stake: 0, remote_pubkey: None, in_staked_table: false, @@ -338,7 +257,7 @@ impl QosController for SwQos { stream_counter: None, last_update: Arc::new(AtomicU64::new(timing::timestamp())), }, - |(pubkey, stake, total_stake, max_stake, min_stake)| { + |(pubkey, stake, total_stake, max_stake)| { // The heuristic is that the stake should be large enough to have 1 stream pass through within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. 
@@ -358,7 +277,6 @@ impl QosController for SwQos { SwQosConnectionContext { peer_type, max_stake, - min_stake, total_stake, remote_pubkey: Some(pubkey), in_staked_table: false, @@ -543,6 +461,31 @@ impl QosController for SwQos { .await; } } + + fn get_max_bitrate_kbps(&self, context: &SwQosConnectionContext) -> u64 { + get_max_bitrate_kbps(context.peer_type, context.max_stake) + } +} + +// computes max_bitrate as a linear interpolation between TARGET_UNSTAKED_KBPS and +// TARGET_MAX_STAKED_KBPS based on stake amount +fn get_max_bitrate_kbps(peer_type: ConnectionPeerType, max_stake: u64) -> u64 { + let stake = match peer_type { + ConnectionPeerType::Unstaked => 0, + ConnectionPeerType::Staked(peer_stake) => peer_stake, + } + .min(max_stake); + + // real cluster always has staked nodes, but unittests may have 100% unstaked + if max_stake == 0 { + return TARGET_UNSTAKED_KBPS; + } + + let max_rate = TARGET_MAX_STAKED_KBPS; + let min_rate = TARGET_UNSTAKED_KBPS; + // Linear interpolation between min and max as + // stake approaches max_stake + min_rate + ((stake as u128 * (max_rate - min_rate) as u128) / (max_stake as u128)) as u64 } #[cfg(test)] @@ -551,63 +494,19 @@ pub mod test { #[test] fn test_cacluate_receive_window_ratio_for_staked_node() { - let mut max_stake = 10000; - let mut min_stake = 0; - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake); - assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO); - - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - assert_eq!(ratio, max_ratio); - - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2); - let average_ratio = - (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2; - assert_eq!(ratio, average_ratio); - - max_stake = 10000; - min_stake = 10000; - let ratio = 
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - assert_eq!(ratio, max_ratio); - - max_stake = 0; - min_stake = 0; - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - assert_eq!(ratio, max_ratio); - - max_stake = 1000; - min_stake = 10; - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10); - assert_eq!(ratio, max_ratio); - } + let max_stake = 10000; - #[test] + let rate = get_max_bitrate_kbps(ConnectionPeerType::Unstaked, max_stake); + assert_eq!(rate, TARGET_UNSTAKED_KBPS); - fn test_max_allowed_uni_streams() { - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), - ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); + let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake), max_stake); + assert_eq!(rate, TARGET_MAX_STAKED_KBPS); + + let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake / 2), max_stake); + let average_ratio = (TARGET_MAX_STAKED_KBPS + TARGET_UNSTAKED_KBPS) / 2; + assert_eq!(rate, average_ratio); + + let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake + 10), max_stake); + assert_eq!(rate, TARGET_MAX_STAKED_KBPS); } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs 
index 00b3d71ce95..83ddec37261 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -11,7 +11,6 @@ use { }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, histogram::Histogram, - itertools::Itertools, solana_net_utils::multihomed_sockets::{ BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider, }, @@ -81,7 +80,6 @@ pub struct StakedNodes { overrides: HashMap, total_stake: u64, max_stake: u64, - min_stake: u64, } pub type PacketBatchReceiver = Receiver; @@ -429,11 +427,11 @@ impl StreamerSendStats { } impl StakedNodes { - /// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple + /// Calculate the stake stats: return the new (total_stake and max_stake) tuple fn calculate_stake_stats( stakes: &Arc>, overrides: &HashMap, - ) -> (u64, u64, u64) { + ) -> (u64, u64) { let values = stakes .iter() .filter(|(pubkey, _)| !overrides.contains_key(pubkey)) @@ -441,18 +439,17 @@ impl StakedNodes { .chain(overrides.values().copied()) .filter(|&stake| stake > 0); let total_stake = values.clone().sum(); - let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default(); - (total_stake, min_stake, max_stake) + let max_stake = values.max().unwrap_or_default(); + (total_stake, max_stake) } pub fn new(stakes: Arc>, overrides: HashMap) -> Self { - let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); + let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); Self { stakes, overrides, total_stake, max_stake, - min_stake, } } @@ -469,11 +466,6 @@ impl StakedNodes { self.total_stake } - #[inline] - pub(super) fn min_stake(&self) -> u64 { - self.min_stake - } - #[inline] pub(super) fn max_stake(&self) -> u64 { self.max_stake @@ -481,11 +473,9 @@ impl StakedNodes { // Update the stake map given a new stakes map pub fn update_stake_map(&mut self, stakes: Arc>) { - let (total_stake, min_stake, 
max_stake) = - Self::calculate_stake_stats(&stakes, &self.overrides); + let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &self.overrides); self.total_stake = total_stake; - self.min_stake = min_stake; self.max_stake = max_stake; self.stakes = stakes; } From 16597baf1b2c7710e13b491fa6b353f460ba2da2 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Thu, 6 Nov 2025 09:12:09 +0000 Subject: [PATCH 2/3] switch from kbps to reference TPS --- streamer/src/nonblocking/qos.rs | 5 +- streamer/src/nonblocking/quic.rs | 45 ++++++++++------- streamer/src/nonblocking/simple_qos.rs | 8 +-- streamer/src/nonblocking/swqos.rs | 67 ++++++++++++++------------ 4 files changed, 69 insertions(+), 56 deletions(-) diff --git a/streamer/src/nonblocking/qos.rs b/streamer/src/nonblocking/qos.rs index a6940c5ea2e..f2b54c1940e 100644 --- a/streamer/src/nonblocking/qos.rs +++ b/streamer/src/nonblocking/qos.rs @@ -30,8 +30,9 @@ pub(crate) trait QosController { context: &mut C, ) -> impl Future> + Send; - /// Calculate the intended bandwidth allocation for a given peer in kbps - fn get_max_bitrate_kbps(&self, context: &C) -> u64; + /// The reference TPS desired for a connection + /// on the QUIC level (affects RX buffer allocations) + fn reference_tps(&self, context: &C) -> u64; /// Called when a new stream is received on a connection fn on_new_stream(&self, context: &C) -> impl Future + Send; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index bcfb3046745..c74ad0b3c82 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -92,14 +92,14 @@ pub(crate) const MIN_ALLOWED_RTT_MS: u64 = 50; /// This puts an upper bound on the total amount of bytes we may have to buffer /// for a given client, irrespective of stake and RTT. This serves as an upper /// bound on memory consumption. 
-pub(crate) const MAX_ALLOWED_RX_WINDOW: u32 = 2000 * 1000; +pub(crate) const MAX_ALLOWED_RX_WINDOW: u32 = 2048 * 1024; /// Expected mean size of a transaction for the purpose of /// receive window <=> streams conversions; /// essentially, num_rx_streams = rx_window/MEAN_TRANSACTION_SIZE. /// Based on MNB statistics it should be ~600, /// but we allow some margin to allow for smaller TXs. -pub(crate) const MEAN_TRANSACTION_SIZE: usize = 400; +pub(crate) const MEAN_TRANSACTION_SIZE: u32 = 400; /// Maximal possible amount of streams to allocate per connection /// @@ -108,7 +108,7 @@ pub(crate) const MEAN_TRANSACTION_SIZE: usize = 400; /// The total amount needed depends on desired bandwidth, transaction size /// and RTT. This constant puts an upper bound on number of streams to /// limit memory consumption. -const MAX_ALLOWED_UNI_STREAMS: u64 = MAX_ALLOWED_RX_WINDOW as u64 / MEAN_TRANSACTION_SIZE as u64; +const MAX_ALLOWED_UNI_STREAMS: u32 = MAX_ALLOWED_RX_WINDOW / MEAN_TRANSACTION_SIZE; /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. 
Might be adjusted @@ -452,14 +452,24 @@ } /// Compute the RX window based on bandwidth-delay-product -fn compute_receive_window(max_receive_rate_kbps: u64, rtt: Duration) -> VarInt { +/// +/// This will attempt to match reference TPS for high-latency connections, +/// and will not artificially slow down connections with latency < [`MIN_ALLOWED_RTT_MS`] +fn compute_receive_window_and_max_streams(reference_tps: u64, rtt: Duration) -> (VarInt, VarInt) { + // the desired transfer rate in bytes/second for RX window computation + let reference_transfer_rate = reference_tps * MEAN_TRANSACTION_SIZE as u64; // truncate here is safe since u64 millis is an eternity - let millis = (rtt.as_millis() as u64).clamp(MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS); - // Compute the receive window in bytes as max_rx_rate * rtt / 8, - let receive_window = (max_receive_rate_kbps * millis) / 8; + let rtt_milliseconds = (rtt.as_millis() as u64).clamp(MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS); + // Compute the receive window in bytes as reference_transfer_rate * rtt, + let receive_window = reference_transfer_rate * rtt_milliseconds; // hard constraint the RX window to avoid excess memory use let receive_window = receive_window.min(MAX_ALLOWED_RX_WINDOW as u64) as u32; - VarInt::from_u32(receive_window) + // compute max_streams in flight + let max_streams = (receive_window / MEAN_TRANSACTION_SIZE).min(MAX_ALLOWED_UNI_STREAMS); + ( + VarInt::from_u32(receive_window), + VarInt::from_u32(max_streams), + ) } #[allow(clippy::too_many_arguments)] async fn setup_connection( @@ -751,17 +761,13 @@ async fn handle_connection( let peer_type = context.peer_type(); let remote_addr = connection.remote_address(); connection.set_max_concurrent_bi_streams(VarInt::from_u32(0)); - let max_receive_rate_kbps = qos.get_max_bitrate_kbps(&context); - let initial_rx_window = compute_receive_window(max_receive_rate_kbps, connection.rtt()); + let max_target_tps_per_connection = qos.reference_tps(&context); + 
let (initial_rx_window, max_streams) = + compute_receive_window_and_max_streams(max_target_tps_per_connection, connection.rtt()); connection.set_receive_window(initial_rx_window); - let max_streams = VarInt::from_u64( - (initial_rx_window.into_inner() / MEAN_TRANSACTION_SIZE as u64) - .min(MAX_ALLOWED_UNI_STREAMS), - ) - .expect("dividing VarInt by positive integer will never overflow VarInt"); connection.set_max_concurrent_uni_streams(max_streams); debug!( - "quic new connection {remote_addr}, max_receive_rate {max_receive_rate_kbps} Kbps \ + "quic new connection {remote_addr}, reference_tps {max_target_tps_per_connection} TPS \ (receive_window {initial_rx_window} max_streams {max_streams:?} RTT {rtt}ms), streams: \ {streams} connections: {connections}", rtt = connection.rtt().as_millis(), @@ -871,12 +877,15 @@ async fn handle_connection( stats.active_streams.fetch_sub(1, Ordering::Relaxed); qos.on_stream_closed(&context); if (stream_number % 128) == 0 { - let new_window = compute_receive_window(max_receive_rate_kbps, connection.rtt()); + let (new_window, _) = compute_receive_window_and_max_streams( + max_target_tps_per_connection, + connection.rtt(), + ); trace!( "Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} \ and target bitrate {} kbps", connection.rtt(), - max_receive_rate_kbps + max_target_tps_per_connection ); connection.set_receive_window(new_window); // we do not update number of allowed streams here since diff --git a/streamer/src/nonblocking/simple_qos.rs b/streamer/src/nonblocking/simple_qos.rs index a982cda94b9..af8b7b83b52 100644 --- a/streamer/src/nonblocking/simple_qos.rs +++ b/streamer/src/nonblocking/simple_qos.rs @@ -5,7 +5,7 @@ use { quic::{ get_connection_stake, update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, - ConnectionTableType, MEAN_TRANSACTION_SIZE, + ConnectionTableType, }, stream_throttle::{ throttle_stream, 
ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL, @@ -160,9 +160,9 @@ impl QosController for SimpleQos { } } - fn get_max_bitrate_kbps(&self, _context: &SimpleQosConnectionContext) -> u64 { - // choose max_bitrate with 4x margin - self.max_streams_per_second * 4 * MEAN_TRANSACTION_SIZE as u64 * 8 / 1000 + fn reference_tps(&self, _context: &SimpleQosConnectionContext) -> u64 { + // allocate network bandwidth with 4x margin + self.max_streams_per_second * 4 } #[allow(clippy::manual_async_fn)] diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 587f35e4061..29df8a05c8e 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -30,33 +30,35 @@ use { tokio_util::sync::CancellationToken, }; -/// Target max bitrate for an unstaked connection -/// This is max raw network bitrare, actual TPS will be enforced +/// Reference TPS for an unstaked (or very low-staked) connection. +/// This defines the raw network buffer allocations, actual TPS will be enforced /// by the throttling logic. This directly impacts the amount of RAM /// that a connection can require to support RX buffers. /// /// This needs to be constrained to ensure that we can admit a lot of unstaked -/// connections without consuming too much RAM. The 4Mbps target for unstaked connection -/// allows for about 1200 TPS with mean TX size of 400 bytes, and will still allow 100 TPS -/// at maximal planned TX size of 4096 bytes. -pub(crate) const TARGET_UNSTAKED_KBPS: u64 = 4000; +/// connections without consuming too much RAM. +pub(crate) const UNSTAKED_CONNECTION_REFERENCE_TPS: u64 = 1250; -/// Target max bitrate for a staked connection with maximum +/// Reference max TPS for a single staked connection with maximum /// stake amount through the cluster. 
Connections will get -/// bitrates interpolated between this and [`TARGET_UNSTAKED_KBPS`] +/// buffer allocations based on a TPS target interpolated +/// between this and [`UNSTAKED_CONNECTION_REFERENCE_TPS`], based on their actual stake amount relative to maximum. -/// This is only the max network bitrare, actual TPS will be enforced +/// This affects the RX buffers only, true TPS will be enforced by the throttling logic. /// -/// We want to allocate more bandwidth to staked nodes, since we have a -/// fairly small number of staked nodes. We want to keep this as large +/// We generally want to allocate more bandwidth to staked nodes, since +/// we have a fairly small number of them. We want to keep this as large as is reasonable to ensure that the staked connections can submit as many transactions as they have available. Of course we do not want to make this infinitely large to avoid the TPU bandwidth exhaustion -/// if all staked nodes decide to use their allowance at the same time. -/// Current value chosen to allow about 25000 TPS, and limit the total -/// TPU bandwidth per identity to 80 Mbps. -pub(crate) const TARGET_MAX_STAKED_KBPS: u64 = TARGET_UNSTAKED_KBPS * 20; +/// if all staked nodes decide to use their allowance at the same time, +/// as it takes time for them to receive feedback from the server and slow +/// down. +/// +/// Current value chosen to allow about 25000 TPS per connection (200K TPS +/// per staked identity if it uses all 8 connections). 
+pub(crate) const MAX_STAKED_CONNECTION_REFERENCE_TPS: u64 = UNSTAKED_CONNECTION_REFERENCE_TPS * 20; #[derive(Clone)] pub struct SwQosConfig { @@ -462,14 +464,14 @@ impl QosController for SwQos { } } - fn get_max_bitrate_kbps(&self, context: &SwQosConnectionContext) -> u64 { - get_max_bitrate_kbps(context.peer_type, context.max_stake) + fn reference_tps(&self, context: &SwQosConnectionContext) -> u64 { + get_reference_tps(context.peer_type, context.max_stake) } } -// computes max_bitrate as a linear interpolation between TARGET_UNSTAKED_KBPS and -// TARGET_MAX_STAKED_KBPS based on stake amount -fn get_max_bitrate_kbps(peer_type: ConnectionPeerType, max_stake: u64) -> u64 { +/// computes refernce TPS as a linear interpolation between [`UNSTAKED_CONNECTION_REFERENCE_TPS`] and +/// [`MAX_STAKED_CONNECTION_REFERENCE_TPS`] based on the actual stake amount +fn get_reference_tps(peer_type: ConnectionPeerType, max_stake: u64) -> u64 { let stake = match peer_type { ConnectionPeerType::Unstaked => 0, ConnectionPeerType::Staked(peer_stake) => peer_stake, @@ -478,11 +480,11 @@ fn get_max_bitrate_kbps(peer_type: ConnectionPeerType, max_stake: u64) -> u64 { // real cluster always has staked nodes, but unittests may have 100% unstaked if max_stake == 0 { - return TARGET_UNSTAKED_KBPS; + return UNSTAKED_CONNECTION_REFERENCE_TPS; } - let max_rate = TARGET_MAX_STAKED_KBPS; - let min_rate = TARGET_UNSTAKED_KBPS; + let max_rate = MAX_STAKED_CONNECTION_REFERENCE_TPS; + let min_rate = UNSTAKED_CONNECTION_REFERENCE_TPS; // Linear interpolation between min and max as // stake approaches max_stake min_rate + ((stake as u128 * (max_rate - min_rate) as u128) / (max_stake as u128)) as u64 @@ -493,20 +495,21 @@ pub mod test { use super::*; #[test] - fn test_cacluate_receive_window_ratio_for_staked_node() { + fn test_cacluate_refernce_tps_for_staked_node() { let max_stake = 10000; - let rate = get_max_bitrate_kbps(ConnectionPeerType::Unstaked, max_stake); - assert_eq!(rate, 
TARGET_UNSTAKED_KBPS); + let rate = get_reference_tps(ConnectionPeerType::Unstaked, max_stake); + assert_eq!(rate, UNSTAKED_CONNECTION_REFERENCE_TPS); - let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake), max_stake); - assert_eq!(rate, TARGET_MAX_STAKED_KBPS); + let rate = get_reference_tps(ConnectionPeerType::Staked(max_stake), max_stake); + assert_eq!(rate, MAX_STAKED_CONNECTION_REFERENCE_TPS); - let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake / 2), max_stake); - let average_ratio = (TARGET_MAX_STAKED_KBPS + TARGET_UNSTAKED_KBPS) / 2; + let rate = get_reference_tps(ConnectionPeerType::Staked(max_stake / 2), max_stake); + let average_ratio = + (MAX_STAKED_CONNECTION_REFERENCE_TPS + UNSTAKED_CONNECTION_REFERENCE_TPS) / 2; assert_eq!(rate, average_ratio); - let rate = get_max_bitrate_kbps(ConnectionPeerType::Staked(max_stake + 10), max_stake); - assert_eq!(rate, TARGET_MAX_STAKED_KBPS); + let rate = get_reference_tps(ConnectionPeerType::Staked(max_stake + 10), max_stake); + assert_eq!(rate, MAX_STAKED_CONNECTION_REFERENCE_TPS); } } From 0b59170d0155471968e19be6841bb8b284b09125 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Thu, 6 Nov 2025 11:24:25 +0000 Subject: [PATCH 3/3] go back to 4x ratio between staked and unstaked --- streamer/src/nonblocking/quic.rs | 4 ++-- streamer/src/nonblocking/swqos.rs | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index c74ad0b3c82..720c8ca604d 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -460,8 +460,8 @@ fn compute_receive_window_and_max_streams(reference_tps: u64, rtt: Duration) -> let reference_transfer_rate = reference_tps * MEAN_TRANSACTION_SIZE as u64; // truncate here is safe since u64 millis is an eternity let rtt_milliseconds = (rtt.as_millis() as u64).clamp(MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS); - // Compute the receive window in bytes as 
reference_transfer_rate * rtt, - let receive_window = reference_transfer_rate * rtt_milliseconds; + // Compute the receive window in bytes as transfer_rate * rtt, + let receive_window = reference_transfer_rate * rtt_milliseconds / 1000; // hard constraint the RX window to avoid excess memory use let receive_window = receive_window.min(MAX_ALLOWED_RX_WINDOW as u64) as u32; // compute max_streams in flight diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 29df8a05c8e..53642cf8b22 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -37,7 +37,7 @@ use { /// /// This needs to be constrained to ensure that we can admit a lot of unstaked /// connections without consuming too much RAM. -pub(crate) const UNSTAKED_CONNECTION_REFERENCE_TPS: u64 = 1250; +pub(crate) const UNSTAKED_CONNECTION_REFERENCE_TPS: u64 = 6000; /// Reference max TPS for a single staked connection with maximum /// stake amount through the cluster. Connections will get @@ -56,9 +56,8 @@ pub(crate) const UNSTAKED_CONNECTION_REFERENCE_TPS: u64 = 1250; /// as it takes time for them to receive feedback from the server and slow /// down. /// -/// Current value chosen to allow about 25000 TPS per connection (200K TPS -/// per staked identity if it uses all 8 connections). -pub(crate) const MAX_STAKED_CONNECTION_REFERENCE_TPS: u64 = UNSTAKED_CONNECTION_REFERENCE_TPS * 20; +/// Current value chosen based on legacy values (todo:increase it at least 5x) +pub(crate) const MAX_STAKED_CONNECTION_REFERENCE_TPS: u64 = UNSTAKED_CONNECTION_REFERENCE_TPS * 4; #[derive(Clone)] pub struct SwQosConfig {