Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async fn run_server(
debug!("spawn quic server");
let mut last_datapoint = Instant::now();
let unstaked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Arc::new(Mutex::new(ConnectionTable::new(false)));
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
stats.clone(),
quic_server_params.max_unstaked_connections,
Expand All @@ -282,7 +282,7 @@ async fn run_server(
.quic_endpoints_count
.store(endpoints.len(), Ordering::Relaxed);
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
Arc::new(Mutex::new(ConnectionTable::new(true)));
let (sender, receiver) = bounded(quic_server_params.coalesce_channel_size);

thread::spawn({
Expand Down Expand Up @@ -506,6 +506,18 @@ impl NewConnectionHandlerParams {
}
}

fn update_open_connections_stat(stats: &StreamerStats, connection_table: &ConnectionTable) {
if connection_table.is_staked() {
stats
.open_staked_connections
.store(connection_table.table_size(), Ordering::Relaxed);
} else {
stats
.open_unstaked_connections
.store(connection_table.table_size(), Ordering::Relaxed);
}
}

fn handle_and_cache_new_connection(
client_connection_tracker: ClientConnectionTracker,
connection: Connection,
Expand Down Expand Up @@ -543,6 +555,7 @@ fn handle_and_cache_new_connection(
params.max_connections_per_peer,
)
{
update_open_connections_stat(&params.stats, &connection_table_l);
drop(connection_table_l);

if let Ok(receive_window) = receive_window {
Expand Down Expand Up @@ -752,6 +765,7 @@ async fn setup_connection(
let num_pruned =
connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
update_open_connections_stat(&stats, &connection_table_l);
}

if connection_table_l.total_size < quic_server_params.max_staked_connections
Expand Down Expand Up @@ -1175,11 +1189,17 @@ async fn handle_connection(
}

let stable_id = connection.stable_id();
let removed_connection_count = connection_table.lock().await.remove_connection(
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
remote_addr.port(),
stable_id,
);
let removed_connection_count = {
let mut connection_table = connection_table.lock().await;
let removed_connection_count = connection_table.remove_connection(
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
remote_addr.port(),
stable_id,
);
update_open_connections_stat(&stats, &connection_table);
removed_connection_count
};

if removed_connection_count > 0 {
stats
.connection_removed
Expand Down Expand Up @@ -1372,18 +1392,28 @@ impl ConnectionTableKey {
struct ConnectionTable {
table: IndexMap<ConnectionTableKey, Vec<ConnectionEntry>>,
total_size: usize,
is_staked: bool,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the readability, I personally would prefer enum with Staked/Unstaked. More like an opinion, not really insisting on it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in a follow up PR

}

// Prune the connection which has the oldest update
// Return number pruned
impl ConnectionTable {
fn new() -> Self {
fn new(is_staked: bool) -> Self {
Self {
table: IndexMap::default(),
total_size: 0,
is_staked,
}
}

fn table_size(&self) -> usize {
self.total_size
}

fn is_staked(&self) -> bool {
self.is_staked
}

fn prune_oldest(&mut self, max_size: usize) -> usize {
let mut num_pruned = 0;
let key = |(_, connections): &(_, &Vec<_>)| {
Expand Down Expand Up @@ -2037,7 +2067,7 @@ pub mod test {
fn test_prune_table_with_ip() {
use std::net::Ipv4Addr;
solana_logger::setup();
let mut table = ConnectionTable::new();
let mut table = ConnectionTable::new(false);
let mut num_entries = 5;
let max_connections_per_peer = 10;
let sockets: Vec<_> = (0..num_entries)
Expand Down Expand Up @@ -2090,7 +2120,7 @@ pub mod test {
#[test]
fn test_prune_table_with_unique_pubkeys() {
solana_logger::setup();
let mut table = ConnectionTable::new();
let mut table = ConnectionTable::new(false);

// We should be able to add more entries than max_connections_per_peer, since each entry is
// from a different peer pubkey.
Expand Down Expand Up @@ -2128,7 +2158,7 @@ pub mod test {
#[test]
fn test_prune_table_with_non_unique_pubkeys() {
solana_logger::setup();
let mut table = ConnectionTable::new();
let mut table = ConnectionTable::new(false);

let max_connections_per_peer = 10;
let pubkey = Pubkey::new_unique();
Expand Down Expand Up @@ -2194,7 +2224,7 @@ pub mod test {
fn test_prune_table_random() {
use std::net::Ipv4Addr;
solana_logger::setup();
let mut table = ConnectionTable::new();
let mut table = ConnectionTable::new(true);
let num_entries = 5;
let max_connections_per_peer = 10;
let sockets: Vec<_> = (0..num_entries)
Expand Down Expand Up @@ -2236,7 +2266,7 @@ pub mod test {
fn test_remove_connections() {
use std::net::Ipv4Addr;
solana_logger::setup();
let mut table = ConnectionTable::new();
let mut table = ConnectionTable::new(false);
let num_ips = 5;
let max_connections_per_peer = 10;
let mut sockets: Vec<_> = (0..num_ips)
Expand Down
12 changes: 12 additions & 0 deletions streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ pub struct StreamerStats {
pub(crate) connection_rate_limiter_length: AtomicUsize,
// All connections in various states such as Incoming, Connecting, Connection
pub(crate) open_connections: AtomicUsize,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we axe open_connections ? it gets updated differently and can be very confusing. If someone ever needs the sum, they can trivially add the two in SQL in metrics.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do in a follow up PR

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexpyattaev, I reviewed the code again. I think there is a value of having open_connections which can track all open connections in various state -- even the connections which did not make into the connection table yet like those spawn into the async tasks before they were allowed into the connection table. This can help understand the worst case connection load in the system.

pub(crate) open_staked_connections: AtomicUsize,
pub(crate) open_unstaked_connections: AtomicUsize,
pub(crate) refused_connections_too_many_open_connections: AtomicUsize,
pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
pub(crate) total_incoming_connection_attempts: AtomicUsize,
Expand Down Expand Up @@ -566,6 +568,16 @@ impl StreamerStats {
self.open_connections.load(Ordering::Relaxed),
i64
),
(
"open_staked_connections",
self.open_staked_connections.load(Ordering::Relaxed),
i64
),
(
"open_unstaked_connections",
self.open_unstaked_connections.load(Ordering::Relaxed),
i64
),
(
"refused_connections_too_many_open_connections",
self.refused_connections_too_many_open_connections
Expand Down
Loading