Merged
11 changes: 8 additions & 3 deletions streamer/src/nonblocking/quic.rs
@@ -83,6 +83,13 @@ const MAX_CONNECTION_BURST: u64 = 1000;
/// peer, and is canceled when we get a Handshake packet from them.
const QUIC_CONNECTION_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(2);

/// Absolute max RTT to allow for a legitimate connection.
/// Enough to cover any non-malicious link on Earth.
pub(crate) const MAX_RTT: Duration = Duration::from_millis(320);
/// Prevent connections from having 0 RTT when RTT is too small,
/// as this would break some BDP calculations and assign zero bandwidth
pub(crate) const MIN_RTT: Duration = Duration::from_millis(2);

// A struct to accumulate the bytes making up
// a packet, along with their offsets, and the
// packet metadata. We use this accumulator to avoid
@@ -391,16 +398,14 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option<Pubkey> {
pub fn get_connection_stake(
connection: &Connection,
staked_nodes: &RwLock<StakedNodes>,
) -> Option<(Pubkey, u64, u64, u64, u64)> {
) -> Option<(Pubkey, u64, u64)> {
let pubkey = get_remote_pubkey(connection)?;
debug!("Peer public key is {pubkey:?}");
let staked_nodes = staked_nodes.read().unwrap();
Some((
pubkey,
staked_nodes.get_node_stake(&pubkey)?,
staked_nodes.total_stake(),
staked_nodes.max_stake(),
staked_nodes.min_stake(),
))
}

22 changes: 18 additions & 4 deletions streamer/src/nonblocking/simple_qos.rs
Author: For SimpleQos we either set max_streams to some constant, or compute based on RTT. Computing based on RTT seems less brittle.

@@ -5,7 +5,7 @@ use {
quic::{
get_connection_stake, update_open_connections_stat, ClientConnectionTracker,
ConnectionHandlerError, ConnectionPeerType, ConnectionTable, ConnectionTableKey,
ConnectionTableType,
ConnectionTableType, MAX_RTT, MIN_RTT,
},
},
quic::{
@@ -14,7 +14,7 @@ use {
},
streamer::StakedNodes,
},
quinn::Connection,
quinn::{Connection, VarInt},
solana_net_utils::token_bucket::TokenBucket,
solana_time_utils as timing,
std::{
@@ -32,6 +32,10 @@ use {
tokio_util::sync::CancellationToken,
};

/// Allow for extra streams "in flight" on top of the nominal
/// send rate in case of bursty traffic from the sender side.
const STREAMS_IN_FLIGHT_MARGIN: u32 = 2;

#[derive(Clone)]
pub struct SimpleQosConfig {
pub max_streams_per_second: u64,
@@ -85,8 +89,18 @@ impl SimpleQos {
) -> Result<(Arc<AtomicU64>, CancellationToken, Arc<TokenBucket>), ConnectionHandlerError> {
let remote_addr = connection.remote_address();

// this will never overflow u32 for reasonable MAX_RTT
let rtt = connection.rtt().clamp(MIN_RTT, MAX_RTT).as_millis() as u32;
let max_streams_in_flight = (self.config.max_streams_per_second as u32).saturating_mul(rtt)
/ 1000
* STREAMS_IN_FLIGHT_MARGIN;
// for very low values of max_streams_per_second, prevent connections from having zero
// streams in flight
let max_streams_in_flight = max_streams_in_flight.max(STREAMS_IN_FLIGHT_MARGIN);
connection.set_max_concurrent_uni_streams(VarInt::from_u32(max_streams_in_flight));
@stablebits: It looks like the receive window remained set to PACKET_DATA_SIZE (the default option in configure_server()), thus limiting vote TPS to 1 per RTT.

Author (@alexpyattaev), Nov 21, 2025: No, the receive window is set for both QoS modules in the common code, see async fn handle_connection<Q, C>(...).

@stablebits, Nov 21, 2025: Yes, I wanted to say the 1 transaction/RTT problem exists in the current (before this PR) code.

Author: Yes, it does exist today indeed. And we're fixing it =)
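(Worked example of the pre-PR limit discussed above, assuming PACKET_DATA_SIZE = 1232 bytes: with the connection receive window set to a single packet, a sender gets roughly 1232 bytes of flow-control credit per round trip, i.e. about one transaction per RTT -- around 10 transactions per second at 100 ms RTT -- no matter how many streams it opens.)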


debug!(
"Peer type {:?}, from peer {}",
"Peer type {:?}, from peer {}, max_streams {max_streams_in_flight}",
conn_context.peer_type(),
remote_addr,
);
@@ -145,7 +159,7 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
let (peer_type, remote_pubkey, _total_stake) =
get_connection_stake(connection, &self.staked_nodes).map_or(
(ConnectionPeerType::Unstaked, None, 0),
|(pubkey, stake, total_stake, _max_stake, _min_stake)| {
|(pubkey, stake, total_stake)| {
(ConnectionPeerType::Staked(stake), Some(pubkey), total_stake)
},
);
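Not part of the PR: a minimal standalone sketch of the stream-budget arithmetic added above, to show how the RTT clamp and the margin floor interact. The constants mirror this diff; the function name and the rates/RTTs in main() are purely illustrative.

use std::time::Duration;

const MAX_RTT: Duration = Duration::from_millis(320);
const MIN_RTT: Duration = Duration::from_millis(2);
const STREAMS_IN_FLIGHT_MARGIN: u32 = 2;

// Mirrors the budget computed in SimpleQos above (sketch only, not the real code path).
fn stream_budget(max_streams_per_second: u64, measured_rtt: Duration) -> u32 {
    let rtt_ms = measured_rtt.clamp(MIN_RTT, MAX_RTT).as_millis() as u32;
    let budget =
        (max_streams_per_second as u32).saturating_mul(rtt_ms) / 1000 * STREAMS_IN_FLIGHT_MARGIN;
    // Floor at the margin so very low stream rates still get a non-zero window.
    budget.max(STREAMS_IN_FLIGHT_MARGIN)
}

fn main() {
    // 500 streams/s at 50 ms RTT: 500 * 50 / 1000 * 2 = 50 streams in flight.
    assert_eq!(stream_budget(500, Duration::from_millis(50)), 50);
    // Sub-millisecond RTT clamps to MIN_RTT, then the floor kicks in:
    // 100 * 2 / 1000 = 0, floored to 2.
    assert_eq!(stream_budget(100, Duration::from_micros(300)), 2);
    // RTT above MAX_RTT is clamped: 1000 * 320 / 1000 * 2 = 640.
    assert_eq!(stream_budget(1000, Duration::from_secs(5)), 640);
}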
105 changes: 5 additions & 100 deletions streamer/src/nonblocking/swqos.rs
@@ -22,13 +22,10 @@ use {
streamer::StakedNodes,
},
percentage::Percentage,
quinn::{Connection, VarInt, VarIntBoundsExceeded},
solana_packet::PACKET_DATA_SIZE,
quinn::{Connection, VarInt},
solana_quic_definitions::{
QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
solana_time_utils as timing,
std::{
@@ -87,8 +84,6 @@ pub struct SwQos {
#[derive(Clone)]
pub struct SwQosConnectionContext {
peer_type: ConnectionPeerType,
max_stake: u64,
min_stake: u64,
remote_pubkey: Option<solana_pubkey::Pubkey>,
total_stake: u64,
in_staked_table: bool,
@@ -135,49 +130,6 @@ impl SwQos {
}
}

/// Calculate the ratio for per connection receive window from a staked peer
fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
// Testing shows the maximum throughput from a connection is achieved at receive_window =
// PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
// stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
// QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r'
// for stake 's' is,
// r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find
// a and b.

if stake > max_stake {
return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
}

let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
if max_stake > min_stake {
let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
let b = max_ratio as f64 - ((max_stake as f64) * a);
let ratio = (a * stake as f64) + b;
ratio.round() as u64
} else {
QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
}
}

fn compute_recieve_window(
max_stake: u64,
min_stake: u64,
peer_type: ConnectionPeerType,
) -> Result<VarInt, VarIntBoundsExceeded> {
match peer_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO)
}
ConnectionPeerType::Staked(peer_stake) => {
let ratio =
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake);
VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio)
}
}
}

fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
match peer_type {
ConnectionPeerType::Staked(peer_stake) => {
@@ -226,18 +178,12 @@ impl SwQos {
) as u64)
{
let remote_addr = connection.remote_address();
let receive_window = compute_recieve_window(
conn_context.max_stake,
conn_context.min_stake,
conn_context.peer_type(),
);

debug!(
"Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}",
"Peer type {:?}, total stake {}, max streams {} from peer {}",
conn_context.peer_type(),
conn_context.total_stake,
max_uni_streams.into_inner(),
receive_window,
remote_addr,
);

@@ -260,9 +206,6 @@ impl SwQos {
update_open_connections_stat(&self.stats, &connection_table_l);
drop(connection_table_l);

if let Ok(receive_window) = receive_window {
connection.set_receive_window(receive_window);
}
connection.set_max_concurrent_uni_streams(max_uni_streams);
Why not apply RTT-based scaling like we introduce in simple_qos? In this case, we should also ensure that senders don’t receive less than before -- at least until we have auto-tuning, and maybe even afterward unless these defaults turn out to be inadequate.

Author: In a separate PR. One small change at a time.


Ok((last_update, cancel_connection, stream_counter))
@@ -350,16 +293,14 @@ impl QosController<SwQosConnectionContext> for SwQos {
get_connection_stake(connection, &self.staked_nodes).map_or(
SwQosConnectionContext {
peer_type: ConnectionPeerType::Unstaked,
max_stake: 0,
min_stake: 0,
total_stake: 0,
remote_pubkey: None,
in_staked_table: false,
remote_address: connection.remote_address(),
stream_counter: None,
last_update: Arc::new(AtomicU64::new(timing::timestamp())),
},
|(pubkey, stake, total_stake, max_stake, min_stake)| {
|(pubkey, stake, total_stake)| {
// The heuristic is that the stake should be large enough to have 1 stream pass through within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
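// (Illustrative reading, not from this diff: taken proportionally, the heuristic above amounts to
// stake / total_stake * (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) >= 1,
// i.e. stake >= total_stake / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS).)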

@@ -378,8 +319,6 @@ impl QosController<SwQosConnectionContext> for SwQos {

SwQosConnectionContext {
peer_type,
max_stake,
min_stake,
total_stake,
remote_pubkey: Some(pubkey),
in_staked_table: false,
@@ -576,40 +515,6 @@ impl QosController<SwQosConnectionContext> for SwQos {
pub mod test {
use super::*;

#[test]
fn test_cacluate_receive_window_ratio_for_staked_node() {
Author: testing removed code

let mut max_stake = 10000;
let mut min_stake = 0;
let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, min_stake);
assert_eq!(ratio, QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO);

let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
assert_eq!(ratio, max_ratio);

let ratio =
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake / 2);
let average_ratio =
(QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO + QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO) / 2;
assert_eq!(ratio, average_ratio);

max_stake = 10000;
min_stake = 10000;
let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
assert_eq!(ratio, max_ratio);

max_stake = 0;
min_stake = 0;
let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake);
assert_eq!(ratio, max_ratio);

max_stake = 1000;
min_stake = 10;
let ratio =
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, max_stake + 10);
assert_eq!(ratio, max_ratio);
}

#[test]
fn test_max_allowed_uni_streams() {
assert_eq!(
28 changes: 18 additions & 10 deletions streamer/src/quic.rs
@@ -12,15 +12,13 @@ use {
pem::Pem,
quinn::{
crypto::rustls::{NoInitialCipherSuite, QuicServerConfig},
Endpoint, IdleTimeout, ServerConfig,
Endpoint, IdleTimeout, ServerConfig, VarInt,
},
rustls::KeyLogFile,
solana_keypair::Keypair,
solana_packet::PACKET_DATA_SIZE,
solana_perf::packet::PacketBatch,
solana_quic_definitions::{
NotifyKeyUpdate, QUIC_MAX_TIMEOUT, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
},
solana_quic_definitions::{NotifyKeyUpdate, QUIC_MAX_TIMEOUT},
solana_tls_utils::{new_dummy_x509_certificate, tls_server_config_builder},
std::{
net::UdpSocket,
@@ -57,6 +55,14 @@ pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
// This will be adjusted and parameterized in follow-on PRs.
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;

/// Allow for 8 MB QUIC connection receive window (MAX_DATA). This is sufficient to
/// support 200 Mbps upload rate at 320 ms RTT. It is unreasonable to expect a single
/// connection to require more bandwidth. This prevents MAX_DATA from affecting
/// the bitrate achieved by a single connection. Actual throttling is achieved based
/// on the number of concurrent streams. This does not affect the memory allocation
/// in Quinn, which is driven primarily by MAX_STREAMS, not MAX_DATA.
const CONNECTION_RECEIVE_WINDOW_BYTES: VarInt = VarInt::from_u32(8 * 1024 * 1024);
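(Not part of the diff -- a quick check of the arithmetic in the comment above; the constant names here are illustrative, the figures are the ones stated in the comment.)

// 200 Mbps at 320 ms RTT -> bandwidth-delay product in bytes.
const LINK_RATE_BITS_PER_SEC: u64 = 200_000_000;
const WORST_CASE_RTT_MS: u64 = 320;
const BDP_BYTES: u64 = LINK_RATE_BITS_PER_SEC / 8 * WORST_CASE_RTT_MS / 1000; // = 8_000_000
// An 8 MiB (8_388_608 byte) MAX_DATA window therefore covers the worst-case BDP with
// room to spare, so it never limits a single well-behaved connection.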

pub fn default_num_tpu_transaction_forward_receive_threads() -> usize {
num_cpus::get().min(16)
}
@@ -100,18 +106,20 @@ pub(crate) fn configure_server(

let config = Arc::get_mut(&mut server_config.transport).unwrap();

// QUIC_MAX_CONCURRENT_STREAMS doubled, which was found to improve reliability
Author: This was doing nothing since we override these settings after handshake anyway.

const MAX_CONCURRENT_UNI_STREAMS: u32 =
(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS.saturating_mul(2)) as u32;
config.max_concurrent_uni_streams(MAX_CONCURRENT_UNI_STREAMS.into());
// Set STREAM_MAX_DATA to fit at most 1 transaction.
// This should match the maximal TX size.
config.stream_receive_window((PACKET_DATA_SIZE as u32).into());
// set the receive window really small initially to prevent the fresh connections
// from slamming us with traffic.
config.receive_window((PACKET_DATA_SIZE as u32).into());
// disable uni_streams until handshake is complete
config.max_concurrent_uni_streams(0u32.into());
config.receive_window(CONNECTION_RECEIVE_WINDOW_BYTES);
let timeout = IdleTimeout::try_from(QUIC_MAX_TIMEOUT).unwrap();
config.max_idle_timeout(Some(timeout));

// disable bidi & datagrams
const MAX_CONCURRENT_BIDI_STREAMS: u32 = 0;
config.max_concurrent_bidi_streams(MAX_CONCURRENT_BIDI_STREAMS.into());
config.max_concurrent_bidi_streams(0u32.into());
config.datagram_receive_buffer_size(None);

// Disable GSO. The server only accepts inbound unidirectional streams initiated by clients,