Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped";
pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2;
pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed";

pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3;
pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] =
b"exceed_max_stream_count";

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

Expand Down Expand Up @@ -408,6 +412,7 @@ pub fn get_connection_stake(
#[derive(Debug)]
pub(crate) enum ConnectionHandlerError {
ConnectionAddError,
MaxStreamError,
}

pub(crate) fn update_open_connections_stat<S: OpaqueStreamerCounter>(
Expand Down
132 changes: 59 additions & 73 deletions streamer/src/nonblocking/swqos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use {
get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
ConnectionTableType, CONNECTION_CLOSE_CODE_DISALLOWED,
CONNECTION_CLOSE_REASON_DISALLOWED,
CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT, CONNECTION_CLOSE_REASON_DISALLOWED,
CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
},
stream_throttle::{
throttle_stream, ConnectionStreamCounter, StakedStreamLoadEMA,
Expand Down Expand Up @@ -47,13 +48,6 @@ pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;

pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;
/// Below this RTT, we apply the legacy logic (no BDP scaling)
/// Above this RTT, we increase the RX window and number of streams
/// as RTT increases to preserve reasonable bandwidth.
const REFERENCE_RTT_MS: u64 = 50;

/// Above this RTT we stop scaling for BDP
const MAX_RTT_MS: u64 = 350;

#[derive(Clone)]
pub struct SwQosConfig {
Expand Down Expand Up @@ -146,12 +140,8 @@ impl SwQos {
}
}

fn compute_max_allowed_uni_streams(
rtt_millis: u64,
peer_type: ConnectionPeerType,
total_stake: u64,
) -> u32 {
let streams = match peer_type {
fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
match peer_type {
ConnectionPeerType::Staked(peer_stake) => {
// No checked math for f64 type. So let's explicitly check for 0 here
if total_stake == 0 || peer_stake > total_stake {
Expand All @@ -174,10 +164,7 @@ fn compute_max_allowed_uni_streams(
}
}
ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
};
let streams =
streams as u64 * rtt_millis.clamp(REFERENCE_RTT_MS, MAX_RTT_MS) / REFERENCE_RTT_MS;
streams.min(u32::MAX as u64) as u32
}
}

impl SwQos {
Expand All @@ -195,51 +182,58 @@ impl SwQos {
),
ConnectionHandlerError,
> {
// get current RTT and limit it to MAX_RTT_MS
let rtt_millis = connection.rtt().as_millis() as u64;
let max_uni_streams = VarInt::from_u32(compute_max_allowed_uni_streams(
rtt_millis,
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
conn_context.peer_type(),
conn_context.total_stake,
));

let remote_addr = connection.remote_address();

debug!(
"Peer type {:?}, total stake {}, max streams {} from peer {}",
conn_context.peer_type(),
conn_context.total_stake,
max_uni_streams.into_inner(),
remote_addr,
);

let max_connections_per_peer = match conn_context.peer_type() {
ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer,
ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer,
};
if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
remote_addr.port(),
client_connection_tracker,
Some(connection.clone()),
conn_context.peer_type(),
conn_context.last_update.clone(),
max_connections_per_peer,
|| Arc::new(ConnectionStreamCounter::new()),
)
) as u64)
{
update_open_connections_stat(&self.stats, &connection_table_l);
drop(connection_table_l);
let remote_addr = connection.remote_address();

connection.set_max_concurrent_uni_streams(max_uni_streams);
debug!(
"Peer type {:?}, total stake {}, max streams {} from peer {}",
conn_context.peer_type(),
conn_context.total_stake,
max_uni_streams.into_inner(),
remote_addr,
);

Ok((last_update, cancel_connection, stream_counter))
let max_connections_per_peer = match conn_context.peer_type() {
ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer,
ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer,
};
if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l
.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), conn_context.remote_pubkey),
remote_addr.port(),
client_connection_tracker,
Some(connection.clone()),
conn_context.peer_type(),
conn_context.last_update.clone(),
max_connections_per_peer,
|| Arc::new(ConnectionStreamCounter::new()),
)
{
update_open_connections_stat(&self.stats, &connection_table_l);
drop(connection_table_l);

connection.set_max_concurrent_uni_streams(max_uni_streams);

Ok((last_update, cancel_connection, stream_counter))
} else {
self.stats
.connection_add_failed
.fetch_add(1, Ordering::Relaxed);
Err(ConnectionHandlerError::ConnectionAddError)
}
} else {
connection.close(
CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT.into(),
CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT,
);
self.stats
.connection_add_failed
.connection_add_failed_invalid_stream_count
.fetch_add(1, Ordering::Relaxed);
Err(ConnectionHandlerError::ConnectionAddError)
Err(ConnectionHandlerError::MaxStreamError)
}
}

Expand Down Expand Up @@ -534,35 +528,27 @@ pub mod test {
#[test]
fn test_max_allowed_uni_streams() {
assert_eq!(
compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 0),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
assert_eq!(
compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Staked(10), 0),
QUIC_MIN_STAKED_CONCURRENT_STREAMS as u32
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
QUIC_MIN_STAKED_CONCURRENT_STREAMS
);
let delta =
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
assert_eq!(
compute_max_allowed_uni_streams(
REFERENCE_RTT_MS,
ConnectionPeerType::Staked(1000),
10000
),
QUIC_MAX_STAKED_CONCURRENT_STREAMS as u32,
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000),
QUIC_MAX_STAKED_CONCURRENT_STREAMS,
);
assert_eq!(
compute_max_allowed_uni_streams(
REFERENCE_RTT_MS,
ConnectionPeerType::Staked(100),
10000
),
compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000),
((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
.min(QUIC_MAX_STAKED_CONCURRENT_STREAMS) as u32
.min(QUIC_MAX_STAKED_CONCURRENT_STREAMS)
);
assert_eq!(
compute_max_allowed_uni_streams(REFERENCE_RTT_MS, ConnectionPeerType::Unstaked, 10000),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u32
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}
}
Loading