diff --git a/bench-vote/src/main.rs b/bench-vote/src/main.rs index 02c4568c945..90820c33e9f 100644 --- a/bench-vote/src/main.rs +++ b/bench-vote/src/main.rs @@ -267,12 +267,14 @@ fn main() -> Result<()> { max_connections_per_ipaddr_per_min: max_connections_per_ipaddr_per_min .try_into() .unwrap(), + ..Default::default() + }; + let qos_config = SwQosConfig { max_connections_per_unstaked_peer: max_connections_per_peer, max_staked_connections: max_connections, max_unstaked_connections: 0, ..Default::default() }; - let qos_config = SwQosConfig::default(); let (s_reader, r_reader) = unbounded(); read_channels.push(r_reader); diff --git a/core/src/validator.rs b/core/src/validator.rs index cd1d4d06573..e5e9865f8ad 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -589,17 +589,18 @@ impl ValidatorTpuConfig { let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { max_connections_per_ipaddr_per_min: 32, + ..Default::default() + }, + qos_config: SwQosConfig { max_unstaked_connections: 0, ..Default::default() }, - qos_config: SwQosConfig::default(), }; // vote and tpu_fwd share the same characteristics -- disallow non-staked connections: let vote_quic_server_config = SimpleQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { max_connections_per_ipaddr_per_min: 32, - max_unstaked_connections: 0, ..Default::default() }, qos_config: SimpleQosConfig::default(), diff --git a/streamer/examples/swqos.rs b/streamer/examples/swqos.rs index 8dabcd51ed4..e617a471630 100644 --- a/streamer/examples/swqos.rs +++ b/streamer/examples/swqos.rs @@ -68,8 +68,10 @@ pub fn load_staked_nodes_overrides(path: &String) -> anyhow::Result anyhow::Result<()> { sender, staked_nodes, QuicStreamerConfig { - max_connections_per_unstaked_peer: cli.max_connections_per_peer, ..QuicStreamerConfig::default() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_staked_peer: cli.max_connections_per_staked_peer, + max_connections_per_unstaked_peer: cli.max_connections_per_unstaked_peer, + ..Default::default() + }, cancel.clone(), )?; info!("Server listening on {}", socket.local_addr()?); diff --git a/streamer/src/nonblocking/qos.rs b/streamer/src/nonblocking/qos.rs index 35e501e355f..6e62b93d66a 100644 --- a/streamer/src/nonblocking/qos.rs +++ b/streamer/src/nonblocking/qos.rs @@ -51,4 +51,9 @@ pub(crate) trait QosController { context: &C, connection: Connection, ) -> impl Future + Send; + + /// How many concurrent + fn max_concurrent_connections(&self) -> usize; } + +pub trait QosConfig {} diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 63963f52492..16cc09855c5 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -157,7 +157,7 @@ where }) .collect::, _>>()?; - let max_concurrent_connections = quic_server_params.max_concurrent_connections(); + let max_concurrent_connections = qos.max_concurrent_connections(); let handle = tokio::spawn({ let endpoints = endpoints.clone(); let stats = stats.clone(); @@ -334,10 +334,9 @@ where continue; } - let Ok(client_connection_tracker) = ClientConnectionTracker::new( - stats.clone(), - quic_server_params.max_concurrent_connections(), - ) else { + let Ok(client_connection_tracker) = + ClientConnectionTracker::new(stats.clone(), qos.max_concurrent_connections()) + else { stats .refused_connections_too_many_open_connections .fetch_add(1, Ordering::Relaxed); @@ -1351,7 +1350,7 @@ pub mod test { } = setup_quic_server( None, QuicStreamerConfig::default_for_tests(), - SwQosConfig::default(), + SwQosConfig::default_for_tests(), ); check_block_multiple_connections(server_address).await; cancel.cancel(); @@ -1372,10 +1371,12 @@ pub mod test { } = setup_quic_server( None, QuicStreamerConfig { - max_connections_per_unstaked_peer: 2, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_unstaked_peer: 2, + ..SwQosConfig::default_for_tests() + }, ); let client_socket = bind_to_localhost_unique().expect("should bind - client"); @@ -1582,10 +1583,12 @@ pub mod test { sender, staked_nodes, QuicStreamerConfig { - max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes + ..Default::default() + }, cancel.clone(), ) .unwrap(); @@ -1616,10 +1619,12 @@ pub mod test { sender, staked_nodes, QuicStreamerConfig { - max_connections_per_unstaked_peer: 2, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_unstaked_peer: 2, + ..Default::default() + }, cancel.clone(), ) .unwrap(); diff --git a/streamer/src/nonblocking/simple_qos.rs b/streamer/src/nonblocking/simple_qos.rs index 4f171dd557e..066588642bf 100644 --- a/streamer/src/nonblocking/simple_qos.rs +++ b/streamer/src/nonblocking/simple_qos.rs @@ -11,7 +11,10 @@ use { throttle_stream, ConnectionStreamCounter, STREAM_THROTTLING_INTERVAL, }, }, - quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS}, + quic::{ + StreamerStats, DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER, + DEFAULT_MAX_STAKED_CONNECTIONS, DEFAULT_MAX_STREAMS_PER_MS, + }, streamer::StakedNodes, }, quinn::Connection, @@ -30,20 +33,22 @@ use { #[derive(Clone)] pub struct SimpleQosConfig { pub max_streams_per_second: u64, + pub max_staked_connections: usize, + pub max_connections_per_peer: usize, } impl Default for SimpleQosConfig { fn default() -> Self { SimpleQosConfig { max_streams_per_second: DEFAULT_MAX_STREAMS_PER_MS * 1000, + max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS, + max_connections_per_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER, } } } pub struct SimpleQos { - max_streams_per_second: u64, - max_staked_connections: usize, - max_connections_per_peer: usize, + config: SimpleQosConfig, stats: Arc, staked_connection_table: Arc>, staked_nodes: Arc>, @@ -51,17 +56,13 @@ pub struct SimpleQos { impl SimpleQos { pub fn new( - qos_config: SimpleQosConfig, - max_connections_per_peer: usize, - max_staked_connections: usize, + config: SimpleQosConfig, stats: Arc, staked_nodes: Arc>, cancel: CancellationToken, ) -> Self { Self { - max_streams_per_second: qos_config.max_streams_per_second, - max_connections_per_peer, - max_staked_connections, + config, stats, staked_nodes, staked_connection_table: Arc::new(Mutex::new(ConnectionTable::new( @@ -101,7 +102,7 @@ impl SimpleQos { Some(connection.clone()), conn_context.peer_type(), conn_context.last_update.clone(), - self.max_connections_per_peer, + self.config.max_connections_per_peer, ) { update_open_connections_stat(&self.stats, &connection_table_l); @@ -118,7 +119,7 @@ impl SimpleQos { fn max_streams_per_throttling_interval(&self, _context: &SimpleQosConnectionContext) -> u64 { let interval_ms = STREAM_THROTTLING_INTERVAL.as_millis() as u64; - (self.max_streams_per_second * interval_ms / 1000).max(1) + (self.config.max_streams_per_second * interval_ms / 1000).max(1) } } @@ -173,7 +174,7 @@ impl QosController for SimpleQos { ConnectionPeerType::Staked(stake) => { let mut connection_table_l = self.staked_connection_table.lock().await; - if connection_table_l.total_size >= self.max_staked_connections { + if connection_table_l.total_size >= self.config.max_staked_connections { let num_pruned = connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); @@ -189,7 +190,7 @@ impl QosController for SimpleQos { update_open_connections_stat(&self.stats, &connection_table_l); } - if connection_table_l.total_size < self.max_staked_connections { + if connection_table_l.total_size < self.config.max_staked_connections { if let Ok((last_update, cancel_connection, stream_counter)) = self .cache_new_connection( client_connection_tracker, @@ -277,6 +278,11 @@ impl QosController for SimpleQos { .await; } } + + fn max_concurrent_connections(&self) -> usize { + // Allow 25% more connections than required to allow for handshake + self.config.max_staked_connections * 5 / 4 + } } #[cfg(test)] @@ -380,8 +386,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -432,9 +436,10 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let simple_qos = SimpleQos::new( - SimpleQosConfig::default(), - 1, // max_connections_per_peer (set to 1 to trigger limit) - 100, // max_staked_connections + SimpleQosConfig { + max_connections_per_peer: 1, + ..Default::default() + }, stats.clone(), staked_nodes, cancel.clone(), @@ -510,8 +515,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -566,8 +569,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -605,8 +606,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -647,8 +646,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -705,8 +702,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -767,9 +762,10 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides))); let simple_qos = SimpleQos::new( - SimpleQosConfig::default(), - 10, // max_connections_per_peer - 1, // max_staked_connections (set to 1 to trigger pruning) + SimpleQosConfig { + max_staked_connections: 1, + ..Default::default() + }, stats.clone(), staked_nodes, cancel.clone(), @@ -836,9 +832,10 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(Arc::new(stakes), overrides))); let simple_qos = SimpleQos::new( - SimpleQosConfig::default(), - 10, // max_connections_per_peer - 1, // max_staked_connections (set to 1) + SimpleQosConfig { + max_staked_connections: 1, + ..Default::default() + }, stats.clone(), staked_nodes, cancel.clone(), @@ -897,8 +894,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -960,8 +955,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -1022,8 +1015,6 @@ mod tests { let simple_qos = SimpleQos::new( SimpleQosConfig::default(), - 10, // max_connections_per_peer - 100, // max_staked_connections stats.clone(), staked_nodes, cancel.clone(), @@ -1079,17 +1070,12 @@ mod tests { // Set a specific max_streams_per_second for testing let qos_config = SimpleQosConfig { - max_streams_per_second: 10, // 10 streams per second + max_streams_per_second: 10, + max_staked_connections: 100, + max_connections_per_peer: 10, }; - let simple_qos = SimpleQos::new( - qos_config, - 10, // max_connections_per_peer - 100, // max_staked_connections - stats.clone(), - staked_nodes, - cancel.clone(), - ); + let simple_qos = SimpleQos::new(qos_config, stats.clone(), staked_nodes, cancel.clone()); let client_tracker = ClientConnectionTracker { stats: stats.clone(), diff --git a/streamer/src/nonblocking/swqos.rs b/streamer/src/nonblocking/swqos.rs index ec7868536d1..9f1fe1f98af 100644 --- a/streamer/src/nonblocking/swqos.rs +++ b/streamer/src/nonblocking/swqos.rs @@ -14,7 +14,11 @@ use { STREAM_THROTTLING_INTERVAL_MS, }, }, - quic::{StreamerStats, DEFAULT_MAX_STREAMS_PER_MS}, + quic::{ + StreamerStats, DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER, + DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER, DEFAULT_MAX_STAKED_CONNECTIONS, + DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, }, percentage::Percentage, @@ -41,21 +45,37 @@ use { #[derive(Clone)] pub struct SwQosConfig { pub max_streams_per_ms: u64, + pub max_staked_connections: usize, + pub max_unstaked_connections: usize, + pub max_connections_per_staked_peer: usize, + pub max_connections_per_unstaked_peer: usize, } impl Default for SwQosConfig { fn default() -> Self { SwQosConfig { max_streams_per_ms: DEFAULT_MAX_STREAMS_PER_MS, + max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS, + max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS, + max_connections_per_staked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER, + max_connections_per_unstaked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER, + } + } +} + +impl SwQosConfig { + #[cfg(feature = "dev-context-only-utils")] + pub fn default_for_tests() -> Self { + Self { + max_connections_per_unstaked_peer: 1, + max_connections_per_staked_peer: 1, + ..Self::default() } } } pub struct SwQos { - max_staked_connections: usize, - max_unstaked_connections: usize, - max_connections_per_staked_peer: usize, - max_connections_per_unstaked_peer: usize, + config: SwQosConfig, staked_stream_load_ema: Arc, stats: Arc, staked_nodes: Arc>, @@ -89,24 +109,17 @@ impl ConnectionContext for SwQosConnectionContext { impl SwQos { pub fn new( - qos_config: SwQosConfig, - max_staked_connections: usize, - max_unstaked_connections: usize, - max_connections_per_staked_peer: usize, - max_connections_per_unstaked_peer: usize, + config: SwQosConfig, stats: Arc, staked_nodes: Arc>, cancel: CancellationToken, ) -> Self { Self { - max_staked_connections, - max_unstaked_connections, - max_connections_per_staked_peer, - max_connections_per_unstaked_peer, + config: config.clone(), staked_stream_load_ema: Arc::new(StakedStreamLoadEMA::new( stats.clone(), - max_unstaked_connections, - qos_config.max_streams_per_ms, + config.max_unstaked_connections, + config.max_streams_per_ms, )), stats, staked_nodes, @@ -229,8 +242,8 @@ impl SwQos { ); let max_connections_per_peer = match conn_context.peer_type() { - ConnectionPeerType::Unstaked => self.max_connections_per_unstaked_peer, - ConnectionPeerType::Staked(_) => self.max_connections_per_staked_peer, + ConnectionPeerType::Unstaked => self.config.max_connections_per_unstaked_peer, + ConnectionPeerType::Staked(_) => self.config.max_connections_per_staked_peer, }; if let Some((last_update, cancel_connection, stream_counter)) = connection_table_l .try_add_connection( @@ -391,7 +404,7 @@ impl QosController for SwQos { ConnectionPeerType::Staked(stake) => { let mut connection_table_l = self.staked_connection_table.lock().await; - if connection_table_l.total_size >= self.max_staked_connections { + if connection_table_l.total_size >= self.config.max_staked_connections { let num_pruned = connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); self.stats @@ -400,7 +413,7 @@ impl QosController for SwQos { update_open_connections_stat(&self.stats, &connection_table_l); } - if connection_table_l.total_size < self.max_staked_connections { + if connection_table_l.total_size < self.config.max_staked_connections { if let Ok((last_update, cancel_connection, stream_counter)) = self .cache_new_connection( client_connection_tracker, @@ -426,7 +439,7 @@ impl QosController for SwQos { client_connection_tracker, connection, self.unstaked_connection_table.clone(), - self.max_unstaked_connections, + self.config.max_unstaked_connections, conn_context, ) .await @@ -454,7 +467,7 @@ impl QosController for SwQos { client_connection_tracker, connection, self.unstaked_connection_table.clone(), - self.max_unstaked_connections, + self.config.max_unstaked_connections, conn_context, ) .await @@ -550,6 +563,12 @@ impl QosController for SwQos { .await; } } + + fn max_concurrent_connections(&self) -> usize { + // Allow 25% more connections than required to allow for handshake + + (self.config.max_staked_connections + self.config.max_unstaked_connections) * 5 / 4 + } } #[cfg(test)] diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index f5beed7562a..36ca9e95fb3 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -48,10 +48,6 @@ where let swqos = Arc::new(SwQos::new( qos_config, - quic_server_params.max_staked_connections, - quic_server_params.max_unstaked_connections, - quic_server_params.max_connections_per_staked_peer, - quic_server_params.max_connections_per_unstaked_peer, stats.clone(), staked_nodes, cancel.clone(), diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 4d162ebbe58..4df06de3975 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -560,10 +560,6 @@ impl StreamerStats { #[derive(Clone)] pub struct QuicStreamerConfig { - pub max_connections_per_unstaked_peer: usize, - pub max_connections_per_staked_peer: usize, - pub max_staked_connections: usize, - pub max_unstaked_connections: usize, pub max_connections_per_ipaddr_per_min: u64, pub wait_for_chunk_timeout: Duration, pub num_threads: NonZeroUsize, @@ -584,10 +580,6 @@ pub struct SimpleQosQuicStreamerConfig { impl Default for QuicStreamerConfig { fn default() -> Self { Self { - max_connections_per_unstaked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_UNSTAKED_PEER, - max_connections_per_staked_peer: DEFAULT_MAX_QUIC_CONNECTIONS_PER_STAKED_PEER, - max_staked_connections: DEFAULT_MAX_STAKED_CONNECTIONS, - max_unstaked_connections: DEFAULT_MAX_UNSTAKED_CONNECTIONS, max_connections_per_ipaddr_per_min: DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, num_threads: NonZeroUsize::new(num_cpus::get().min(1)).expect("1 is non-zero"), @@ -602,17 +594,10 @@ impl QuicStreamerConfig { #[cfg(feature = "dev-context-only-utils")] pub fn default_for_tests() -> Self { Self { - max_connections_per_unstaked_peer: 1, - max_connections_per_staked_peer: 1, num_threads: Self::DEFAULT_NUM_SERVER_THREADS_FOR_TEST, ..Self::default() } } - - pub(crate) fn max_concurrent_connections(&self) -> usize { - let conns = self.max_staked_connections + self.max_unstaked_connections; - conns + conns / 4 - } } /// Generic function to spawn a tokio runtime with a QUIC server @@ -680,10 +665,6 @@ pub fn spawn_stake_wighted_qos_server( let stats = Arc::::default(); let swqos = Arc::new(SwQos::new( qos_config, - quic_server_params.max_staked_connections, - quic_server_params.max_unstaked_connections, - quic_server_params.max_connections_per_unstaked_peer, - quic_server_params.max_connections_per_unstaked_peer, stats.clone(), staked_nodes, cancel.clone(), @@ -714,11 +695,8 @@ pub fn spawn_simple_qos_server( cancel: CancellationToken, ) -> Result { let stats = Arc::::default(); - let simple_qos = Arc::new(SimpleQos::new( qos_config, - quic_server_params.max_connections_per_staked_peer, - quic_server_params.max_staked_connections, stats.clone(), staked_nodes, cancel.clone(), @@ -815,7 +793,7 @@ mod test { sender, staked_nodes, server_params, - SwQosConfig::default(), + SwQosConfig::default_for_tests(), cancel.clone(), ) .unwrap(); @@ -871,10 +849,12 @@ mod test { sender, staked_nodes, QuicStreamerConfig { - max_connections_per_unstaked_peer: 2, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_unstaked_peer: 2, + ..Default::default() + }, cancel.clone(), ) .unwrap(); @@ -915,13 +895,12 @@ mod test { ); let server_params = QuicStreamerConfig { - max_unstaked_connections: 0, - max_connections_per_staked_peer: 1, - max_connections_per_unstaked_peer: 0, ..QuicStreamerConfig::default_for_tests() }; let qos_config = SimpleQosConfig { + max_connections_per_peer: 1, max_streams_per_second: 20, // low limit to ensure staked node can send all packets + ..Default::default() }; let server_params = SimpleQosQuicStreamerConfig { quic_streamer_config: server_params, @@ -964,10 +943,12 @@ mod test { sender, staked_nodes, QuicStreamerConfig { - max_unstaked_connections: 0, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_unstaked_connections: 0, + ..Default::default() + }, cancel.clone(), ) .unwrap(); diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 15a72a48b31..75e961d8c31 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -310,13 +310,13 @@ async fn test_connection_denied_until_allowed() { cancel, } = setup_quic_server( None, - QuicStreamerConfig { + QuicStreamerConfig::default_for_tests(), + SwQosConfig { // To prevent server from accepting a new connection, we // set max_connections_per_peer == 1 max_connections_per_unstaked_peer: 1, - ..QuicStreamerConfig::default_for_tests() + ..Default::default() }, - SwQosConfig::default(), ); // If we create a blocking connection and try to create connections to send TXs, @@ -386,11 +386,13 @@ async fn test_connection_pruned_and_reopened() { } = setup_quic_server( None, QuicStreamerConfig { + ..QuicStreamerConfig::default_for_tests() + }, + SwQosConfig { max_connections_per_unstaked_peer: 100, max_unstaked_connections: 1, - ..QuicStreamerConfig::default_for_tests() + ..Default::default() }, - SwQosConfig::default(), ); // Setup sending txs @@ -443,14 +445,16 @@ async fn test_staked_connection() { } = setup_quic_server( Some(staked_nodes), QuicStreamerConfig { + ..QuicStreamerConfig::default() + }, + SwQosConfig { // Must use at least the number of endpoints (10) because // `max_staked_connections` and `max_unstaked_connections` are // cumulative for all the endpoints. max_staked_connections: 10, max_unstaked_connections: 0, - ..QuicStreamerConfig::default_for_tests() + ..Default::default() }, - SwQosConfig::default(), ); // Setup sending txs @@ -594,11 +598,13 @@ async fn test_rate_limiting() { } = setup_quic_server( None, QuicStreamerConfig { - max_connections_per_unstaked_peer: 100, max_connections_per_ipaddr_per_min: 1, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_unstaked_peer: 100, + ..Default::default() + }, ); // open a connection to consume the limit @@ -656,11 +662,13 @@ async fn test_rate_limiting_establish_connection() { } = setup_quic_server( None, QuicStreamerConfig { - max_connections_per_unstaked_peer: 100, max_connections_per_ipaddr_per_min: 1, ..QuicStreamerConfig::default_for_tests() }, - SwQosConfig::default(), + SwQosConfig { + max_connections_per_unstaked_peer: 100, + ..Default::default() + }, ); let connection_to_reach_limit = make_client_endpoint(&server_address, None).await; @@ -738,15 +746,17 @@ async fn test_update_identity() { } = setup_quic_server( Some(staked_nodes), QuicStreamerConfig { + ..QuicStreamerConfig::default_for_tests() + }, + SwQosConfig { // Must use at least the number of endpoints (10) because // `max_staked_connections` and `max_unstaked_connections` are // cumulative for all the endpoints. max_staked_connections: 10, // Deny all unstaked connections. max_unstaked_connections: 0, - ..QuicStreamerConfig::default_for_tests() + ..Default::default() }, - SwQosConfig::default(), ); // Setup sending txs @@ -802,11 +812,13 @@ async fn test_proactive_connection_close_detection() { } = setup_quic_server( None, QuicStreamerConfig { + ..QuicStreamerConfig::default_for_tests() + }, + SwQosConfig { max_connections_per_unstaked_peer: 1, max_unstaked_connections: 1, - ..QuicStreamerConfig::default_for_tests() + ..Default::default() }, - SwQosConfig::default(), ); // Setup controlled transaction sending diff --git a/validator/src/commands/run/execute.rs b/validator/src/commands/run/execute.rs index 5569e4c6fc5..6834165608d 100644 --- a/validator/src/commands/run/execute.rs +++ b/validator/src/commands/run/execute.rs @@ -958,6 +958,11 @@ pub fn execute( let tpu_quic_server_config = SwQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { + max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, + num_threads: tpu_transaction_receive_threads, + ..Default::default() + }, + qos_config: SwQosConfig { max_connections_per_unstaked_peer: tpu_max_connections_per_unstaked_peer .try_into() .unwrap(), @@ -966,15 +971,17 @@ pub fn execute( .unwrap(), max_staked_connections: tpu_max_staked_connections.try_into().unwrap(), max_unstaked_connections: tpu_max_unstaked_connections.try_into().unwrap(), - max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, - num_threads: tpu_transaction_receive_threads, - ..Default::default() + max_streams_per_ms, }, - qos_config: SwQosConfig { max_streams_per_ms }, }; let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { + max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, + num_threads: tpu_transaction_forward_receive_threads, + ..Default::default() + }, + qos_config: SwQosConfig { max_connections_per_staked_peer: tpu_max_connections_per_staked_peer .try_into() .unwrap(), @@ -983,23 +990,19 @@ pub fn execute( .unwrap(), max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(), max_unstaked_connections: tpu_max_fwd_unstaked_connections.try_into().unwrap(), - max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, - num_threads: tpu_transaction_forward_receive_threads, - ..Default::default() + max_streams_per_ms, }, - qos_config: SwQosConfig { max_streams_per_ms }, }; let vote_quic_server_config = SimpleQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { - max_connections_per_unstaked_peer: 1, - max_staked_connections: tpu_max_fwd_staked_connections.try_into().unwrap(), max_connections_per_ipaddr_per_min: tpu_max_connections_per_ipaddr_per_minute, num_threads: tpu_vote_transaction_receive_threads, ..Default::default() }, qos_config: SimpleQosConfig { max_streams_per_second: MAX_VOTES_PER_SECOND, + ..Default::default() }, }; diff --git a/vortexor/src/vortexor.rs b/vortexor/src/vortexor.rs index 6f48fca34d8..8118baf921a 100644 --- a/vortexor/src/vortexor.rs +++ b/vortexor/src/vortexor.rs @@ -121,14 +121,17 @@ impl Vortexor { ) -> Self { let quic_server_params = SwQosQuicStreamerConfig { quic_streamer_config: QuicStreamerConfig { - max_connections_per_unstaked_peer: max_connections_per_peer, - max_staked_connections: max_tpu_staked_connections, - max_unstaked_connections: max_tpu_unstaked_connections, max_connections_per_ipaddr_per_min, wait_for_chunk_timeout: DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, ..Default::default() }, - qos_config: SwQosConfig { max_streams_per_ms }, + qos_config: SwQosConfig { + max_connections_per_unstaked_peer: max_connections_per_peer, + max_connections_per_staked_peer: max_connections_per_peer, + max_staked_connections: max_tpu_staked_connections, + max_unstaked_connections: max_tpu_unstaked_connections, + max_streams_per_ms, + }, }; let mut quic_fwd_server_params = quic_server_params.clone(); @@ -153,12 +156,8 @@ impl Vortexor { // Fot TPU forward -- we disallow unstaked connections. Allocate all connection resources // for staked connections: - quic_fwd_server_params - .quic_streamer_config - .max_staked_connections = max_fwd_staked_connections; - quic_fwd_server_params - .quic_streamer_config - .max_unstaked_connections = max_fwd_unstaked_connections; + quic_fwd_server_params.qos_config.max_staked_connections = max_fwd_staked_connections; + quic_fwd_server_params.qos_config.max_unstaked_connections = max_fwd_unstaked_connections; let tpu_fwd_result = spawn_stake_wighted_qos_server( "solVtxTpuFwd", "quic_vortexor_tpu_forwards",