diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index eedcc269be00d2..dab92349c92526 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -272,7 +272,7 @@ async fn run_server( debug!("spawn quic server"); let mut last_datapoint = Instant::now(); let unstaked_connection_table: Arc> = - Arc::new(Mutex::new(ConnectionTable::new())); + Arc::new(Mutex::new(ConnectionTable::new(false))); let stream_load_ema = Arc::new(StakedStreamLoadEMA::new( stats.clone(), quic_server_params.max_unstaked_connections, @@ -282,7 +282,7 @@ async fn run_server( .quic_endpoints_count .store(endpoints.len(), Ordering::Relaxed); let staked_connection_table: Arc> = - Arc::new(Mutex::new(ConnectionTable::new())); + Arc::new(Mutex::new(ConnectionTable::new(true))); let (sender, receiver) = bounded(quic_server_params.coalesce_channel_size); thread::spawn({ @@ -506,6 +506,18 @@ impl NewConnectionHandlerParams { } } +fn update_open_connections_stat(stats: &StreamerStats, connection_table: &ConnectionTable) { + if connection_table.is_staked() { + stats + .open_staked_connections + .store(connection_table.table_size(), Ordering::Relaxed); + } else { + stats + .open_unstaked_connections + .store(connection_table.table_size(), Ordering::Relaxed); + } +} + fn handle_and_cache_new_connection( client_connection_tracker: ClientConnectionTracker, connection: Connection, @@ -543,6 +555,7 @@ fn handle_and_cache_new_connection( params.max_connections_per_peer, ) { + update_open_connections_stat(¶ms.stats, &connection_table_l); drop(connection_table_l); if let Ok(receive_window) = receive_window { @@ -752,6 +765,7 @@ async fn setup_connection( let num_pruned = connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + update_open_connections_stat(&stats, &connection_table_l); } if connection_table_l.total_size < quic_server_params.max_staked_connections @@ -1175,11 +1189,17 @@ async fn handle_connection( } let stable_id = connection.stable_id(); - let removed_connection_count = connection_table.lock().await.remove_connection( - ConnectionTableKey::new(remote_addr.ip(), remote_pubkey), - remote_addr.port(), - stable_id, - ); + let removed_connection_count = { + let mut connection_table = connection_table.lock().await; + let removed_connection_count = connection_table.remove_connection( + ConnectionTableKey::new(remote_addr.ip(), remote_pubkey), + remote_addr.port(), + stable_id, + ); + update_open_connections_stat(&stats, &connection_table); + removed_connection_count + }; + if removed_connection_count > 0 { stats .connection_removed @@ -1372,18 +1392,28 @@ impl ConnectionTableKey { struct ConnectionTable { table: IndexMap>, total_size: usize, + is_staked: bool, } // Prune the connection which has the oldest update // Return number pruned impl ConnectionTable { - fn new() -> Self { + fn new(is_staked: bool) -> Self { Self { table: IndexMap::default(), total_size: 0, + is_staked, } } + fn table_size(&self) -> usize { + self.total_size + } + + fn is_staked(&self) -> bool { + self.is_staked + } + fn prune_oldest(&mut self, max_size: usize) -> usize { let mut num_pruned = 0; let key = |(_, connections): &(_, &Vec<_>)| { @@ -2037,7 +2067,7 @@ pub mod test { fn test_prune_table_with_ip() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(); + let mut table = ConnectionTable::new(false); let mut num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -2090,7 +2120,7 @@ pub mod test { #[test] fn test_prune_table_with_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(); + let mut table = ConnectionTable::new(false); // We should be able to add more entries than max_connections_per_peer, since each entry is // from a different peer pubkey. @@ -2128,7 +2158,7 @@ pub mod test { #[test] fn test_prune_table_with_non_unique_pubkeys() { solana_logger::setup(); - let mut table = ConnectionTable::new(); + let mut table = ConnectionTable::new(false); let max_connections_per_peer = 10; let pubkey = Pubkey::new_unique(); @@ -2194,7 +2224,7 @@ pub mod test { fn test_prune_table_random() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(); + let mut table = ConnectionTable::new(true); let num_entries = 5; let max_connections_per_peer = 10; let sockets: Vec<_> = (0..num_entries) @@ -2236,7 +2266,7 @@ pub mod test { fn test_remove_connections() { use std::net::Ipv4Addr; solana_logger::setup(); - let mut table = ConnectionTable::new(); + let mut table = ConnectionTable::new(false); let num_ips = 5; let max_connections_per_peer = 10; let mut sockets: Vec<_> = (0..num_ips) diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 5703cb7dd66ac0..6bb74a268e1af6 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -210,6 +210,8 @@ pub struct StreamerStats { pub(crate) connection_rate_limiter_length: AtomicUsize, // All connections in various states such as Incoming, Connecting, Connection pub(crate) open_connections: AtomicUsize, + pub(crate) open_staked_connections: AtomicUsize, + pub(crate) open_unstaked_connections: AtomicUsize, pub(crate) refused_connections_too_many_open_connections: AtomicUsize, pub(crate) outstanding_incoming_connection_attempts: AtomicUsize, pub(crate) total_incoming_connection_attempts: AtomicUsize, @@ -566,6 +568,16 @@ impl StreamerStats { self.open_connections.load(Ordering::Relaxed), i64 ), + ( + "open_staked_connections", + self.open_staked_connections.load(Ordering::Relaxed), + i64 + ), + ( + "open_unstaked_connections", + self.open_unstaked_connections.load(Ordering::Relaxed), + i64 + ), ( "refused_connections_too_many_open_connections", self.refused_connections_too_many_open_connections