From 7abf156240e0057df7b5104417e7d734daa1afa0 Mon Sep 17 00:00:00 2001 From: Alex Pyattaev Date: Wed, 19 Nov 2025 10:16:58 +0000 Subject: [PATCH] streamer: set fixed RX window for all connections --- streamer/src/nonblocking/quic.rs | 11 ++- streamer/src/nonblocking/simple_qos.rs | 22 +++++- streamer/src/nonblocking/swqos.rs | 105 ++----------------------- streamer/src/quic.rs | 28 ++++--- streamer/src/streamer.rs | 37 ++------- 5 files changed, 56 insertions(+), 147 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 6b96e0760be5e2..f46f401cf9a92d 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -83,6 +83,13 @@ const MAX_CONNECTION_BURST: u64 = 1000; /// peer, and is canceled when we get a Handshake packet from them. const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(2); +/// Absolute max RTT to allow for a legitimate connection. +/// Enough to cover any non-malicious link on Earth. +pub(crate) const MAX_RTT: Duration = Duration::from_millis(320); +/// Prevent connections from having 0 RTT when RTT is too small, +/// as this would break some BDP calculations and assign zero bandwidth +pub(crate) const MIN_RTT: Duration = Duration::from_millis(2); + // A struct to accumulate the bytes making up // a packet, along with their offsets, and the // packet metadata. We use this accumulator to avoid @@ -391,7 +398,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option { pub fn get_connection_stake( connection: &Connection, staked_nodes: &RwLock, -) -> Option<(Pubkey, u64, u64, u64, u64)> { +) -> Option<(Pubkey, u64, u64)> { let pubkey = get_remote_pubkey(connection)?; debug!("Peer public key is {pubkey:?}"); let staked_nodes = staked_nodes.read().unwrap(); @@ -399,8 +406,6 @@ pub fn get_connection_stake( pubkey, staked_nodes.get_node_stake(&pubkey)?, staked_nodes.total_stake(), - staked_nodes.max_stake(), - staked_nodes.min_stake(), )) } diff --git a/streamer/src/nonblocking/simple_qos.rs b/streamer/src/nonblocking/simple_qos.rs index bf5061e2fd9c15..352f4d93ade434 100644 --- a/streamer/src/nonblocking/simple_qos.rs +++ b/streamer/src/nonblocking/simple_qos.rs @@ -5,7 +5,7 @@ use { quic::{ get_connection_stake, update_open_connections_stat, ClientConnectionTracker, ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey, - ConnectionTableType, + ConnectionTableType, MAX_RTT, MIN_RTT, }, }, quic::{ @@ -14,7 +14,7 @@ use { }, streamer::StakedNodes, }, - quinn::Connection, + quinn::{Connection, VarInt}, solana_net_utils::token_bucket::TokenBucket, solana_time_utils as timing, std::{ @@ -32,6 +32,10 @@ use { tokio_util::sync::CancellationToken, }; +/// Allow for extra streams "in flight" on top of the nominal +/// send rate in case of bursty traffic from the sender side. 
+const STREAMS_IN_FLIGHT_MARGIN: u32 = 2; + #[derive(Clone)] pub struct SimpleQosConfig { pub max_streams_per_second: u64, @@ -85,8 +89,18 @@ impl SimpleQos { ) -> Result<(Arc, CancellationToken, Arc), ConnectionHandlerError> { let remote_addr = connection.remote_address(); + // this will never overflow u32 for reasonable MAX_RTT + let rtt = connection.rtt().clamp(MIN_RTT, MAX_RTT).as_millis() as u32; + let max_streams_in_flight = (self.config.max_streams_per_second as u32).saturating_mul(rtt) + / 1000 + * STREAMS_IN_FLIGHT_MARGIN; + // for very low values of max_streams_per_second, prevent connections from having zero + // streams in flight + let max_streams_in_flight = max_streams_in_flight.max(STREAMS_IN_FLIGHT_MARGIN); + connection.set_max_concurrent_uni_streams(VarInt::from_u32(max_streams_in_flight)); + debug!( - "Peer type {:?}, from peer {}", + "Peer type {:?}, from peer {}, max_streams {max_streams_in_flight}", conn_context.peer_type(), remote_addr, ); @@ -145,7 +159,7 @@ impl QosController for SimpleQos { let (peer_type, remote_pubkey, _total_stake) = get_connection_stake(connection, &self.staked_nodes).map_or( (ConnectionPeerType::Unstaked, None, 0), - |(pubkey, stake, total_stake, _max_stake, _min_stake)| { + |(pubkey, stake, total_stake)| { (ConnectionPeerType::Staked(stake), Some(pubkey), total_stake) }, ); diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index 73c5c926c533f1..99a8dd8582de1c 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -22,13 +22,10 @@ use { streamer::StakedNodes, }, percentage::Percentage, - quinn::{Connection, VarInt, VarIntBoundsExceeded}, - solana_packet::PACKET_DATA_SIZE, + quinn::{Connection, VarInt}, solana_quic_definitions::{ - QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, + QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, }, solana_time_utils as timing, std::{ @@ -87,8 +84,6 @@ pub struct SwQos { #[derive(Clone)] pub struct SwQosConnectionContext { peer_type: ConnectionPeerType, - max_stake: u64, - min_stake: u64, remote_pubkey: Option, total_stake: u64, in_staked_table: bool, @@ -135,49 +130,6 @@ impl SwQos { } } -/// Calculate the ratio for per connection receive window from a staked peer -fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 { - // Testing shows the maximum throughput from a connection is achieved at receive_window = - // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the - // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to - // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r' - // for stake 's' is, - // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find - // a and b. 
- - if stake > max_stake { - return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - } - - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; - if max_stake > min_stake { - let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; - let b = max_ratio as f64 - ((max_stake as f64) * a); - let ratio = (a * stake as f64) + b; - ratio.round() as u64 - } else { - QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO - } -} - -fn compute_recieve_window( - max_stake: u64, - min_stake: u64, - peer_type: ConnectionPeerType, -) -> Result { - match peer_type { - ConnectionPeerType::Unstaked => { - VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) - } - ConnectionPeerType::Staked(peer_stake) => { - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); - VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) - } - } -} - fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { match peer_type { ConnectionPeerType::Staked(peer_stake) => { @@ -226,18 +178,12 @@ impl SwQos { ) as u64) { let remote_addr = connection.remote_address(); - let receive_window = compute_recieve_window( - conn_context.max_stake, - conn_context.min_stake, - conn_context.peer_type(), - ); debug!( - "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", + "Peer type {:?}, total stake {}, max streams {} from peer {}", conn_context.peer_type(), conn_context.total_stake, max_uni_streams.into_inner(), - receive_window, remote_addr, ); @@ -260,9 +206,6 @@ impl SwQos { update_open_connections_stat(&self.stats, &connection_table_l); drop(connection_table_l); - if let Ok(receive_window) = receive_window { - connection.set_receive_window(receive_window); - } connection.set_max_concurrent_uni_streams(max_uni_streams); Ok((last_update, cancel_connection, stream_counter)) @@ -350,8 +293,6 @@ impl QosController for SwQos { get_connection_stake(connection, &self.staked_nodes).map_or( SwQosConnectionContext { peer_type: ConnectionPeerType::Unstaked, - max_stake: 0, - min_stake: 0, total_stake: 0, remote_pubkey: None, in_staked_table: false, @@ -359,7 +300,7 @@ impl QosController for SwQos { stream_counter: None, last_update: Arc::new(AtomicU64::new(timing::timestamp())), }, - |(pubkey, stake, total_stake, max_stake, min_stake)| { + |(pubkey, stake, total_stake)| { // The heuristic is that the stake should be large enough to have 1 stream pass through within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. 
@@ -378,8 +319,6 @@
 
                 SwQosConnectionContext {
                     peer_type,
-                    max_stake,
-                    min_stake,
                     total_stake,
                     remote_pubkey: Some(pubkey),
                     in_staked_table: false,
@@ -576,40 +515,6 @@ impl QosController<SwQosConnectionContext> for SwQos {
 pub mod test {
     use super::*;
 
-    #[test]
-    fn test_cacluate_receive_window_ratio_for_staked_node() {
-        let mut max_stake = 10000;
-        let mut min_stake = 0;
-        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake);
-        assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO);
-
-        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
-        let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
-        assert_eq!(ratio, max_ratio);
-
-        let ratio =
-            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2);
-        let average_ratio =
-            (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2;
-        assert_eq!(ratio, average_ratio);
-
-        max_stake = 10000;
-        min_stake = 10000;
-        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
-        assert_eq!(ratio, max_ratio);
-
-        max_stake = 0;
-        min_stake = 0;
-        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
-        assert_eq!(ratio, max_ratio);
-
-        max_stake = 1000;
-        min_stake = 10;
-        let ratio =
-            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
-        assert_eq!(ratio, max_ratio);
-    }
-
     #[test]
     fn test_max_allowed_uni_streams() {
         assert_eq!(
diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs
index 72b7477641631c..0e2305aeecf63d 100644
--- a/streamer/src/quic.rs
+++ b/streamer/src/quic.rs
@@ -12,15 +12,13 @@ use {
     pem::Pem,
     quinn::{
         crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
-        Endpoint, IdleTimeout, ServerConfig,
+        Endpoint, IdleTimeout, ServerConfig, VarInt,
     },
     rustls::KeyLogFile,
     solana_keypair::Keypair,
     solana_packet::PACKET_DATA_SIZE,
     solana_perf::packet::PacketBatch,
-    solana_quic_definitions::{
-        NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
-    },
+    solana_quic_definitions::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT},
     solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
     std::{
         net::UdpSocket,
@@ -57,6 +55,14 @@ pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
 // This will be adjusted and parameterized in follow-on PRs.
 pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;
 
+/// Allow an 8 MB QUIC connection receive window (MAX_DATA). This is sufficient to
+/// support a 200 Mbps upload rate at 320 ms RTT. It is unreasonable to expect a single
+/// connection to require more bandwidth. This prevents MAX_DATA from limiting
+/// the bitrate achieved by a single connection. Actual throttling is achieved via
+/// the number of concurrent streams. This does not affect memory allocation in
+/// Quinn, which is driven primarily by MAX_STREAMS, not MAX_DATA.
+const CONNECTION_RECEIVE_WINDOW_BYTES: VarInt = VarInt::from_u32(8 * 1024 * 1024);
+
 pub fn default_num_tpu_transaction_forward_receive_threads() -> usize {
     num_cpus::get().min(16)
 }
@@ -100,18 +106,18 @@ pub(crate) fn configure_server(
 
     let config = Arc::get_mut(&mut server_config.transport).unwrap();
 
-    // QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
-    const MAX_CONCURRENT_UNI_STREAMS: u32 =
-        (QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
-    config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
+    // Set STREAM_MAX_DATA to fit at most 1 transaction.
+    // This should match the maximal TX size.
     config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
-    config.receive_window((PACKET_DATA_SIZE as u32).into());
+    // Use a fixed connection-level receive window (MAX_DATA) for all connections.
+    config.receive_window(CONNECTION_RECEIVE_WINDOW_BYTES);
+    // Disable uni streams until the handshake completes; QoS raises the limit per peer.
+    config.max_concurrent_uni_streams(0u32.into());
     let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
     config.max_idle_timeout(Some(timeout));
 
     // disable bidi & datagrams
-    const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
-    config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
+    config.max_concurrent_bidi_streams(0u32.into());
     config.datagram_receive_buffer_size(None);
 
     // Disable GSO. The server only accepts inbound unidirectional streams initiated by clients,
diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs
index 074436bf0c45f0..0fb370db480245 100644
--- a/streamer/src/streamer.rs
+++ b/streamer/src/streamer.rs
@@ -11,7 +11,6 @@ use {
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
     histogram::Histogram,
-    itertools::Itertools,
     solana_net_utils::{
         multihomed_sockets::{
             BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider,
@@ -83,8 +82,6 @@ pub struct StakedNodes {
     stakes: Arc<HashMap<Pubkey, u64>>,
     overrides: HashMap<Pubkey, u64>,
     total_stake: u64,
-    max_stake: u64,
-    min_stake: u64,
 }
 
 pub type PacketBatchReceiver = Receiver<PacketBatch>;
@@ -432,30 +429,24 @@ impl StreamerSendStats {
 }
 
 impl StakedNodes {
-    /// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple
-    fn calculate_stake_stats(
-        stakes: &Arc<HashMap<Pubkey, u64>>,
+    fn calculate_total_stake(
+        stakes: &HashMap<Pubkey, u64>,
         overrides: &HashMap<Pubkey, u64>,
-    ) -> (u64, u64, u64) {
-        let values = stakes
+    ) -> u64 {
+        stakes
             .iter()
             .filter(|(pubkey, _)| !overrides.contains_key(pubkey))
             .map(|(_, &stake)| stake)
             .chain(overrides.values().copied())
-            .filter(|&stake| stake > 0);
-        let total_stake = values.clone().sum();
-        let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
-        (total_stake, min_stake, max_stake)
+            .sum()
     }
 
     pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
-        let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides);
+        let total_stake = Self::calculate_total_stake(&stakes, &overrides);
         Self {
             stakes,
             overrides,
             total_stake,
-            max_stake,
-            min_stake,
         }
     }
 
@@ -472,24 +463,10 @@ impl StakedNodes {
         self.total_stake
     }
 
-    #[inline]
-    pub(super) fn min_stake(&self) -> u64 {
-        self.min_stake
-    }
-
-    #[inline]
-    pub(super) fn max_stake(&self) -> u64 {
-        self.max_stake
-    }
-
     // Update the stake map given a new stakes map
     pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
-        let (total_stake, min_stake, max_stake) =
-            Self::calculate_stake_stats(&stakes, &self.overrides);
-
+        let total_stake = Self::calculate_total_stake(&stakes, &self.overrides);
        self.total_stake = total_stake;
-        self.min_stake = min_stake;
-        self.max_stake = max_stake;
         self.stakes = stakes;
     }
 }
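Note: the SimpleQos change above derives the per-connection MAX_STREAMS budget from the measured RTT. The standalone sketch below is illustrative only and not part of the patch; the helper name and the example rates are assumptions, while MIN_RTT, MAX_RTT and STREAMS_IN_FLIGHT_MARGIN mirror the constants introduced by the patch.

```rust
use std::time::Duration;

const MAX_RTT: Duration = Duration::from_millis(320);
const MIN_RTT: Duration = Duration::from_millis(2);
const STREAMS_IN_FLIGHT_MARGIN: u32 = 2;

/// Streams allowed in flight = streams/second * RTT, with a burst margin.
fn max_streams_in_flight(max_streams_per_second: u32, measured_rtt: Duration) -> u32 {
    // Clamp the measured RTT: a 0 ms sample would zero out the budget, and an
    // absurdly large (possibly gamed) RTT would inflate it.
    let rtt_ms = measured_rtt.clamp(MIN_RTT, MAX_RTT).as_millis() as u32;
    let streams = max_streams_per_second.saturating_mul(rtt_ms) / 1000 * STREAMS_IN_FLIGHT_MARGIN;
    // Keep a small floor so very low configured rates still make progress.
    streams.max(STREAMS_IN_FLIGHT_MARGIN)
}

fn main() {
    // 500 streams/s at 40 ms RTT: 500 * 40 / 1000 * 2 = 40 streams in flight.
    assert_eq!(max_streams_in_flight(500, Duration::from_millis(40)), 40);
    // 10 streams/s at the 2 ms RTT floor: the formula yields 0, floored to the margin.
    assert_eq!(max_streams_in_flight(10, Duration::from_millis(2)), 2);
}
```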
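Note: a quick sanity check of the bandwidth-delay-product reasoning behind CONNECTION_RECEIVE_WINDOW_BYTES: a sender limited to 200 Mbps over a 320 ms RTT path can have at most rate * RTT, roughly 8,000,000 bytes, in flight, which fits inside the 8 MiB window. The check below is illustrative; every constant name other than the window size is an assumption, not code from the patch.

```rust
fn main() {
    // MAX_RTT and the assumed worst-case upload rate from the patch description.
    const RTT_SECONDS: f64 = 0.320;
    const UPLOAD_RATE_BITS_PER_SECOND: f64 = 200_000_000.0; // 200 Mbps
    // Mirrors CONNECTION_RECEIVE_WINDOW_BYTES (8 * 1024 * 1024) as a plain integer.
    const RECEIVE_WINDOW_BYTES: u64 = 8 * 1024 * 1024;

    // Bandwidth-delay product: bytes the sender can have unacknowledged before the
    // first ACK returns. MAX_DATA must cover this to avoid throttling the sender.
    let bdp_bytes = UPLOAD_RATE_BITS_PER_SECOND / 8.0 * RTT_SECONDS; // about 8_000_000 bytes
    assert!(bdp_bytes <= RECEIVE_WINDOW_BYTES as f64);
    println!("BDP {bdp_bytes} B fits within the {RECEIVE_WINDOW_BYTES} B window");
}
```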
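Note: the simplified stake accounting in streamer.rs now keeps only the total; an overridden node contributes its override value instead of its base stake. A minimal sketch of the same logic, assuming &str keys in place of Pubkey:

```rust
use std::collections::HashMap;

// Same shape as the new StakedNodes::calculate_total_stake, with &str keys
// standing in for Pubkey.
fn calculate_total_stake(stakes: &HashMap<&str, u64>, overrides: &HashMap<&str, u64>) -> u64 {
    stakes
        .iter()
        // An override replaces the base stake for that node rather than adding to it.
        .filter(|(pubkey, _)| !overrides.contains_key(*pubkey))
        .map(|(_, &stake)| stake)
        .chain(overrides.values().copied())
        .sum()
}

fn main() {
    let stakes = HashMap::from([("node_a", 100), ("node_b", 50)]);
    let overrides = HashMap::from([("node_a", 10)]);
    // node_a contributes its overridden 10, node_b its base 50.
    assert_eq!(calculate_total_stake(&stakes, &overrides), 60);
}
```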