Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 71 additions & 95 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use {
indexmap::map::{Entry, IndexMap},
percentage::Percentage,
quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
smallvec::SmallVec,
solana_keypair::Keypair,
Expand All @@ -25,22 +24,18 @@ use {
solana_perf::packet::{BytesPacket, BytesPacketBatch, PacketBatch, PACKETS_PER_BATCH},
solana_pubkey::Pubkey,
solana_quic_definitions::{
QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_STAKED_CONCURRENT_STREAMS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS, QUIC_TOTAL_STAKED_CONCURRENT_STREAMS,
},
solana_signature::Signature,
solana_time_utils as timing,
solana_tls_utils::get_pubkey_from_tls_certificate,
solana_transaction_metrics_tracker::signature_if_should_track_packet,
std::{
array,
fmt,
array, fmt,
iter::repeat_with,
net::{IpAddr, SocketAddr, UdpSocket},
pin::Pin,
// CAUTION: be careful not to introduce any awaits while holding an RwLock.
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -85,6 +80,16 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
const CONNECTION_CLOSE_CODE_INVALID_STREAM: u32 = 5;
const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream";

/// Target bitrate for an unstaked connection.
/// NOTE(review): placeholder value, still to be tuned.
const TARGET_UNSTAKED_KBPS: u64 = 5000;

/// Target bitrate for a staked connection with maximal possible
/// stake amount (4x the unstaked target).
/// NOTE(review): placeholder value, still to be tuned.
const TARGET_MAX_STAKED_KBPS: u64 = TARGET_UNSTAKED_KBPS * 4;

/// Maximal allowed RTT for SWQOS calculations (to limit abuse: a peer
/// reporting an inflated RTT would otherwise inflate its receive window)
const MAX_ALLOWED_RTT: Duration = Duration::from_millis(300);

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are just placeholder values to be tuned.

/// Total new connection counts per second. Heuristically taken from
/// the default staked and unstaked connection limits. Might be adjusted
/// later.
Expand Down Expand Up @@ -449,7 +454,7 @@ pub fn get_remote_pubkey(connection: &Connection) -> Option<Pubkey> {
fn get_connection_stake(
connection: &Connection,
staked_nodes: &RwLock<StakedNodes>,
) -> Option<(Pubkey, u64, u64, u64, u64)> {
) -> Option<(Pubkey, u64, u64, u64)> {
let pubkey = get_remote_pubkey(connection)?;
debug!("Peer public key is {pubkey:?}");
let staked_nodes = staked_nodes.read().unwrap();
Expand All @@ -458,7 +463,6 @@ fn get_connection_stake(
staked_nodes.get_node_stake(&pubkey)?,
staked_nodes.total_stake(),
staked_nodes.max_stake(),
staked_nodes.min_stake(),
))
}

Expand All @@ -469,7 +473,7 @@ fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u
if total_stake == 0 || peer_stake > total_stake {
warn!(
"Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \
{total_stake:?}"
{total_stake:?}"
);

QUIC_MIN_STAKED_CONCURRENT_STREAMS
Expand Down Expand Up @@ -503,7 +507,6 @@ struct NewConnectionHandlerParams {
max_connections_per_peer: usize,
stats: Arc<StreamerStats>,
max_stake: u64,
min_stake: u64,
}

impl NewConnectionHandlerParams {
Expand All @@ -520,7 +523,6 @@ impl NewConnectionHandlerParams {
max_connections_per_peer,
stats,
max_stake: 0,
min_stake: 0,
}
}
}
Expand All @@ -540,15 +542,12 @@ fn handle_and_cache_new_connection(
) as u64)
{
let remote_addr = connection.remote_address();
let receive_window =
compute_recieve_window(params.max_stake, params.min_stake, params.peer_type);

debug!(
"Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}",
"Peer type {:?}, total stake {}, max streams {} from peer {}",
params.peer_type,
params.total_stake,
max_uni_streams.into_inner(),
receive_window,
remote_addr,
);

Expand All @@ -565,9 +564,6 @@ fn handle_and_cache_new_connection(
{
drop(connection_table_l);

if let Ok(receive_window) = receive_window {
connection.set_receive_window(receive_window);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we now set it a bit later

}
connection.set_max_concurrent_uni_streams(max_uni_streams);

tokio::spawn(handle_connection(
Expand Down Expand Up @@ -634,47 +630,30 @@ async fn prune_unstaked_connections_and_add_new_connection(
}
}

/// Calculate the ratio for per connection receive window from a staked peer
fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
// Testing shows the maximum througput from a connection is achieved at receive_window =
// PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
// stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
// QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r'
// for stake 's' is,
// r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find
// a and b.

if stake > max_stake {
return QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
}
/// Calculate the intended bandwidth allocation for a given peer in kbps
fn compute_max_receive_rate_kbps(max_stake: u64, peer: ConnectionPeerType) -> u64 {
let stake = match peer {
ConnectionPeerType::Unstaked => 0,
ConnectionPeerType::Staked(peer_stake) => peer_stake,
};

let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
if max_stake > min_stake {
let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
let b = max_ratio as f64 - ((max_stake as f64) * a);
let ratio = (a * stake as f64) + b;
ratio.round() as u64
} else {
QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO
if stake >= max_stake {
return TARGET_MAX_STAKED_KBPS;
}

let max_rate = TARGET_MAX_STAKED_KBPS;
let min_rate = TARGET_UNSTAKED_KBPS;
// Linear interpolation between min and max as
// stake approaches max_stake
min_rate + (stake * (max_rate - min_rate) + max_stake / 2) / max_stake
}

fn compute_recieve_window(
max_stake: u64,
min_stake: u64,
peer_type: ConnectionPeerType,
) -> Result<VarInt, VarIntBoundsExceeded> {
match peer_type {
ConnectionPeerType::Unstaked => {
VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO)
}
ConnectionPeerType::Staked(peer_stake) => {
let ratio =
compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake);
VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio)
}
}
/// Compute the RX window based on bandwidth-delay-product
fn compute_receive_window_bdp(max_receive_rate_kbps: u64, rtt: Duration) -> VarInt {
    // Clamp RTT into [1, MAX_ALLOWED_RTT] milliseconds: the lower bound
    // avoids a zero-sized window on localhost, the upper bound limits abuse
    // by peers presenting an inflated RTT.
    let rtt_millis = rtt.as_millis().clamp(1, MAX_ALLOWED_RTT.as_millis()) as u64;
    // kbit/s * ms == bits; divide by 8 to express the window in bytes.
    let window_bytes = max_receive_rate_kbps * rtt_millis / 8;
    VarInt::from_u64(window_bytes).unwrap_or(VarInt::MAX)
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -739,7 +718,7 @@ async fn setup_connection(
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
|(pubkey, stake, total_stake, max_stake)| {
// The heuristic is that the stake should be large enough to have 1 stream pass through within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
Expand All @@ -759,7 +738,6 @@ async fn setup_connection(
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
}
},
);
Expand Down Expand Up @@ -1059,15 +1037,18 @@ async fn handle_connection(
..
} = params;

let max_receive_rate_kbps = compute_max_receive_rate_kbps(params.max_stake, params.peer_type);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we compute the RX window immediately after connection is confirmed

let initial_rx_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt());
connection.set_receive_window(initial_rx_window);
debug!(
"quic new connection {} streams: {} connections: {}",
remote_addr,
stats.total_streams.load(Ordering::Relaxed),
stats.total_connections.load(Ordering::Relaxed),
"quic new connection {remote_addr}, max_receive_rate {max_receive_rate_kbps} Kbps (receive_window= {initial_rx_window} RTT={rtt}ms), streams: {streams} connections: {connections}",
rtt = connection.rtt().as_millis(),
streams=stats.total_streams.load(Ordering::Relaxed),
connections=stats.total_connections.load(Ordering::Relaxed),
);
stats.total_connections.fetch_add(1, Ordering::Relaxed);

'conn: loop {
'conn: for stream_number in 0.. {
// Wait for new streams. If the peer is disconnected we get a cancellation signal and stop
// the connection task.
let mut stream = select! {
Expand Down Expand Up @@ -1159,7 +1140,11 @@ async fn handle_connection(
}
// timeout elapsed
Err(_) => {
debug!("Timeout in receiving on stream");
debug!(
"Timeout in receiving on stream {} from {}",
stream.id(),
connection.remote_address()
);
stats
.total_stream_read_timeouts
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1199,6 +1184,12 @@ async fn handle_connection(

stats.total_streams.fetch_sub(1, Ordering::Relaxed);
stream_load_ema.update_ema_if_needed();
if (stream_number % 64) == 0 {
let new_window = compute_receive_window_bdp(max_receive_rate_kbps, connection.rtt());
trace!("Updating receive window for {remote_addr:?} to {new_window:?} based on rtt {:?} and target bitrate {} kbps",
connection.rtt(), max_receive_rate_kbps);
connection.set_receive_window(new_window);
}
}

let stable_id = connection.stable_id();
Expand Down Expand Up @@ -2354,36 +2345,21 @@ pub mod test {

#[test]
fn test_compute_max_receive_rate_kbps() {
    // Renamed from the misspelled `test_cacluate_receive_window_ratio_for_staked_node`,
    // which referred to a function this test no longer exercises.
    let max_stake = 10000;

    // Unstaked peers always get the minimum target rate.
    let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Unstaked);
    assert_eq!(rate, TARGET_UNSTAKED_KBPS);

    // A peer holding the maximum stake gets the maximum target rate.
    let rate = compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake));
    assert_eq!(rate, TARGET_MAX_STAKED_KBPS);

    // Half the maximum stake interpolates to the midpoint of the two targets.
    let rate =
        compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake / 2));
    assert_eq!(rate, (TARGET_MAX_STAKED_KBPS + TARGET_UNSTAKED_KBPS) / 2);

    // Stake above the maximum is capped at the maximum target rate.
    let rate =
        compute_max_receive_rate_kbps(max_stake, ConnectionPeerType::Staked(max_stake + 10));
    assert_eq!(rate, TARGET_MAX_STAKED_KBPS);
}

#[tokio::test(flavor = "multi_thread")]
Expand Down
22 changes: 6 additions & 16 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use {
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender, TrySendError},
histogram::Histogram,
itertools::Itertools,
solana_net_utils::multihomed_sockets::{
BindIpAddrs, CurrentSocket, FixedSocketProvider, MultihomedSocketProvider, SocketProvider,
},
Expand Down Expand Up @@ -81,7 +80,6 @@ pub struct StakedNodes {
overrides: HashMap<Pubkey, u64>,
total_stake: u64,
max_stake: u64,
min_stake: u64,
}

pub type PacketBatchReceiver = Receiver<PacketBatch>;
Expand Down Expand Up @@ -429,30 +427,29 @@ impl StreamerSendStats {
}

impl StakedNodes {
/// Calculate the stake stats: return the new (total_stake, min_stake and max_stake) tuple
/// Calculate the stake stats: return the new (total_stake and max_stake) tuple
fn calculate_stake_stats(
stakes: &Arc<HashMap<Pubkey, u64>>,
overrides: &HashMap<Pubkey, u64>,
) -> (u64, u64, u64) {
) -> (u64, u64) {
let values = stakes
.iter()
.filter(|(pubkey, _)| !overrides.contains_key(pubkey))
.map(|(_, &stake)| stake)
.chain(overrides.values().copied())
.filter(|&stake| stake > 0);
let total_stake = values.clone().sum();
let (min_stake, max_stake) = values.minmax().into_option().unwrap_or_default();
(total_stake, min_stake, max_stake)
let max_stake = values.max().unwrap_or_default();
(total_stake, max_stake)
}

/// Build a `StakedNodes` from a stake map and per-pubkey overrides,
/// precomputing the cached total and maximum stake.
pub fn new(stakes: Arc<HashMap<Pubkey, u64>>, overrides: HashMap<Pubkey, u64>) -> Self {
    let (total_stake, max_stake) = Self::calculate_stake_stats(&stakes, &overrides);
    Self {
        total_stake,
        max_stake,
        stakes,
        overrides,
    }
}

Expand All @@ -469,23 +466,16 @@ impl StakedNodes {
self.total_stake
}

#[inline]
pub(super) fn min_stake(&self) -> u64 {
self.min_stake
}

/// Largest single-node stake among the (non-overridden, non-zero) staked
/// nodes, as cached by `calculate_stake_stats`.
#[inline]
pub(super) fn max_stake(&self) -> u64 {
    self.max_stake
}

// Replace the stake map and refresh the cached total/max stake stats
pub fn update_stake_map(&mut self, stakes: Arc<HashMap<Pubkey, u64>>) {
    // Stats must be recomputed before the new map is moved into self.
    (self.total_stake, self.max_stake) =
        Self::calculate_stake_stats(&stakes, &self.overrides);
    self.stakes = stakes;
}
Expand Down
Loading