From f6d1d08f714b75b3fe34694b9e1a8e982fcb35a6 Mon Sep 17 00:00:00 2001 From: Dmitry Adamushka Date: Wed, 10 Dec 2025 17:13:57 +0400 Subject: [PATCH 1/4] =?UTF-8?q?Don=E2=80=99t=20apply=20throttling=20unless?= =?UTF-8?q?=20a=20specific=20load=20threshold=20is=20reached.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The approach taken here is simply not to apply any throttling until a configurable load threshold has been reached. For the v3.1.x fix, it's probably safer to use a minimal change that does not remove (yet) the curernt throttling entirely. For v4.0.0, we can replace it with a better mechanism. Simulations with the current EMA load mechanism (stream_throttle.rs) show that staked connections with very low stake (e.g., ~0.01% of total stake) receive streams-per-100ms quotas that are similar to unstaked connections even in no-load scenarios. Example: step load_in_5ms ema quota_0.01% quota_0.1% quota_1% 0 0 0 21 160 1600 1 3000 544 21 160 1600 2 1000 626 21 160 1600 Data collected on mds1 (over a few leader slots) also showed these low-stake connections being throttled: [2025-12-04T22:56:59.929547468Z ERROR solana_streamer::nonblocking::stream_throttle] Throttling tpu stream from 3.66.188.50:8016, peer type: Staked(30314578869242), current_load: 11, total_stake: 415746706271632896, max_streams_per_interval: 28, read_interval_streams: 28 throttle_duration: 99.948899ms In all observed cases, the effective load was basically 0 (3–25 streams per 5ms) while affected connections had quotas of 28–64 streams per 100ms, and stakes of ~0.007–0.016% of total stake. With the change in this PR, mds1 has been running for multiple days without any staked connections being throttled. --- streamer/src/nonblocking/stream_throttle.rs | 48 ++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 7329e426b96..9a4aeaa74d8 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -27,6 +27,12 @@ const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; +const STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 50; + +// Unstaked nodes must contribute to the EMA load for this threshold to be meaningful. +// See increment_load(). +const UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 0; + pub(crate) struct StakedStreamLoadEMA { current_load_ema: AtomicU64, load_in_recent_interval: AtomicU64, @@ -40,6 +46,11 @@ pub(crate) struct StakedStreamLoadEMA { // Maximum number of streams for an unstaked connection in stream throttling window max_unstaked_load_in_throttling_window: u64, max_streams_per_ms: u64, + + // No throttling for staked connections below this load. + staked_stream_throttling_load_threshold: u64, + // No throttling for unstaked connections below this load. + unstaked_stream_throttling_load_threshold: u64, } impl StakedStreamLoadEMA { @@ -63,6 +74,14 @@ impl StakedStreamLoadEMA { 0 }; + let staked_stream_throttling_load_threshold = + Percentage::from(STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT) + .apply_to(max_staked_load_in_ema_window); + + let unstaked_stream_throttling_load_threshold = + Percentage::from(UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT) + .apply_to(max_staked_load_in_ema_window); + Self { current_load_ema: AtomicU64::default(), load_in_recent_interval: AtomicU64::default(), @@ -71,6 +90,8 @@ impl StakedStreamLoadEMA { max_staked_load_in_ema_window, max_unstaked_load_in_throttling_window, max_streams_per_ms, + staked_stream_throttling_load_threshold, + unstaked_stream_throttling_load_threshold, } } @@ -148,13 +169,38 @@ impl StakedStreamLoadEMA { &self, peer_type: ConnectionPeerType, total_stake: u64, + ) -> u64 { + let current_load = self.current_load_ema.load(Ordering::Relaxed); + match peer_type { + ConnectionPeerType::Unstaked => { + if current_load < self.unstaked_stream_throttling_load_threshold { + MAX_UNSTAKED_TPS + } else { + self.available_throttled_load_capacity(peer_type, total_stake, current_load) + } + } + ConnectionPeerType::Staked(_) => { + if current_load < self.staked_stream_throttling_load_threshold { + self.max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS + } else { + self.available_throttled_load_capacity(peer_type, total_stake, current_load) + } + } + } + } + + pub fn available_throttled_load_capacity( + &self, + peer_type: ConnectionPeerType, + total_stake: u64, + current_load: u64, ) -> u64 { match peer_type { ConnectionPeerType::Unstaked => self.max_unstaked_load_in_throttling_window, ConnectionPeerType::Staked(stake) => { // If the current load is low, cap it to 25% of max_load. let current_load = u128::from(cmp::max( - self.current_load_ema.load(Ordering::Relaxed), + current_load, self.max_staked_load_in_ema_window / 4, )); From e11747927ed2c0ed6cae6862afecb3c1fcc0b101 Mon Sep 17 00:00:00 2001 From: Dmitry Adamushka Date: Mon, 15 Dec 2025 13:19:50 +0100 Subject: [PATCH 2/4] Fixed unit tests --- streamer/src/nonblocking/stream_throttle.rs | 71 +++++++++++++-------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 9a4aeaa74d8..b522436e13e 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -342,9 +342,10 @@ pub mod test { )); // 50K packets per ms * 20% / 500 max unstaked connections assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Unstaked, 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 20 ); @@ -370,9 +371,10 @@ pub mod test { // ema_load = 20K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 15 / 10K = 60 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 60 ); @@ -380,9 +382,10 @@ pub mod test { // ema_load = 20K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 1K / 10K = 4K assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 4000 ); @@ -391,9 +394,10 @@ pub mod test { // ema_load = 5K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 15 / 10K = 240 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 240 ); @@ -401,9 +405,10 @@ pub mod test { // ema_load = 5K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 1K / 10K = 16000 assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 16000 ); @@ -413,18 +418,20 @@ pub mod test { load_ema.current_load_ema.store(4000, Ordering::Relaxed); // function = ((20K * 20K) / 25% of 20K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 240 ); // function = ((20K * 20K) / 25% of 20K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 16000 ); @@ -432,9 +439,10 @@ pub mod test { // At 1/40000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1), 40000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), load_ema .max_unstaked_load_in_throttling_window @@ -463,58 +471,64 @@ pub mod test { // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 15 / 10K = 93.75 // Loss of precision occurs here because max streams is computed for 50ms window and then doubled. assert!( - (92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration( + (92u64..=94).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) )) ); // ema_load = 20K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 1K / 10K = 6250 - assert!((6249u64..=6250).contains( - &load_ema.available_load_capacity_in_throttling_duration( + assert!( + (6249u64..=6250).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 - ) - )); + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) + )) + ); load_ema.current_load_ema.store(10000, Ordering::Relaxed); // ema_load = 10K, stake = 15, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 15 / 10K = 187.5 // Loss of precision occurs here because max streams is computed for 50ms window and then doubled. assert!( - (186u64..=188).contains(&load_ema.available_load_capacity_in_throttling_duration( + (186u64..=188).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) )) ); // ema_load = 10K, stake = 1K, total_stake = 10K // max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 1K / 10K = 12500 - assert!((12499u64..=12500).contains( - &load_ema.available_load_capacity_in_throttling_duration( + assert!( + (12499u64..=12500).contains(&load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 - ) - )); + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) + )) + ); // At 4000, the load is less than 25% of max_load (25K). // Test that we cap it to 25%, yielding the same result as if load was 25K/4. load_ema.current_load_ema.store(4000, Ordering::Relaxed); // function = ((20K * 20K) / 25% of 25K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(15), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 300 ); // function = ((25K * 25K) / 25% of 25K) * stake / total_stake assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1000), - 10000 + 10000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), 20000 ); @@ -522,9 +536,10 @@ pub mod test { // At 1/400000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( + load_ema.available_throttled_load_capacity( ConnectionPeerType::Staked(1), - 400000 + 400000, + load_ema.current_load_ema.load(Ordering::Relaxed) ), load_ema .max_unstaked_load_in_throttling_window From 0b7b967f8bd1d91ba44911133b24f0a0e89c7bf8 Mon Sep 17 00:00:00 2001 From: Dmitry Adamushka Date: Tue, 16 Dec 2025 09:27:02 +0100 Subject: [PATCH 3/4] current_load needs to be converted from streams/5ms to streams/50ms (ema-window) to be properly used in available_throttled_load_capacity(). The old implementation was giving quotas incorrectly (larger than expected). Set the no-throttling threshold to 90%. --- streamer/src/nonblocking/stream_throttle.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index b522436e13e..a7dc5c044b6 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -27,7 +27,7 @@ const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; -const STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 50; +const STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 90; // Unstaked nodes must contribute to the EMA load for this threshold to be meaningful. // See increment_load(). @@ -170,7 +170,11 @@ impl StakedStreamLoadEMA { peer_type: ConnectionPeerType, total_stake: u64, ) -> u64 { - let current_load = self.current_load_ema.load(Ordering::Relaxed); + let current_load = self + .current_load_ema + .load(Ordering::Relaxed) + // translating from streams/STREAM_LOAD_EMA_INTERVAL_MS to streams/EMA_WINDOW_MS + .saturating_mul(STREAM_LOAD_EMA_INTERVAL_COUNT); match peer_type { ConnectionPeerType::Unstaked => { if current_load < self.unstaked_stream_throttling_load_threshold { From 1e74e13f67aa1fa0d8cdb734ea323c67a28bc46f Mon Sep 17 00:00:00 2001 From: Dmitry Adamushka Date: Tue, 16 Dec 2025 09:37:32 +0100 Subject: [PATCH 4/4] Use max_staked_load_in_ema_window as quota when throttling isn't applied. --- streamer/src/nonblocking/stream_throttle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index a7dc5c044b6..253eb07f277 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -185,7 +185,7 @@ impl StakedStreamLoadEMA { } ConnectionPeerType::Staked(_) => { if current_load < self.staked_stream_throttling_load_threshold { - self.max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS + self.max_staked_load_in_ema_window } else { self.available_throttled_load_capacity(peer_type, total_stake, current_load) }