4 changes: 4 additions & 0 deletions streamer/src/nonblocking/qos.rs
@@ -30,6 +30,10 @@ pub(crate) trait QosController<C: ConnectionContext> {
context: &mut C,
) -> impl Future<Output = Option<CancellationToken>> + Send;

/// The reference TPS desired for a connection
/// on the QUIC level (affects RX buffer allocations)
fn reference_tps(&self, context: &C) -> u64;

/// Called when a new stream is received on a connection
fn on_new_stream(&self, context: &C) -> impl Future<Output = ()> + Send;

119 changes: 101 additions & 18 deletions streamer/src/nonblocking/quic.rs
@@ -12,7 +12,7 @@ use {
crossbeam_channel::{bounded, Receiver, Sender, TryRecvError, TrySendError},
futures::{stream::FuturesUnordered, Future, StreamExt as _},
indexmap::map::{Entry, IndexMap},
quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime},
quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
rand::{thread_rng, Rng},
smallvec::SmallVec,
solana_keypair::Keypair,
@@ -24,12 +24,10 @@ use {
solana_tls_utils::get_pubkey_from_tls_certificate,
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
array,
fmt,
array, fmt,
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
pin::Pin,
// CAUTION: be careful not to introduce any awaits while holding an RwLock.
sync::{
atomic::{AtomicU64, Ordering},
Arc, RwLock,
@@ -63,16 +61,55 @@ const CONNECTION_CLOSE_REASON_DROPPED_ENTRY: &[u8] = b"dropped";
pub(crate) const CONNECTION_CLOSE_CODE_DISALLOWED: u32 = 2;
pub(crate) const CONNECTION_CLOSE_REASON_DISALLOWED: &[u8] = b"disallowed";

pub(crate) const CONNECTION_CLOSE_CODE_EXCEED_MAX_STREAM_COUNT: u32 = 3;
pub(crate) const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] =
b"exceed_max_stream_count";

const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";

const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Maximal allowed RTT for SWQOS calculations.
///
/// The larger the RTT, the more RX buffer space we need to sustain the same
/// throughput. Since RTT can be spoofed, we do not give clients full control
/// over it, to avoid memory exhaustion. 400ms is larger than any reasonable
/// RTT on the internet, yet small enough to prevent memory exhaustion attacks.
pub(crate) const MAX_ALLOWED_RTT_MS: u64 = 400;

/// If the client RTT is less than this, we will allocate bandwidth as if
/// the client actually had this RTT. This is to prevent accidentally
/// throttling connections when RTT is very low, as RTT measurements
/// are in integer milliseconds, and 1ms vs 2ms is a 50% difference in the
/// bandwidth allocation.
///
/// TODO: Currently this is set way higher than necessary to avoid errors.
/// This mimics the legacy behavior where very low-latency connections
/// would get more bandwidth.
pub(crate) const MIN_ALLOWED_RTT_MS: u64 = 50;

/// Allow an absolute max of 2 MB RX window
///
/// This puts an upper bound on the total amount of bytes we may have to buffer
/// for a given client, irrespective of stake and RTT. This serves as an upper
/// bound on memory consumption.
pub(crate) const MAX_ALLOWED_RX_WINDOW: u32 = 2048 * 1024;

/// Expected mean size of a transaction for the purpose of
/// receive window <=> streams conversions;
/// essentially, num_rx_streams = rx_window/MEAN_TRANSACTION_SIZE.
/// Based on MNB statistics it should be ~600,
/// but we leave some margin to account for smaller TXs.
pub(crate) const MEAN_TRANSACTION_SIZE: u32 = 400;

/// Maximum possible number of streams to allocate per connection
///
/// Each connection gets a certain quota of QUIC streams allocated for it.
/// Each transaction "in flight" from client to server occupies a stream.
/// The total number needed depends on the desired bandwidth, transaction size
/// and RTT. This constant puts an upper bound on the number of streams to
/// limit memory consumption.
const MAX_ALLOWED_UNI_STREAMS: u32 = MAX_ALLOWED_RX_WINDOW / MEAN_TRANSACTION_SIZE;

/// Total new connection counts per second. Heuristically taken from
/// the default staked and unstaked connection limits. Might be adjusted
/// later.
Expand Down Expand Up @@ -382,7 +419,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option<Pubkey> {
pub fn get_connection_stake(
connection: &Connection,
staked_nodes: &RwLock<StakedNodes>,
) -> Option<(Pubkey, u64, u64, u64, u64)> {
) -> Option<(Pubkey, u64, u64, u64)> {
let pubkey = get_remote_pubkey(connection)?;
debug!("Peer public key is {pubkey:?}");
let staked_nodes = staked_nodes.read().unwrap();
@@ -391,14 +428,12 @@ pub fn get_connection_stake(
staked_nodes.get_node_stake(&pubkey)?,
staked_nodes.total_stake(),
staked_nodes.max_stake(),
staked_nodes.min_stake(),
))
}

#[derive(Debug)]
pub(crate) enum ConnectionHandlerError {
ConnectionAddError,
MaxStreamError,
}

pub(crate) fn update_open_connections_stat(
@@ -416,6 +451,27 @@ pub(crate) fn update_open_connections_stat(
}
}

/// Compute the RX window based on the bandwidth-delay product
///
/// This will attempt to match the reference TPS for high-latency connections,
/// and will not artificially slow down connections with latency < [`MIN_ALLOWED_RTT_MS`].
fn compute_receive_window_and_max_streams(reference_tps: u64, rtt: Duration) -> (VarInt, VarInt) {
// the desired transfer rate in bytes/second for RX window computation
let reference_transfer_rate = reference_tps * MEAN_TRANSACTION_SIZE as u64;
// truncating here is safe since u64 millis is an eternity
let rtt_milliseconds = (rtt.as_millis() as u64).clamp(MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS);
// Compute the receive window in bytes as transfer_rate * rtt,
let receive_window = reference_transfer_rate * rtt_milliseconds / 1000;
// hard-cap the RX window to avoid excess memory use
let receive_window = receive_window.min(MAX_ALLOWED_RX_WINDOW as u64) as u32;
// compute max_streams in flight
let max_streams = (receive_window / MEAN_TRANSACTION_SIZE).min(MAX_ALLOWED_UNI_STREAMS);
(
VarInt::from_u32(receive_window),
VarInt::from_u32(max_streams),
)
}

#[allow(clippy::too_many_arguments)]
async fn setup_connection<Q, C>(
connecting: Connecting,
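As an aside, here is a minimal standalone sketch of the bandwidth-delay-product math used by compute_receive_window_and_max_streams above, with illustrative TPS/RTT values (the real function returns quinn::VarInt and also caps the stream count at MAX_ALLOWED_UNI_STREAMS):

```rust
// Standalone illustration of the receive-window math; constants mirror the PR.
const MEAN_TRANSACTION_SIZE: u64 = 400;
const MAX_ALLOWED_RX_WINDOW: u64 = 2048 * 1024;

fn sketch_rx_window(reference_tps: u64, rtt_ms: u64) -> (u64, u64) {
    // bytes/second needed to sustain the reference TPS
    let rate = reference_tps * MEAN_TRANSACTION_SIZE;
    // clamp RTT to [MIN_ALLOWED_RTT_MS, MAX_ALLOWED_RTT_MS]
    let rtt_ms = rtt_ms.clamp(50, 400);
    // receive window = rate * RTT, capped at the absolute maximum
    let window = (rate * rtt_ms / 1000).min(MAX_ALLOWED_RX_WINDOW);
    // roughly one in-flight stream per mean-sized transaction
    (window, window / MEAN_TRANSACTION_SIZE)
}

fn main() {
    // 1_000 TPS target at 100 ms RTT -> 40 kB window, 100 in-flight streams
    assert_eq!(sketch_rx_window(1_000, 100), (40_000, 100));
    // a very low RTT is clamped up to 50 ms, so the window never collapses
    assert_eq!(sketch_rx_window(1_000, 1), (20_000, 50));
    // a huge target hits the 2 MiB cap, i.e. at most 5_242 streams
    assert_eq!(sketch_rx_window(100_000, 400), (2_097_152, 5_242));
}
```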
@@ -704,15 +760,23 @@ async fn handle_connection<Q, C>(
{
let peer_type = context.peer_type();
let remote_addr = connection.remote_address();
connection.set_max_concurrent_bi_streams(VarInt::from_u32(0));
let max_target_tps_per_connection = qos.reference_tps(&context);
let (initial_rx_window, max_streams) =
compute_receive_window_and_max_streams(max_target_tps_per_connection, connection.rtt());
connection.set_receive_window(initial_rx_window);
connection.set_max_concurrent_uni_streams(max_streams);
debug!(
"quic new connection {} streams: {} connections: {}",
remote_addr,
stats.active_streams.load(Ordering::Relaxed),
stats.total_connections.load(Ordering::Relaxed),
"quic new connection {remote_addr}, max_receive_rate {max_target_tps_per_connection} Kbps \
(receive_window {initial_rx_window} max_streams {max_streams:?} RTT {rtt}ms), streams: \
{streams} connections: {connections}",
rtt = connection.rtt().as_millis(),
streams = stats.active_streams.load(Ordering::Relaxed),
connections = stats.total_connections.load(Ordering::Relaxed),
);
stats.total_connections.fetch_add(1, Ordering::Relaxed);

'conn: loop {
'conn: for stream_number in 0u64.. {
// Wait for new streams. If the peer is disconnected we get a cancellation signal and stop
// the connection task.
let mut stream = select! {
Expand Down Expand Up @@ -770,7 +834,11 @@ async fn handle_connection<Q, C>(
}
// timeout elapsed
Err(_) => {
debug!("Timeout in receiving on stream");
debug!(
"Timeout in receiving on stream {} from {}",
stream.id(),
connection.remote_address()
);
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -808,6 +876,21 @@ async fn handle_connection<Q, C>(

stats.active_streams.fetch_sub(1, Ordering::Relaxed);
qos.on_stream_closed(&context);
if (stream_number % 128) == 0 {
Reviewer: How was this number chosen? It feels like it's way too often.

Author: It depends on whether you are staked or not. For staked it may be a bit too often; for unstaked it is once per second (unless they have cheated on the RTT). Do you think this should be based on stake?

let (new_window, _) = compute_receive_window_and_max_streams(
max_target_tps_per_connection,
connection.rtt(),
);
trace!(
"Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} \
and target rate {} TPS",
connection.rtt(),
max_target_tps_per_connection
);
connection.set_receive_window(new_window);
// we do not update the number of allowed streams here since
// it may cause extra allocations.
}
}

let removed_connection_count = qos.remove_connection(&context, connection).await;
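For context on the 128-stream refresh cadence discussed above, a rough sketch of how often the window recomputation would run if a connection actually sustained its target stream rate (the rates below are hypothetical):

```rust
// Back-of-the-envelope cadence check: one refresh every 128 streams.
fn main() {
    for target_tps in [100u64, 1_000, 5_000] {
        let interval_secs = 128.0 / target_tps as f64;
        println!("at {target_tps} streams/s the window is recomputed every ~{interval_secs:.2}s");
    }
}
```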
@@ -2042,7 +2125,7 @@ pub mod test {

let client_connection = make_client_endpoint(&server_address, None).await;

// unstaked connection can handle up to 100tps, so we should send in ~1s.
// unstaked connection can handle > 100tps, so we should send in <1s.
let expected_num_txs = 100;
let start_time = tokio::time::Instant::now();
for i in 0..expected_num_txs {
7 changes: 6 additions & 1 deletion streamer/src/nonblocking/simple_qos.rs
@@ -146,7 +146,7 @@ impl QosController<SimpleQosConnectionContext> for SimpleQos {
let (peer_type, remote_pubkey, _total_stake) =
get_connection_stake(connection, &self.staked_nodes).map_or(
(ConnectionPeerType::Unstaked, None, 0),
|(pubkey, stake, total_stake, _max_stake, _min_stake)| {
|(pubkey, stake, total_stake, _max_stake)| {
(ConnectionPeerType::Staked(stake), Some(pubkey), total_stake)
},
);
@@ -160,6 +160,11 @@
}
}

fn reference_tps(&self, _context: &SimpleQosConnectionContext) -> u64 {
// allocate network bandwidth with 4x margin
self.max_streams_per_second * 4
}

#[allow(clippy::manual_async_fn)]
fn try_add_connection(
&self,
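To make the 4x margin concrete with hypothetical numbers: a SimpleQos configured with max_streams_per_second = 500 would report reference_tps = 2_000, so a connection at 100 ms RTT ends up with a 2_000 * 400 B * 0.1 s = 80_000-byte receive window and 200 concurrent uni streams from compute_receive_window_and_max_streams.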