Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 94 additions & 29 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5;
const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10;
const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT;

const STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 90;

// Unstaked nodes must contribute to the EMA load for this threshold to be meaningful.
// See increment_load().
const UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT: u64 = 0;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not introduce a const if it does nothing. We want to keep diff minimal for backport.


pub(crate) struct StakedStreamLoadEMA {
current_load_ema: AtomicU64,
load_in_recent_interval: AtomicU64,
Expand All @@ -40,6 +46,11 @@ pub(crate) struct StakedStreamLoadEMA {
// Maximum number of streams for an unstaked connection in stream throttling window
max_unstaked_load_in_throttling_window: u64,
max_streams_per_ms: u64,

// No throttling for staked connections below this load.
staked_stream_throttling_load_threshold: u64,
// No throttling for unstaked connections below this load.
unstaked_stream_throttling_load_threshold: u64,
}

impl StakedStreamLoadEMA {
Expand All @@ -63,6 +74,14 @@ impl StakedStreamLoadEMA {
0
};

let staked_stream_throttling_load_threshold =
Percentage::from(STAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT)
.apply_to(max_staked_load_in_ema_window);

let unstaked_stream_throttling_load_threshold =
Percentage::from(UNSTAKED_STREAM_THROTTLING_LOAD_THRESHOLD_PERCENT)
.apply_to(max_staked_load_in_ema_window);

Self {
current_load_ema: AtomicU64::default(),
load_in_recent_interval: AtomicU64::default(),
Expand All @@ -71,6 +90,8 @@ impl StakedStreamLoadEMA {
max_staked_load_in_ema_window,
max_unstaked_load_in_throttling_window,
max_streams_per_ms,
staked_stream_throttling_load_threshold,
unstaked_stream_throttling_load_threshold,
}
}

Expand Down Expand Up @@ -148,13 +169,42 @@ impl StakedStreamLoadEMA {
&self,
peer_type: ConnectionPeerType,
total_stake: u64,
) -> u64 {
let current_load = self
.current_load_ema
.load(Ordering::Relaxed)
// translating from streams/STREAM_LOAD_EMA_INTERVAL_MS to streams/EMA_WINDOW_MS
.saturating_mul(STREAM_LOAD_EMA_INTERVAL_COUNT);
match peer_type {
ConnectionPeerType::Unstaked => {
if current_load < self.unstaked_stream_throttling_load_threshold {
MAX_UNSTAKED_TPS
} else {
self.available_throttled_load_capacity(peer_type, total_stake, current_load)
}
}
ConnectionPeerType::Staked(_) => {
if current_load < self.staked_stream_throttling_load_threshold {
self.max_staked_load_in_ema_window
} else {
self.available_throttled_load_capacity(peer_type, total_stake, current_load)
}
}
}
}

pub fn available_throttled_load_capacity(
&self,
peer_type: ConnectionPeerType,
total_stake: u64,
current_load: u64,
) -> u64 {
match peer_type {
ConnectionPeerType::Unstaked => self.max_unstaked_load_in_throttling_window,
ConnectionPeerType::Staked(stake) => {
// If the current load is low, cap it to 25% of max_load.
let current_load = u128::from(cmp::max(
self.current_load_ema.load(Ordering::Relaxed),
current_load,
self.max_staked_load_in_ema_window / 4,
));

Expand Down Expand Up @@ -296,9 +346,10 @@ pub mod test {
));
// 50K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Unstaked,
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
20
);
Expand All @@ -324,19 +375,21 @@ pub mod test {
// ema_load = 20K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 15 / 10K = 60
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
60
);

// ema_load = 20K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 20K) * 1K / 10K = 4K
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
4000
);
Expand All @@ -345,19 +398,21 @@ pub mod test {
// ema_load = 5K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 15 / 10K = 240
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
240
);

// ema_load = 5K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((20K * 20K) / 5K) * 1K / 10K = 16000
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
16000
);
Expand All @@ -367,28 +422,31 @@ pub mod test {
load_ema.current_load_ema.store(4000, Ordering::Relaxed);
// function = ((20K * 20K) / 25% of 20K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
240
);

// function = ((20K * 20K) / 25% of 20K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
16000
);

// At 1/40000 stake weight, and minimum load, it should still allow
// max_unstaked_load_in_throttling_window + 1 streams.
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1),
40000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
load_ema
.max_unstaked_load_in_throttling_window
Expand Down Expand Up @@ -417,68 +475,75 @@ pub mod test {
// max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 15 / 10K = 93.75
// Loss of precision occurs here because max streams is computed for 50ms window and then doubled.
assert!(
(92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration(
(92u64..=94).contains(&load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
))
);

// ema_load = 20K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 20K) * 1K / 10K = 6250
assert!((6249u64..=6250).contains(
&load_ema.available_load_capacity_in_throttling_duration(
assert!(
(6249u64..=6250).contains(&load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000
)
));
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
))
);

load_ema.current_load_ema.store(10000, Ordering::Relaxed);
// ema_load = 10K, stake = 15, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 15 / 10K = 187.5
// Loss of precision occurs here because max streams is computed for 50ms window and then doubled.
assert!(
(186u64..=188).contains(&load_ema.available_load_capacity_in_throttling_duration(
(186u64..=188).contains(&load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
))
);

// ema_load = 10K, stake = 1K, total_stake = 10K
// max_streams in 100ms (throttling window) = 2 * ((25K * 25K) / 10K) * 1K / 10K = 12500
assert!((12499u64..=12500).contains(
&load_ema.available_load_capacity_in_throttling_duration(
assert!(
(12499u64..=12500).contains(&load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000
)
));
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
))
);

// At 4000, the load is less than 25% of max_load (25K).
// Test that we cap it to 25%, yielding the same result as if load was 25K/4.
load_ema.current_load_ema.store(4000, Ordering::Relaxed);
// function = ((20K * 20K) / 25% of 25K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(15),
10000
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
300
);

// function = ((25K * 25K) / 25% of 25K) * stake / total_stake
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1000),
10000
10000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
20000
);

// At 1/400000 stake weight, and minimum load, it should still allow
// max_unstaked_load_in_throttling_window + 1 streams.
assert_eq!(
load_ema.available_load_capacity_in_throttling_duration(
load_ema.available_throttled_load_capacity(
ConnectionPeerType::Staked(1),
400000
400000,
load_ema.current_load_ema.load(Ordering::Relaxed)
),
load_ema
.max_unstaked_load_in_throttling_window
Expand Down
Loading