diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index c1f2a419f191e3..b638154864240e 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -16,7 +16,6 @@ use { indexmap::map::{Entry, IndexMap}, percentage::Percentage, quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, - quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, smallvec::SmallVec, solana_keypair::Keypair, @@ -24,23 +23,15 @@ use { solana_packet::{Meta, PACKET_DATA_SIZE}, solana_perf::packet::{BytesPacket, BytesPacketBatch, PacketBatch, PACKETS_PER_BATCH}, solana_pubkey::Pubkey, - solana_quic_definitions::{ - QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO, - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS, - QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO, - }, solana_signature::Signature, solana_time_utils as timing, solana_tls_utils::get_pubkey_from_tls_certificate, solana_transaction_metrics_tracker::signature_if_should_track_packet, std::{ - array, - fmt, + array, fmt, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, pin::Pin, - // CAUTION: be careful not to introduce any awaits while holding an RwLock. 
sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, @@ -76,15 +67,29 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped"; const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2; const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed"; -const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3; -const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stream_count"; - const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5; const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; +/// Target bitrate for an unstaked connection +const TARGET_UNSTAKED_KBPS: u64 = 5000; // about 200 TPS + +/// Target bitrate for a staked connection with maximum +/// stake amount through the cluster +const TARGET_MAX_STAKED_KBPS: u64 = TARGET_UNSTAKED_KBPS * 10; // about 2000 TPS + +/// Maximal allowed RTT for SWQOS calculations (to limit abuse) +const MAX_ALLOWED_RTT: Duration = Duration::from_millis(300); + +/// Maximal possible amount of streams to allocate per connection +const MAX_ALLOWED_UNI_STREAMS: u64 = 1024; + +/// Expected mean size of a transaction for the purpose of +/// receive window => streams conversion +const MEAN_TRANSACTION_SIZE: usize = 400; + /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. 
@@ -449,7 +454,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option { fn get_connection_stake( connection: &Connection, staked_nodes: &RwLock, -) -> Option<(Pubkey, u64, u64, u64, u64)> { +) -> Option<(Pubkey, u64, u64, u64)> { let pubkey = get_remote_pubkey(connection)?; debug!("Peer public key is {pubkey:?}"); let staked_nodes = staked_nodes.read().unwrap(); @@ -458,40 +463,11 @@ fn get_connection_stake( staked_nodes.get_node_stake(&pubkey)?, staked_nodes.total_stake(), staked_nodes.max_stake(), - staked_nodes.min_stake(), )) } -fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize { - match peer_type { - ConnectionPeerType::Staked(peer_stake) => { - // No checked math for f64 type. So let's explicitly check for 0 here - if total_stake == 0 || peer_stake > total_stake { - warn!( - "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \ - {total_stake:?}" - ); - - QUIC_MIN_STAKED_CONCURRENT_STREAMS - } else { - let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - - (((peer_stake as f64 / total_stake as f64) * delta) as usize - + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .clamp( - QUIC_MIN_STAKED_CONCURRENT_STREAMS, - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ) - } - } - ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, - } -} - enum ConnectionHandlerError { ConnectionAddError, - MaxStreamError, } #[derive(Clone)] @@ -503,7 +479,6 @@ struct NewConnectionHandlerParams { max_connections_per_peer: usize, stats: Arc, max_stake: u64, - min_stake: u64, } impl NewConnectionHandlerParams { @@ -520,7 +495,6 @@ impl NewConnectionHandlerParams { max_connections_per_peer, stats, max_stake: 0, - min_stake: 0, } } } @@ -534,71 +508,44 @@ fn handle_and_cache_new_connection( wait_for_chunk_timeout: Duration, stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { - if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( - 
params.peer_type, - params.total_stake, - ) as u64) - { - let remote_addr = connection.remote_address(); - let receive_window = - compute_recieve_window(params.max_stake, params.min_stake, params.peer_type); - - debug!( - "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}", - params.peer_type, - params.total_stake, - max_uni_streams.into_inner(), - receive_window, - remote_addr, - ); + let remote_addr = connection.remote_address(); - if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l - .try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), - remote_addr.port(), - client_connection_tracker, - Some(connection.clone()), - params.peer_type, - timing::timestamp(), - params.max_connections_per_peer, - ) - { - drop(connection_table_l); + debug!( + "Peer type {:?}, total stake {}, from peer {}", + params.peer_type, params.total_stake, remote_addr, + ); - if let Ok(receive_window) = receive_window { - connection.set_receive_window(receive_window); - } - connection.set_max_concurrent_uni_streams(max_uni_streams); + if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), + remote_addr.port(), + client_connection_tracker, + Some(connection.clone()), + params.peer_type, + timing::timestamp(), + params.max_connections_per_peer, + ) + { + drop(connection_table_l); - tokio::spawn(handle_connection( - connection, - remote_addr, - last_update, - connection_table, - cancel_connection, - params.clone(), - wait_for_chunk_timeout, - stream_load_ema, - stream_counter, - )); - Ok(()) - } else { - params - .stats - .connection_add_failed - .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::ConnectionAddError) - } + tokio::spawn(handle_connection( + connection, + remote_addr, + last_update, + connection_table, + cancel_connection, + params.clone(), + 
wait_for_chunk_timeout, + stream_load_ema, + stream_counter, + )); + Ok(()) } else { - connection.close( - CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(), - CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT, - ); params .stats - .connection_add_failed_invalid_stream_count + .connection_add_failed .fetch_add(1, Ordering::Relaxed); - Err(ConnectionHandlerError::MaxStreamError) + Err(ConnectionHandlerError::ConnectionAddError) } } @@ -634,47 +581,30 @@ async fn prune_unstaked_connections_and_add_new_connection( } } -/// Calculate the ratio for per connection receive window from a staked peer -fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 { - // Testing shows the maximum througput from a connection is achieved at receive_window = - // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the - // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to - // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r' - // for stake 's' is, - // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find - // a and b. 
- - if stake > max_stake { - return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - } +/// Calculate the intended bandwidth allocation for a given peer in kbps +fn compute_max_receive_rate_kbps(max_stake: u64, peer: ConnectionPeerType) -> u64 { + let stake = match peer { + ConnectionPeerType::Unstaked => 0, + ConnectionPeerType::Staked(peer_stake) => peer_stake, + }; - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO; - if max_stake > min_stake { - let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64; - let b = max_ratio as f64 - ((max_stake as f64) * a); - let ratio = (a * stake as f64) + b; - ratio.round() as u64 - } else { - QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + if stake >= max_stake { + return TARGET_MAX_STAKED_KBPS; } + + let max_rate = TARGET_MAX_STAKED_KBPS; + let min_rate = TARGET_UNSTAKED_KBPS; + // Linear interpolation (round to nearest) between min and max as stake approaches max_stake; + // widen to u128 first: stake is in lamports, so stake * (max_rate - min_rate) can overflow u64 + min_rate + ((u128::from(stake) * u128::from(max_rate - min_rate) + u128::from(max_stake / 2)) / u128::from(max_stake)) as u64 } -fn compute_recieve_window( - max_stake: u64, - min_stake: u64, - peer_type: ConnectionPeerType, -) -> Result { - match peer_type { - ConnectionPeerType::Unstaked => { - VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO) - } - ConnectionPeerType::Staked(peer_stake) => { - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake); - VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio) - } - } +/// Compute the RX window based on bandwidth-delay-product +fn compute_receive_window_bdp(max_receive_rate_kbps: u64, rtt: Duration) -> VarInt { + // max(1) is needed on localhost to avoid zero result + let millis = rtt.as_millis().max(1).min(MAX_ALLOWED_RTT.as_millis()) as u64; + let receive_window = (max_receive_rate_kbps * millis) / 8; + VarInt::from_u64(receive_window).unwrap_or(VarInt::MAX) } #[allow(clippy::too_many_arguments)] @@ -739,7 +669,7 @@ async fn setup_connection( 
max_connections_per_peer, stats.clone(), ), - |(pubkey, stake, total_stake, max_stake, min_stake)| { + |(pubkey, stake, total_stake, max_stake)| { // The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle // interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams. let min_stake_ratio = @@ -759,7 +689,6 @@ async fn setup_connection( max_connections_per_peer, stats: stats.clone(), max_stake, - min_stake, } }, ); @@ -1059,15 +988,26 @@ async fn handle_connection( .. } = params; + connection.set_max_concurrent_bi_streams(VarInt::from_u32(0)); + let max_receive_rate_kbps = compute_max_receive_rate_kbps(params.max_stake, params.peer_type); + let initial_rx_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt()); + connection.set_receive_window(initial_rx_window); + connection.set_max_concurrent_uni_streams( + VarInt::from_u64( + (initial_rx_window.into_inner() / MEAN_TRANSACTION_SIZE as u64) + .min(MAX_ALLOWED_UNI_STREAMS), + ) + .expect("dividing VarInt by positive integer will never overflow VarInt"), + ); debug!( - "quic new connection {} streams: {} connections: {}", - remote_addr, - stats.total_streams.load(Ordering::Relaxed), - stats.total_connections.load(Ordering::Relaxed), + "quic new connection {remote_addr}, max_receive_rate {max_receive_rate_kbps} Kbps (receive_window= {initial_rx_window} RTT={rtt}ms), streams: {streams} connections: {connections}", + rtt = connection.rtt().as_millis(), + streams=stats.total_streams.load(Ordering::Relaxed), + connections=stats.total_connections.load(Ordering::Relaxed), ); stats.total_connections.fetch_add(1, Ordering::Relaxed); - 'conn: loop { + 'conn: for stream_number in 0u64.. { // Wait for new streams. If the peer is disconnected we get a cancellation signal and stop // the connection task. let mut stream = select! 
{ @@ -1159,7 +1099,11 @@ async fn handle_connection( } // timeout elapsed Err(_) => { - debug!("Timeout in receiving on stream"); + debug!( + "Timeout in receiving on stream {} from {}", + stream.id(), + connection.remote_address() + ); stats .total_stream_read_timeouts .fetch_add(1, Ordering::Relaxed); @@ -1197,6 +1141,14 @@ async fn handle_connection( stats.total_streams.fetch_sub(1, Ordering::Relaxed); stream_load_ema.update_ema_if_needed(); + if (stream_number % 128) == 0 { + let new_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt()); + trace!("Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} and target bitrate {} kbps", + connection.rtt(), max_receive_rate_kbps); + connection.set_receive_window(new_window); + // we do not update number of allowed streams here since + // it may cause extra allocations. + } } let stable_id = connection.stable_id(); @@ -1558,12 +1510,9 @@ pub mod test { use { super::*, crate::{ - nonblocking::{ - quic::compute_max_allowed_uni_streams, - testing_utilities::{ - check_multiple_streams, get_client_config, make_client_endpoint, - setup_quic_server, SpawnTestServerResult, - }, + nonblocking::testing_utilities::{ + check_multiple_streams, get_client_config, make_client_endpoint, setup_quic_server, + SpawnTestServerResult, + }, quic::DEFAULT_TPU_COALESCE, }, @@ -2323,65 +2272,22 @@ pub mod test { } #[test] + fn test_calculate_receive_window_ratio_for_staked_node() { + let max_stake = 10000; + let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Unstaked); + assert_eq!(rate, TARGET_UNSTAKED_KBPS); - fn test_max_allowed_uni_streams() { - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0), - QUIC_MIN_STAKED_CONCURRENT_STREAMS - ); - let delta = - (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - 
QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64; - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000), - QUIC_MAX_STAKED_CONCURRENT_STREAMS, - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000), - ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS) - .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) - ); - assert_eq!( - compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000), - QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS - ); - } + let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake)); + assert_eq!(rate, TARGET_MAX_STAKED_KBPS); - #[test] - fn test_cacluate_receive_window_ratio_for_staked_node() { - let mut max_stake = 10000; - let mut min_stake = 0; - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake); - assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO); - - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO; - assert_eq!(ratio, max_ratio); - - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2); - let average_ratio = - (QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2; - assert_eq!(ratio, average_ratio); - - max_stake = 10000; - min_stake = 10000; - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - assert_eq!(ratio, max_ratio); - - max_stake = 0; - min_stake = 0; - let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake); - assert_eq!(ratio, max_ratio); - - max_stake = 1000; - min_stake = 10; - let ratio = - compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10); - assert_eq!(ratio, max_ratio); + let rate = + compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake / 2)); + let average_ratio = 
(TARGET_MAX_STAKED_KBPS + TARGET_UNSTAKED_KBPS) / 2; + assert_eq!(rate, average_ratio); + + let rate = + compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake + 10)); + assert_eq!(rate, TARGET_MAX_STAKED_KBPS); } #[tokio::test(flavor = "multi_thread")] diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 00b3d71ce95a90..83ddec37261250 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -11,7 +11,6 @@ use { }, crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError}, histogram::Histogram, - itertools::Itertools, solana_net_utils::multihomed_sockets::{ BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider, }, @@ -81,7 +80,6 @@ pub struct StakedNodes { overrides: HashMap, total_stake: u64, max_stake: u64, - min_stake: u64, } pub type PacketBatchReceiver = Receiver; @@ -429,11 +427,11 @@ impl StakedNodes { - /// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple + /// Calculate the stake stats: return the new (total_stake and max_stake) tuple fn calculate_stake_stats( stakes: &Arc>, overrides: &HashMap, - ) -> (u64, u64, u64) { + ) -> (u64, u64) { let values = stakes .iter() .filter(|(pubkey, _)| !overrides.contains_key(pubkey)) .chain(overrides.values().copied()) .filter(|&stake| stake > 0); let total_stake = values.clone().sum(); - let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default(); - (total_stake, min_stake, max_stake) + 
let max_stake = values.max().unwrap_or_default(); + (total_stake, max_stake) } pub fn new(stakes: Arc>, overrides: HashMap) -> Self { - let (total_stake, min_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); + let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides); Self { stakes, overrides, total_stake, max_stake, - min_stake, } } @@ -469,11 +466,6 @@ impl StakedNodes { self.total_stake } - #[inline] - pub(super) fn min_stake(&self) -> u64 { - self.min_stake - } - #[inline] pub(super) fn max_stake(&self) -> u64 { self.max_stake @@ -481,11 +473,9 @@ impl StakedNodes { // Update the stake map given a new stakes map pub fn update_stake_map(&mut self, stakes: Arc>) { - let (total_stake, min_stake, max_stake) = - Self::calculate_stake_stats(&stakes, &self.overrides); + let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &self.overrides); self.total_stake = total_stake; - self.min_stake = min_stake; self.max_stake = max_stake; self.stakes = stakes; }