diff --git a/Cargo.lock b/Cargo.lock index 36274246efb..622d943ac23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7415,6 +7415,7 @@ dependencies = [ "bytes", "crossbeam-channel", "dashmap", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.6", @@ -7429,6 +7430,7 @@ dependencies = [ "rand 0.8.5", "rustls", "smallvec", + "socket2 0.5.7", "solana-logger", "solana-measure", "solana-metrics", diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a682b6e6db2..1e8c88e8bdb 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -260,7 +260,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -281,6 +281,9 @@ mod tests { ) .unwrap(); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); let connection_cache = ConnectionCache::new_with_client_options( "connection_cache_test", 1, // connection_pool_size diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 96350cfbb26..05c3932c490 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -38,7 +38,9 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, - quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + quic::{ + spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, }, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, @@ -60,8 +62,8 @@ pub struct TpuSockets { pub transaction_forwards: Vec, pub vote: Vec, pub broadcast: Vec, - pub transactions_quic: UdpSocket, - pub transactions_forwards_quic: UdpSocket, + pub transactions_quic: Vec, + pub transactions_forwards_quic: Vec, } pub struct Tpu { @@ -153,10 +155,10 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_quic_t, key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpu", "quic_streamer_tpu", transactions_quic_sockets, @@ -175,10 +177,10 @@ impl Tpu { .unwrap(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_forwards_quic_t, key_updater: forwards_key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpuFwd", "quic_streamer_tpu_forwards", transactions_forwards_quic_sockets, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 8b98b5ff358..ea3d02715d8 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -54,8 +54,9 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset, - find_available_port_in_range, multi_bind_in_range, PortRange, + bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, + bind_more_with_config, bind_two_in_range_with_offset_and_config, + find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, }, solana_perf::{ data_budget::DataBudget, @@ -2891,8 +2892,8 @@ pub struct Sockets { pub serve_repair: UdpSocket, pub serve_repair_quic: UdpSocket, pub ancestor_hashes_requests: UdpSocket, - pub tpu_quic: UdpSocket, - pub tpu_forwards_quic: UdpSocket, + pub tpu_quic: Vec, + pub tpu_forwards_quic: Vec, } pub struct NodeConfig { @@ -2905,6 +2906,9 @@ pub struct NodeConfig { pub num_tvu_sockets: NonZeroUsize, } +// This will be adjusted and parameterized in follow-on PRs. +const QUIC_ENDPOINTS: usize = 1; + #[derive(Debug)] pub struct Node { pub info: ContactInfo, @@ -2922,15 +2926,35 @@ impl Node { let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); + let udp_config = SocketConfig { reuseport: false }; + let quic_config = SocketConfig { reuseport: true }; let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -3008,7 +3032,7 @@ impl Node { if gossip_addr.port() != 0 { ( gossip_addr.port(), - bind_common(bind_ip_addr, gossip_addr.port(), false).unwrap_or_else(|e| { + bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| { panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e) }), ) @@ -3017,7 +3041,16 @@ impl Node { } } fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - bind_in_range(bind_ip_addr, port_range).expect("Failed to bind") + let config = SocketConfig { reuseport: false }; + Self::bind_with_config(bind_ip_addr, port_range, config) + } + + fn bind_with_config( + bind_ip_addr: IpAddr, + port_range: PortRange, + config: SocketConfig, + ) -> (u16, UdpSocket) { + bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind") } pub fn new_single_bind( @@ -3030,10 +3063,30 @@ impl Node { Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let udp_config = SocketConfig { reuseport: false }; + let quic_config = SocketConfig { reuseport: true }; let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3117,21 +3170,28 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); - let (_tpu_port_quic, tpu_quic) = Self::bind( + let quic_config = SocketConfig { reuseport: true }; + let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), + quic_config.clone(), ); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); - let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind( + let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, ( tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), + quic_config.clone(), ); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); @@ -3338,7 +3398,6 @@ mod tests { }, itertools::izip, solana_ledger::shred::Shredder, - solana_net_utils::MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, solana_sdk::signature::{Keypair, Signer}, solana_vote_program::{vote_instruction, vote_state::Vote}, std::{ @@ -3771,10 +3830,8 @@ mod tests { fn new_with_external_ip_test_gossip() { // Can't use VALIDATOR_PORT_RANGE because if this test runs in parallel with others, the // port returned by `bind_in_range()` might be snatched up before `Node::new_with_external_ip()` runs - let port_range = ( - VALIDATOR_PORT_RANGE.1 + MINIMUM_VALIDATOR_PORT_RANGE_WIDTH, - VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH), - ); + let (start, end) = VALIDATOR_PORT_RANGE; + let port_range = (end, end + (end - start)); let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); let port = bind_in_range(ip, port_range).expect("Failed to bind").0; let config = NodeConfig { diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 58493984682..e6ac57b9302 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -337,7 +337,9 @@ impl LocalCluster { socket_addr_space, DEFAULT_TPU_USE_QUIC, DEFAULT_TPU_CONNECTION_POOL_SIZE, - DEFAULT_TPU_ENABLE_UDP, + // We are turning tpu_enable_udp to true in order to prevent concurrent local cluster tests + // to use the same QUIC ports due to SO_REUSEPORT. + true, 32, // max connections per IpAddr per minute Arc::new(RwLock::new(None)), ) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index e7df091edbf..50008171eed 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -384,29 +384,45 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } +#[derive(Clone, Debug)] +pub struct SocketConfig { + pub reuseport: bool, +} + +impl Default for SocketConfig { + #[allow(clippy::derivable_impls)] + fn default() -> Self { + Self { reuseport: false } + } +} + #[cfg(any(windows, target_os = "ios"))] fn udp_socket(_reuseaddr: bool) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } +#[cfg(any(windows, target_os = "ios"))] +fn udp_socket_with_config(_config: SocketConfig) -> io::Result { + let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; + Ok(sock) +} + +#[cfg(not(any(windows, target_os = "ios")))] +fn udp_socket(reuseport: bool) -> io::Result { + let config = SocketConfig { reuseport }; + udp_socket_with_config(config) +} + #[cfg(not(any(windows, target_os = "ios")))] -fn udp_socket(reuseaddr: bool) -> io::Result { - use { - nix::sys::socket::{ - setsockopt, - sockopt::{ReuseAddr, ReusePort}, - }, - std::os::fd::AsFd, - }; +fn udp_socket_with_config(config: SocketConfig) -> io::Result { + use nix::sys::socket::{setsockopt, sockopt::ReusePort}; + let SocketConfig { reuseport } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; - let sock_fd = sock.as_fd(); - if reuseaddr { - // best effort, i.e. ignore errors here, we'll get the failure in caller - setsockopt(&sock_fd, ReusePort, &true).ok(); - setsockopt(&sock_fd, ReuseAddr, &true).ok(); + if reuseport { + setsockopt(&sock, ReusePort, &true).ok(); } Ok(sock) @@ -418,7 +434,7 @@ pub fn bind_common_in_range( range: PortRange, ) -> io::Result<(u16, (UdpSocket, TcpListener))> { for port in range.0..range.1 { - if let Ok((sock, listener)) = bind_common(ip_addr, port, false) { + if let Ok((sock, listener)) = bind_common(ip_addr, port) { return Result::Ok((sock.local_addr().unwrap().port(), (sock, listener))); } } @@ -430,7 +446,16 @@ pub fn bind_common_in_range( } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let sock = udp_socket(false)?; + let config = SocketConfig::default(); + bind_in_range_with_config(ip_addr, range, config) +} + +pub fn bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, UdpSocket)> { + let sock = udp_socket_with_config(config)?; for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); @@ -484,8 +509,9 @@ pub fn multi_bind_in_range( port }; // drop the probe, port should be available... briefly. + let config = SocketConfig { reuseport: true }; for _ in 0..num { - let sock = bind_to(ip_addr, port, true); + let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { sockets.push(sock); } else { @@ -505,8 +531,17 @@ pub fn multi_bind_in_range( Ok((port, sockets)) } -pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result { - let sock = udp_socket(reuseaddr)?; +pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result { + let config = SocketConfig { reuseport }; + bind_to_with_config(ip_addr, port, config) +} + +pub fn bind_to_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); @@ -514,12 +549,18 @@ pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result io::Result<(UdpSocket, TcpListener)> { + let config = SocketConfig { reuseport: false }; + bind_common_with_config(ip_addr, port, config) +} + +// binds both a UdpSocket and a TcpListener +pub fn bind_common_with_config( ip_addr: IpAddr, port: u16, - reuseaddr: bool, + config: SocketConfig, ) -> io::Result<(UdpSocket, TcpListener)> { - let sock = udp_socket(reuseaddr)?; + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); let sock_addr = SockAddr::from(addr); @@ -531,6 +572,18 @@ pub fn bind_two_in_range_with_offset( ip_addr: IpAddr, range: PortRange, offset: u16, +) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { + let sock1_config = SocketConfig::default(); + let sock2_config = SocketConfig::default(); + bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) +} + +pub fn bind_two_in_range_with_offset_and_config( + ip_addr: IpAddr, + range: PortRange, + offset: u16, + sock1_config: SocketConfig, + sock2_config: SocketConfig, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { if range.1.saturating_sub(range.0) < offset { return Err(io::Error::new( @@ -539,9 +592,11 @@ pub fn bind_two_in_range_with_offset( )); } for port in range.0..range.1 { - if let Ok(first_bind) = bind_to(ip_addr, port, false) { + if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) { if range.1.saturating_sub(port) >= offset { - if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) { + if let Ok(second_bind) = + bind_to_with_config(ip_addr, port + offset, sock2_config.clone()) + { return Ok(( (first_bind.local_addr().unwrap().port(), first_bind), (second_bind.local_addr().unwrap().port(), second_bind), @@ -563,7 +618,7 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re let mut tries_left = end - start; let mut rand_port = thread_rng().gen_range(start..end); loop { - match bind_common(ip_addr, rand_port, false) { + match bind_common(ip_addr, rand_port) { Ok(_) => { break Ok(rand_port); } @@ -581,6 +636,19 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re } } +pub fn bind_more_with_config( + socket: UdpSocket, + num: usize, + config: SocketConfig, +) -> io::Result> { + let addr = socket.local_addr().unwrap(); + let ip = addr.ip(); + let port = addr.port(); + std::iter::once(Ok(socket)) + .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone()))) + .collect() +} + #[cfg(test)] mod tests { use {super::*, std::net::Ipv4Addr}; @@ -684,8 +752,9 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_to(ip_addr, 2002, true).unwrap(); - let y = bind_to(ip_addr, 2002, true).unwrap(); + let config = SocketConfig { reuseport: true }; + let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f32a3544397..ad3d327535c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6241,6 +6241,7 @@ dependencies = [ "bytes", "crossbeam-channel", "dashmap", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.6", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 7f97786da66..0d1397d6e47 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -73,7 +73,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (s, exit, keypair) = server_args(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -159,7 +159,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (s, exit, keypair) = server_args(); let solana_streamer::nonblocking::quic::SpawnNonBlockingServerResult { - endpoint: _, + endpoints: _, stats: _, thread: t, max_concurrent_connections: _, @@ -223,7 +223,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (request_recv_socket, request_recv_exit, keypair) = server_args(); let SpawnServerResult { - endpoint: request_recv_endpoint, + endpoints: request_recv_endpoints, thread: request_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -244,7 +244,7 @@ mod tests { ) .unwrap(); - drop(request_recv_endpoint); + drop(request_recv_endpoints); // Response Receiver: let (response_recv_socket, response_recv_exit, keypair2) = server_args(); let (sender2, receiver2) = unbounded(); @@ -253,7 +253,7 @@ mod tests { let port = response_recv_socket.local_addr().unwrap().port(); let server_addr = SocketAddr::new(addr, port); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -286,6 +286,10 @@ mod tests { key: priv_key, }); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); + drop(response_recv_endpoints); let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index d173da1a568..8ffa23299ba 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -14,6 +14,7 @@ async-channel = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } dashmap = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } @@ -39,6 +40,7 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +socket2 = { workspace = true } solana-logger = { workspace = true } [lib] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index c4ad535bd41..cdeb46cfd06 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -16,9 +16,10 @@ use { }, bytes::Bytes, crossbeam_channel::Sender, + futures::{stream::FuturesUnordered, Future, StreamExt as _}, indexmap::map::{Entry, IndexMap}, percentage::Percentage, - quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, + quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, smallvec::SmallVec, @@ -40,11 +41,13 @@ use { std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, // CAUTION: be careful not to introduce any awaits while holding an RwLock. sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, + task::Poll, time::{Duration, Instant}, }, tokio::{ @@ -56,6 +59,7 @@ use { // but if we do, the scope of the RwLock must always be a subset of the async Mutex // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to // introduce any other awaits while holding the RwLock. + select, sync::{Mutex, MutexGuard}, task::JoinHandle, time::{sleep, timeout}, @@ -135,7 +139,7 @@ impl ConnectionPeerType { } pub struct SpawnNonBlockingServerResult { - pub endpoint: Endpoint, + pub endpoints: Vec, pub stats: Arc, pub thread: JoinHandle<()>, pub max_concurrent_connections: usize, @@ -157,23 +161,61 @@ pub fn spawn_server( wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { - info!("Start {name} quic server on {sock:?}"); - let concurrent_connections = max_staked_connections + max_unstaked_connections; + spawn_server_multi( + name, + vec![sock], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + max_streams_per_ms, + max_connections_per_ipaddr_per_min, + wait_for_chunk_timeout, + coalesce, + ) +} + +#[allow(clippy::too_many_arguments, clippy::type_complexity)] +pub fn spawn_server_multi( + name: &'static str, + sockets: Vec, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + max_streams_per_ms: u64, + max_connections_per_ipaddr_per_min: u64, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result { + info!("Start {name} quic server on {sockets:?}"); + let concurrent_connections = + (max_staked_connections + max_unstaked_connections) / sockets.len(); let max_concurrent_connections = concurrent_connections + concurrent_connections / 4; let (config, _cert) = configure_server(keypair, max_concurrent_connections)?; - let endpoint = Endpoint::new( - EndpointConfig::default(), - Some(config), - sock, - Arc::new(TokioRuntime), - ) - .map_err(QuicServerError::EndpointFailed)?; - + let endpoints = sockets + .into_iter() + .map(|sock| { + Endpoint::new( + EndpointConfig::default(), + Some(config.clone()), + sock, + Arc::new(TokioRuntime), + ) + .map_err(QuicServerError::EndpointFailed) + }) + .collect::, _>>()?; let stats = Arc::::default(); let handle = tokio::spawn(run_server( name, - endpoint.clone(), + endpoints.clone(), packet_sender, exit, max_connections_per_peer, @@ -187,7 +229,7 @@ pub fn spawn_server( coalesce, )); Ok(SpawnNonBlockingServerResult { - endpoint, + endpoints, stats, thread: handle, max_concurrent_connections, @@ -197,7 +239,7 @@ pub fn spawn_server( #[allow(clippy::too_many_arguments)] async fn run_server( name: &'static str, - incoming: Endpoint, + incoming: Vec, packet_sender: Sender, exit: Arc, max_connections_per_peer: usize, @@ -234,8 +276,37 @@ async fn run_server( stats.clone(), coalesce, )); + + let mut accepts = incoming + .iter() + .enumerate() + .map(|(i, incoming)| { + Box::pin(EndpointAccept { + accept: incoming.accept(), + endpoint: i, + }) + }) + .collect::>(); while !exit.load(Ordering::Relaxed) { - let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; + let timeout_connection = select! { + ready = accepts.next() => { + if let Some((connecting, i)) = ready { + accepts.push( + Box::pin(EndpointAccept { + accept: incoming[i].accept(), + endpoint: i, + } + )); + Ok(connecting) + } else { + // we can't really get here - we never poll an empty FuturesUnordered + continue + } + } + _ = tokio::time::sleep(WAIT_FOR_CONNECTION_TIMEOUT) => { + Err(()) + } + }; if last_datapoint.elapsed().as_secs() >= 5 { stats.report(name); @@ -1317,6 +1388,25 @@ impl ConnectionTable { } } +struct EndpointAccept<'a> { + endpoint: usize, + accept: Accept<'a>, +} + +impl<'a> Future for EndpointAccept<'a> { + type Output = (Option, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { + let i = self.endpoint; + // Safety: + // self is pinned and accept is a field so it can't get moved out. See safety docs of + // map_unchecked_mut. + unsafe { self.map_unchecked_mut(|this| &mut this.accept) } + .poll(cx) + .map(|r| (r, i)) + } +} + #[cfg(test)] pub mod test { use { @@ -1395,20 +1485,47 @@ pub mod test { SocketAddr, Arc, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let sockets = { + #[cfg(not(target_os = "windows"))] + { + use std::{ + os::fd::{FromRawFd, IntoRawFd}, + str::FromStr as _, + }; + (0..10) + .map(|_| { + let sock = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + .unwrap(); + sock.set_reuse_port(true).unwrap(); + sock.bind(&SocketAddr::from_str("127.0.0.1:0").unwrap().into()) + .unwrap(); + unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } + }) + .collect::>() + } + #[cfg(target_os = "windows")] + { + vec![UdpSocket::bind("127.0.0.1:0").unwrap()] + } + }; + let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); - let server_address = s.local_addr().unwrap(); + let server_address = sockets[0].local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); let SpawnNonBlockingServerResult { - endpoint: _, + endpoints: _, stats, thread: t, max_concurrent_connections: _, - } = spawn_server( + } = spawn_server_multi( "quic_streamer_test", - s, + sockets, &keypair, sender, exit.clone(), @@ -1844,7 +1961,7 @@ pub mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnNonBlockingServerResult { - endpoint: _, + endpoints: _, stats: _, thread: t, max_concurrent_connections: _, @@ -1880,7 +1997,7 @@ pub mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnNonBlockingServerResult { - endpoint: _, + endpoints: _, stats, thread: t, max_concurrent_connections: _, diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index fca91004358..a2aa9f18a05 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -37,7 +37,7 @@ impl SkipClientVerification { } pub struct SpawnServerResult { - pub endpoint: Endpoint, + pub endpoints: Vec, pub thread: thread::JoinHandle<()>, pub key_updater: Arc, } @@ -121,14 +121,16 @@ pub enum QuicServerError { } pub struct EndpointKeyUpdater { - endpoint: Endpoint, + endpoints: Vec, max_concurrent_connections: usize, } impl NotifyKeyUpdate for EndpointKeyUpdater { fn update_key(&self, key: &Keypair) -> Result<(), Box> { let (config, _) = configure_server(key, self.max_concurrent_connections)?; - self.endpoint.set_server_config(Some(config)); + for endpoint in &self.endpoints { + endpoint.set_server_config(Some(config.clone())); + } Ok(()) } } @@ -525,7 +527,42 @@ impl StreamStats { pub fn spawn_server( thread_name: &'static str, metrics_name: &'static str, - sock: UdpSocket, + socket: UdpSocket, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + max_streams_per_ms: u64, + max_connections_per_ipaddr_per_min: u64, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result { + spawn_server_multi( + thread_name, + metrics_name, + vec![socket], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + max_streams_per_ms, + max_connections_per_ipaddr_per_min, + wait_for_chunk_timeout, + coalesce, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn spawn_server_multi( + thread_name: &'static str, + metrics_name: &'static str, + sockets: Vec, keypair: &Keypair, packet_sender: Sender, exit: Arc, @@ -541,9 +578,9 @@ pub fn spawn_server( let runtime = rt(format!("{thread_name}Rt")); let result = { let _guard = runtime.enter(); - crate::nonblocking::quic::spawn_server( + crate::nonblocking::quic::spawn_server_multi( metrics_name, - sock, + sockets, keypair, packet_sender, exit, @@ -566,11 +603,11 @@ pub fn spawn_server( }) .unwrap(); let updater = EndpointKeyUpdater { - endpoint: result.endpoint.clone(), + endpoints: result.endpoints.clone(), max_concurrent_connections: result.max_concurrent_connections, }; Ok(SpawnServerResult { - endpoint: result.endpoint, + endpoints: result.endpoints, thread: handle, key_updater: Arc::new(updater), }) @@ -602,7 +639,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -663,7 +700,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -711,7 +748,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 63084741f03..ef1ddf348b1 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -85,11 +85,11 @@ fn verify_reachable_ports( } if verify_address(&node.info.tpu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu.iter()); - udp_sockets.push(&node.sockets.tpu_quic); + udp_sockets.extend(&node.sockets.tpu_quic); } if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); - udp_sockets.push(&node.sockets.tpu_forwards_quic); + udp_sockets.extend(&node.sockets.tpu_forwards_quic); } if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter());