diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index b638154864240e..c1f2a419f191e3 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -16,6 +16,7 @@ use { indexmap::map::{Entry, IndexMap}, percentage::Percentage, quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, + quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, smallvec::SmallVec, solana_keypair::Keypair, @@ -23,15 +24,23 @@ use { solana_packet::{Meta, PACKET_DATA_SIZE}, solana_perf::packet::{BytesPacket, BytesPacketBatch, PacketBatch, PACKETS_PER_BATCH}, solana_pubkey::Pubkey, + solana_quic_definitions::{ + QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, + QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, + }, solana_signature::Signature, solana_time_utils as timing, solana_tls_utils::get_pubkey_from_tls_certificate, solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ - array, fmt, + array, + fmt, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, pin::Pin, + // CAUTION: be careful not to introduce any awaits while holding an RwLock. 
sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, @@ -67,29 +76,15 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped"; const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2; const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed"; +const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3; +const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stream_count"; + const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5; const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; -/// Target bitrate for an unstaked connection -const TARGET_UNSTAKED_KBPS: u64 = 5000; // about 200 TPS - -/// Target bitrate for a staked connection with maximum -/// stake amount through the cluster -const TARGET_MAX_STAKED_KBPS: u64 = TARGET_UNSTAKED_KBPS * 10; // about 2000 TPS - -/// Maximal allowed RTT for SWQOS calculations (to limit abuse) -const MAX_ALLOWED_RTT: Duration = Duration::from_millis(300); - -/// Maximal possible amount of streams to allocate per connection -const MAX_ALLOWED_UNI_STREAMS: u64 = 1024; - -/// Expected mean size of a transaction for the purpose of -/// receive window => streams conversion -const MEAN_TRANSACTION_SIZE: usize = 400; - /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. 
@@ -454,7 +449,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option { fn get_connection_stake( connection: &Connection, staked_nodes: &RwLock, -) -> Option<(Pubkey, u64, u64, u64)> { +) -> Option<(Pubkey, u64, u64, u64, u64)> { let pubkey = get_remote_pubkey(connection)?; debug!("Peer public key is {pubkey:?}"); let staked_nodes = staked_nodes.read().unwrap(); @@ -463,11 +458,40 @@ fn get_connection_stake( staked_nodes.get_node_stake(&pubkey)?, staked_nodes.total_stake(), staked_nodes.max_stake(), + staked_nodes.min_stake(), )) } +fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { + match peer_type { + ConnectionPeerType::Staked(peer_stake) => { + // No checked math for f64 type. So let's explicitly check for 0 here + if total_stake == 0 || peer_stake > total_stake { + warn!( + "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \ + {total_stake:?}" + ); + + QUIC_MIN_STAKED_CONCURRENT_STREAMS + } else { + let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS + - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + + (((peer_stake as f64 / total_stake as f64) * delta) as usize + + QUIC_MIN_STAKED_CONCURRENT_STREAMS) + .clamp( + QUIC_MIN_STAKED_CONCURRENT_STREAMS, + QUIC_MAX_STAKED_CONCURRENT_STREAMS, + ) + } + } + ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, + } +} + enum ConnectionHandlerError { ConnectionAddError, + MaxStreamError, } #[derive(Clone)] @@ -479,6 +503,7 @@ struct NewConnectionHandlerParams { max_connections_per_peer: usize, stats: Arc, max_stake: u64, + min_stake: u64, } impl NewConnectionHandlerParams { @@ -495,6 +520,7 @@ impl NewConnectionHandlerParams { max_connections_per_peer, stats, max_stake: 0, + min_stake: 0, } } } @@ -508,44 +534,71 @@ fn handle_and_cache_new_connection( wait_for_chunk_timeout: Duration, stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { - let remote_addr = connection.remote_address(); - - debug!( - "Peer type {:?}, total stake 
{}, from peer {}", params.peer_type, params.total_stake, remote_addr, ); - - if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l - .try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), - remote_addr.port(), - client_connection_tracker, - Some(connection.clone()), - params.peer_type, - timing::timestamp(), - params.max_connections_per_peer, - ) + if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + params.peer_type, + params.total_stake, + ) as u64) { - drop(connection_table_l); + let remote_addr = connection.remote_address(); + let receive_window = + compute_receive_window(params.max_stake, params.min_stake, params.peer_type); - tokio::spawn(handle_connection( - connection, + debug!( + "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", + params.peer_type, + params.total_stake, + max_uni_streams.into_inner(), + receive_window, remote_addr, - last_update, - connection_table, - cancel_connection, - params.clone(), - wait_for_chunk_timeout, - stream_load_ema, - stream_counter, - )); - Ok(()) + ); + + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), + remote_addr.port(), + client_connection_tracker, + Some(connection.clone()), + params.peer_type, + timing::timestamp(), + params.max_connections_per_peer, + ) + { + drop(connection_table_l); + + if let Ok(receive_window) = receive_window { + connection.set_receive_window(receive_window); + } + connection.set_max_concurrent_uni_streams(max_uni_streams); + + tokio::spawn(handle_connection( + connection, + remote_addr, + last_update, + connection_table, + cancel_connection, + params.clone(), + wait_for_chunk_timeout, + stream_load_ema, + stream_counter, + )); + Ok(()) + } else { + params + .stats + .connection_add_failed + .fetch_add(1, Ordering::Relaxed); + 
Err(ConnectionHandlerError::ConnectionAddError) + } } else { + connection.close( + CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(), + CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, + ); params .stats - .connection_add_failed + .connection_add_failed_invalid_stream_count .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::ConnectionAddError) + Err(ConnectionHandlerError::MaxStreamError) } } @@ -581,30 +634,47 @@ async fn prune_unstaked_connections_and_add_new_connection( } } -/// Calculate the intended bandwidth allocation for a given peer in kbps -fn compute_max_receive_rate_kbps(max_stake: u64, peer: ConnectionPeerType) -> u64 { - let stake = match peer { - ConnectionPeerType::Unstaked => 0, - ConnectionPeerType::Staked(peer_stake) => peer_stake, - }; - - if stake >= max_stake { - return TARGET_MAX_STAKED_KBPS; +/// Calculate the ratio for per connection receive window from a staked peer +fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 { + // Testing shows the maximum throughput from a connection is achieved at receive_window = + // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the + // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to + // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r' + // for stake 's' is, + // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find + // a and b. 
+ + if stake > max_stake { + return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; } - let max_rate = TARGET_MAX_STAKED_KBPS; - let min_rate = TARGET_UNSTAKED_KBPS; - // Linear interpolation between min and max as - // stake approaches max_stake - min_rate + (stake * (max_rate - min_rate) + max_stake / 2) / max_stake + let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; + let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; + if max_stake > min_stake { + let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; + let b = max_ratio as f64 - ((max_stake as f64) * a); + let ratio = (a * stake as f64) + b; + ratio.round() as u64 + } else { + QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + } } -/// Compute the RX window based on bandwidth-delay-product -fn compute_receive_window_bdp(max_receive_rate_kbps: u64, rtt: Duration) -> VarInt { - // max(1) is needed on localhost to avoid zero result - let millis = rtt.as_millis().max(1).min(MAX_ALLOWED_RTT.as_millis()) as u64; - let receive_window = (max_receive_rate_kbps * millis) / 8; - VarInt::from_u64(receive_window).unwrap_or(VarInt::MAX) +fn compute_receive_window( + max_stake: u64, + min_stake: u64, + peer_type: ConnectionPeerType, +) -> Result<VarInt, VarIntBoundsExceeded> { + match peer_type { + ConnectionPeerType::Unstaked => { + VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) + } + ConnectionPeerType::Staked(peer_stake) => { + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); + VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) + } + } } #[allow(clippy::too_many_arguments)] @@ -669,7 +739,7 @@ async fn setup_connection( max_connections_per_peer, stats.clone(), ), - |(pubkey, stake, total_stake, max_stake)| { + |(pubkey, stake, total_stake, max_stake, min_stake)| { // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. 
let min_stake_ratio = @@ -689,6 +759,7 @@ async fn setup_connection( max_connections_per_peer, stats: stats.clone(), max_stake, + min_stake, } }, ); @@ -988,26 +1059,15 @@ async fn handle_connection( .. } = params; - connection.set_max_concurrent_bi_streams(VarInt::from_u32(0)); - let max_receive_rate_kbps = compute_max_receive_rate_kbps(params.max_stake, params.peer_type); - let initial_rx_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt()); - connection.set_receive_window(initial_rx_window); - connection.set_max_concurrent_uni_streams( - VarInt::from_u64( - (initial_rx_window.into_inner() / MEAN_TRANSACTION_SIZE as u64) - .min(MAX_ALLOWED_UNI_STREAMS), - ) - .expect("dividing VarInt by positive integer will never overflow VarInt"), - ); debug!( - "quic new connection {remote_addr}, max_receive_rate {max_receive_rate_kbps} Kbps (receive_window= {initial_rx_window} RTT={rtt}ms), streams: {streams} connections: {connections}", - rtt = connection.rtt().as_millis(), - streams=stats.total_streams.load(Ordering::Relaxed), - connections=stats.total_connections.load(Ordering::Relaxed), + "quic new connection {} streams: {} connections: {}", + remote_addr, + stats.total_streams.load(Ordering::Relaxed), + stats.total_connections.load(Ordering::Relaxed), ); stats.total_connections.fetch_add(1, Ordering::Relaxed); - 'conn: for stream_number in 0u64.. { + 'conn: loop { // Wait for new streams. If the peer is disconnected we get a cancellation signal and stop // the connection task. let mut stream = select! 
{ @@ -1099,11 +1159,7 @@ async fn handle_connection( } // timeout elapsed Err(_) => { - debug!( - "Timeout in receiving on stream {} from {}", - stream.id(), - connection.remote_address() - ); + debug!("Timeout in receiving on stream"); stats .total_stream_read_timeouts .fetch_add(1, Ordering::Relaxed); @@ -1141,14 +1197,6 @@ async fn handle_connection( stats.total_streams.fetch_sub(1, Ordering::Relaxed); stream_load_ema.update_ema_if_needed(); - if (stream_number % 128) == 0 { - let new_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt()); - trace!("Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} and target bitrate {} kbps", - connection.rtt(), max_receive_rate_kbps); - connection.set_receive_window(new_window); - // we do not update number of allowed streams here since - // it may cause extra allocations. - } } let stable_id = connection.stable_id(); @@ -1510,9 +1558,12 @@ pub mod test { use { super::*, crate::{ - nonblocking::testing_utilities::{ - check_multiple_streams, get_client_config, make_client_endpoint, setup_quic_server, - SpawnTestServerResult, + nonblocking::{ + quic::compute_max_allowed_uni_streams, + testing_utilities::{ + check_multiple_streams, get_client_config, make_client_endpoint, + setup_quic_server, SpawnTestServerResult, + }, }, quic::DEFAULT_TPU_COALESCE, }, @@ -2272,22 +2323,65 @@ pub mod test { } #[test] - fn test_cacluate_receive_window_ratio_for_staked_node() { - let max_stake = 10000; - let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Unstaked); - assert_eq!(rate, TARGET_UNSTAKED_KBPS); - let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake)); - assert_eq!(rate, TARGET_MAX_STAKED_KBPS); - - let rate = - compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake / 2)); - let average_ratio = (TARGET_MAX_STAKED_KBPS + TARGET_UNSTAKED_KBPS) / 2; - assert_eq!(rate, average_ratio); + fn 
test_max_allowed_uni_streams() { + assert_eq!( + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + ); + assert_eq!( + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), + QUIC_MIN_STAKED_CONCURRENT_STREAMS + ); + let delta = + (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; + assert_eq!( + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), + QUIC_MAX_STAKED_CONCURRENT_STREAMS, + ); + assert_eq!( + compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), + ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) + .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) + ); + assert_eq!( + compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), + QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS + ); + } - let rate = - compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake + 10)); - assert_eq!(rate, TARGET_MAX_STAKED_KBPS); + #[test] + fn test_calculate_receive_window_ratio_for_staked_node() { + let mut max_stake = 10000; + let mut min_stake = 0; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake); + assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO); + + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); + let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; + assert_eq!(ratio, max_ratio); + + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2); + let average_ratio = + (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2; + assert_eq!(ratio, average_ratio); + + max_stake = 10000; + min_stake = 10000; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); + assert_eq!(ratio, max_ratio); + + max_stake = 0; + min_stake = 0; + let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, 
max_stake); + assert_eq!(ratio, max_ratio); + + max_stake = 1000; + min_stake = 10; + let ratio = + compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10); + assert_eq!(ratio, max_ratio); } #[tokio::test(flavor = "multi_thread")] diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index a7f13ba14ff2ea..c4770b44b427b1 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -55,7 +55,6 @@ impl StakedStreamLoadEMA { } else { 0 }; - dbg!(max_unstaked_load_in_throttling_window); Self { current_load_ema: AtomicU64::default(), diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 83ddec37261250..00b3d71ce95a90 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -11,6 +11,7 @@ use { }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, histogram::Histogram, + itertools::Itertools, solana_net_utils::multihomed_sockets::{ BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider, }, @@ -80,6 +81,7 @@ pub struct StakedNodes { overrides: HashMap, total_stake: u64, max_stake: u64, + min_stake: u64, } pub type PacketBatchReceiver = Receiver; @@ -427,11 +429,11 @@ impl StreamerSendStats { } impl StakedNodes { - /// Calculate the stake stats: return the new (total_stake and max_stake) tuple + /// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple fn calculate_stake_stats( stakes: &Arc>, overrides: &HashMap, - ) -> (u64, u64) { + ) -> (u64, u64, u64) { let values = stakes .iter() .filter(|(pubkey, _)| !overrides.contains_key(pubkey)) @@ -439,17 +441,18 @@ impl StakedNodes { .chain(overrides.values().copied()) .filter(|&stake| stake > 0); let total_stake = values.clone().sum(); - let max_stake = values.max().unwrap_or_default(); - (total_stake, max_stake) + let (min_stake, max_stake) = 
values.minmax().into_option().unwrap_or_default(); + (total_stake, min_stake, max_stake) } pub fn new(stakes: Arc>, overrides: HashMap) -> Self { - let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); + let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); Self { stakes, overrides, total_stake, max_stake, + min_stake, } } @@ -466,6 +469,11 @@ impl StakedNodes { self.total_stake } + #[inline] + pub(super) fn min_stake(&self) -> u64 { + self.min_stake + } + #[inline] pub(super) fn max_stake(&self) -> u64 { self.max_stake @@ -473,9 +481,11 @@ impl StakedNodes { // Update the stake map given a new stakes map pub fn update_stake_map(&mut self, stakes: Arc>) { - let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &self.overrides); + let (total_stake, min_stake, max_stake) = + Self::calculate_stake_stats(&stakes, &self.overrides); self.total_stake = total_stake; + self.min_stake = min_stake; self.max_stake = max_stake; self.stakes = stakes; }