diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 7329e426b96..253eb07f277 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -27,6 +27,12 @@ const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; +const STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 90; + +// Unstaked nodes must contribute to the EMA load for this threshold to be meaningful. +// See increment_load(). +const UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 0; + pub(crate) struct StakedStreamLoadEMA { current_load_ema: AtomicU64, load_in_recent_interval: AtomicU64, @@ -40,6 +46,11 @@ pub(crate) struct StakedStreamLoadEMA { // Maximum number of streams for an unstaked connection in stream throttling window max_unstaked_load_in_throttling_window: u64, max_streams_per_ms: u64, + + // No throttling for staked connections below this load. + staked_stream_throttling_load_threshold: u64, + // No throttling for unstaked connections below this load. + unstaked_stream_throttling_load_threshold: u64, } impl StakedStreamLoadEMA { @@ -63,6 +74,14 @@ impl StakedStreamLoadEMA { 0 }; + let staked_stream_throttling_load_threshold = + Percentage::from(STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT) + .apply_to(max_staked_load_in_ema_window); + + let unstaked_stream_throttling_load_threshold = + Percentage::from(UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT) + .apply_to(max_staked_load_in_ema_window); + Self { current_load_ema: AtomicU64::default(), load_in_recent_interval: AtomicU64::default(), @@ -71,6 +90,8 @@ impl StakedStreamLoadEMA { max_staked_load_in_ema_window, max_unstaked_load_in_throttling_window, max_streams_per_ms, + staked_stream_throttling_load_threshold, + unstaked_stream_throttling_load_threshold, } } @@ -148,13 +169,42 @@ impl StakedStreamLoadEMA { &self, peer_type: ConnectionPeerType, total_stake: u64, + ) -> u64 { + let current_load = self + .current_load_ema + .load(Ordering::Relaxed) + // translating from streams/STREAM_LOAD_EMA_INTERVAL_MS to streams/EMA_WINDOW_MS + .saturating_mul(STREAM_LOAD_EMA_INTERVAL_COUNT); + match peer_type { + ConnectionPeerType::Unstaked => { + if current_load < self.unstaked_stream_throttling_load_threshold { + MAX_UNSTAKED_TPS + } else { + self.available_throttled_load_capacity(peer_type, total_stake, current_load) + } + } + ConnectionPeerType::Staked(_) => { + if current_load < self.staked_stream_throttling_load_threshold { + self.max_staked_load_in_ema_window + } else { + self.available_throttled_load_capacity(peer_type, total_stake, current_load) + } + } + } + } + + pub fn available_throttled_load_capacity( + &self, + peer_type: ConnectionPeerType, + total_stake: u64, + current_load: u64, ) -> u64 { match peer_type { ConnectionPeerType::Unstaked => self.max_unstaked_load_in_throttling_window, ConnectionPeerType::Staked(stake) => { // If the current load is low, cap it to 25% of max_load. let current_load = u128::from(cmp::max( - self.current_load_ema.load(Ordering::Relaxed), + current_load, self.max_staked_load_in_ema_window / 4, )); @@ -296,9 +346,10 @@ pub mod test { )); // 50K packets per ms * 20% / 500 max unstaked connections assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Unstaked, 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 20 ); @@ -324,9 +375,10 @@ pub mod test { // ema_load = 20K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 15 / 10K = 60 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 60 ); @@ -334,9 +386,10 @@ pub mod test { // ema_load = 20K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 1K / 10K = 4K assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 4000 ); @@ -345,9 +398,10 @@ pub mod test { // ema_load = 5K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 15 / 10K = 240 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 240 ); @@ -355,9 +409,10 @@ pub mod test { // ema_load = 5K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 1K / 10K = 16000 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 16000 ); @@ -367,18 +422,20 @@ pub mod test { load_ema.current_load_ema.store(4000, Ordering::Relaxed); // function = ((20K * 20K) / 25% of 20K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 240 ); // function = ((20K * 20K) / 25% of 20K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 16000 ); @@ -386,9 +443,10 @@ pub mod test { // At 1/40000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1), 40000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), load_ema .max_unstaked_load_in_throttling_window @@ -417,58 +475,64 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 15 / 10K = 93.75 // Loss of precision occurs here because max streams is computed for 50ms window and then doubled. assert!( - (92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration( + (92u64..=94).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) )) ); // ema_load = 20K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 1K / 10K = 6250 - assert!((6249u64..=6250).contains( - &load_ema.available_load_capacity_in_throttling_duration( + assert!( + (6249u64..=6250).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 - ) - )); + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) + )) + ); load_ema.current_load_ema.store(10000, Ordering::Relaxed); // ema_load = 10K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 15 / 10K = 187.5 // Loss of precision occurs here because max streams is computed for 50ms window and then doubled. assert!( - (186u64..=188).contains(&load_ema.available_load_capacity_in_throttling_duration( + (186u64..=188).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) )) ); // ema_load = 10K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 1K / 10K = 12500 - assert!((12499u64..=12500).contains( - &load_ema.available_load_capacity_in_throttling_duration( + assert!( + (12499u64..=12500).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 - ) - )); + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) + )) + ); // At 4000, the load is less than 25% of max_load (25K). // Test that we cap it to 25%, yielding the same result as if load was 25K/4. load_ema.current_load_ema.store(4000, Ordering::Relaxed); // function = ((20K * 20K) / 25% of 25K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 300 ); // function = ((25K * 25K) / 25% of 25K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 20000 ); @@ -476,9 +540,10 @@ pub mod test { // At 1/400000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1), - 400000 + 400000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), load_ema .max_unstaked_load_in_throttling_window