diff --git a/Cargo.lock b/Cargo.lock index 1ba69acd0787f5..fc829de2bcbe02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6426,6 +6426,7 @@ dependencies = [ "solana-gossip", "solana-ledger", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -6533,6 +6534,7 @@ dependencies = [ "bincode", "clap 3.2.23", "crossbeam-channel", + "lazy_static", "log", "nix 0.26.4", "rand 0.8.5", @@ -7273,6 +7275,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", @@ -7286,9 +7289,12 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "serial_test", + "socket2 0.5.6", "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-sdk", "solana-transaction-metrics-tracker", diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a94bc7cd3d8ca8..e9cec25bb2ff9c 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -256,7 +256,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -275,6 +275,9 @@ mod tests { ) .unwrap(); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); let connection_cache = ConnectionCache::new_with_client_options( "connection_cache_test", 1, // connection_pool_size diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 640caf64544d45..8c114b0f62a7f9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -34,7 +34,9 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + quic::{ + spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, }, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, @@ -57,8 +59,8 @@ pub struct TpuSockets { pub transaction_forwards: Vec, pub vote: Vec, pub broadcast: Vec, - pub transactions_quic: UdpSocket, - pub transactions_forwards_quic: UdpSocket, + pub transactions_quic: Vec, + pub transactions_forwards_quic: Vec, } pub struct Tpu { @@ -149,10 +151,10 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_quic_t, key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpu", "quic_streamer_tpu", transactions_quic_sockets, @@ -169,10 +171,10 @@ impl Tpu { .unwrap(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_forwards_quic_t, key_updater: forwards_key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpuFwd", "quic_streamer_tpu_forwards", transactions_forwards_quic_sockets, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7d737d313eeae7..8f6fe09ffec1b3 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -54,8 +54,9 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset, - find_available_port_in_range, multi_bind_in_range, PortRange, + bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, + bind_more_with_config, bind_two_in_range_with_offset_and_config, + find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, }, solana_perf::{ data_budget::DataBudget, @@ -2782,8 +2783,8 @@ pub struct Sockets { pub serve_repair: UdpSocket, pub serve_repair_quic: UdpSocket, pub ancestor_hashes_requests: UdpSocket, - pub tpu_quic: UdpSocket, - pub tpu_forwards_quic: UdpSocket, + pub tpu_quic: Vec, + pub tpu_forwards_quic: Vec, } pub struct NodeConfig { @@ -2794,6 +2795,8 @@ pub struct NodeConfig { pub public_tpu_forwards_addr: Option, } +const QUIC_ENDPOINTS: usize = 10; + #[derive(Debug)] pub struct Node { pub info: ContactInfo, @@ -2811,15 +2814,47 @@ impl Node { let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: true, + first_reuse: true, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + first_reuse: true, + }; let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + + let mut quic_config_more = quic_config.clone(); + quic_config_more.first_reuse = false; + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config_more.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config_more.clone()) + .unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2906,7 +2941,20 @@ impl Node { } } fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - bind_in_range(bind_ip_addr, port_range).expect("Failed to bind") + let config = SocketConfig { + reuseaddr: false, + reuseport: false, + first_reuse: false, + }; + Self::bind_with_config(bind_ip_addr, port_range, config) + } + + fn bind_with_config( + bind_ip_addr: IpAddr, + port_range: PortRange, + config: SocketConfig, + ) -> (u16, UdpSocket) { + bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind") } pub fn new_single_bind( @@ -2919,10 +2967,41 @@ impl Node { Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: false, + first_reuse: false, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + first_reuse: true, + }; let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let mut quic_config_more = quic_config.clone(); + quic_config_more.first_reuse = false; + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config_more.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config_more.clone()) + .unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3004,21 +3083,25 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); - let (_tpu_port_quic, tpu_quic) = Self::bind( + let (_tpu_port_quic, tpu_quic) = multi_bind_in_range( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - ); + QUIC_ENDPOINTS, + ) + .expect("tpu_quic multi_bind"); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); - let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind( + let (_tpu_forwards_port_quic, tpu_forwards_quic) = multi_bind_in_range( bind_ip_addr, ( tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - ); + QUIC_ENDPOINTS, + ) + .expect("tpu_forward_quic multi_bind"); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); @@ -3624,7 +3707,9 @@ mod tests { VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH), ); let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let port = bind_in_range(ip, port_range).expect("Failed to bind").0; + let port = + find_available_port_in_range(ip, port_range).expect("Failed to find available port"); + let config = NodeConfig { gossip_addr: socketaddr!(Ipv4Addr::LOCALHOST, port), port_range, diff --git a/gossip/src/main.rs b/gossip/src/main.rs index 1f31195f431d36..3bab06ee811d80 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -236,11 +236,16 @@ fn process_spy(matches: &ArgMatches, socket_addr_space: SocketAddrSpace) -> std: let gossip_addr = SocketAddr::new( gossip_host, value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range( - IpAddr::V4(Ipv4Addr::UNSPECIFIED), - (0, 1), - ) - .expect("unable to find an available gossip port") + // find an available port. + let get_port = || -> Result<_, std::io::Error> { + let t = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))?; + let addr = t.local_addr()?; + Ok(addr.port()) + }; + get_port().unwrap_or_else(|err| { + eprintln!("Unable to find an available gossip port: {err}"); + exit(1); + }) }), ); let discover_timeout = Duration::from_secs(timeout.unwrap_or(u64::MAX)); diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index 07b30030295e52..9f51f75da0e51d 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -23,6 +23,7 @@ solana-entry = { workspace = true } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 3d8df638fbbb81..3ce2d1c386e82a 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -1019,5 +1019,6 @@ impl Cluster for LocalCluster { impl Drop for LocalCluster { fn drop(&mut self) { self.close(); + solana_net_utils::clear_used_ports(); } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 44032aeeb4d38b..3db7af74d5fbf5 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -3339,6 +3339,8 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b } } + solana_net_utils::clear_used_ports(); + // Step 3: // Run validator C only to make it produce and vote on its own fork. info!("Restart validator C again!!!"); diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index 3486b30bbb9cda..590368f21a63c8 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } bincode = { workspace = true } clap = { version = "3.1.5", features = ["cargo"] } crossbeam-channel = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } nix = { workspace = true } rand = { workspace = true } diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 2d1b6249f3fcb1..fbdc8f632d887a 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -1,5 +1,7 @@ //! The `net_utils` module assists with networking #![allow(clippy::arithmetic_side_effects)] +#[macro_use] +extern crate lazy_static; use { crossbeam_channel::unbounded, log::*, @@ -9,7 +11,7 @@ use { collections::{BTreeMap, HashSet}, io::{self, Read, Write}, net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }, url::Url, @@ -384,14 +386,122 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } +lazy_static! { + static ref ALL_USED_PORTS: Mutex> = Mutex::new(Vec::new()); + static ref REUSED_PORTS: Mutex> = Mutex::new(Vec::new()); +} + +pub fn clear_used_ports() { + let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); + let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); + + all_used_ports_lock.clear(); + reused_ports_lock.clear(); +} + +fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Result<()> { + let port = addr.port(); + assert_ne!(port, 0, "don't use sock_bind for any port. "); + let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); + let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); + + if config.reuseport { + // binding reuse port + if config.first_reuse { + // bind for a reuse_port for the first time + if all_used_ports_lock.contains(&port) { + // this port is already bounded. + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been bound before", port), + )) + } else if reused_ports_lock.contains(&port) { + // this port is already bounded before as reused ports. + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been bound before as resued_port", port), + )) + } else { + // this port is not yet bounded. + let r = sock.bind(&SockAddr::from(addr)); + if r.is_ok() { + all_used_ports_lock.push(port); + reused_ports_lock.push(port); + } + r + } + } else { + // binding to existing reuse port + if reused_ports_lock.contains(&port) { + sock.bind(&SockAddr::from(addr)) + } else { + // existing reuse port not found + Err(io::Error::new( + io::ErrorKind::Other, + format!("reuse_port {} has not been bound before", port), + )) + } + } + } else { + // binding non reuse port + if all_used_ports_lock.contains(&port) { + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been bound before", port), + )) + } else { + let r = sock.bind(&SockAddr::from(addr)); + + if r.is_ok() { + all_used_ports_lock.push(port); + } + r + } + } +} + +#[derive(Clone, Debug)] +pub struct SocketConfig { + pub reuseaddr: bool, + pub reuseport: bool, + pub first_reuse: bool, +} + +impl Default for SocketConfig { + #[allow(clippy::derivable_impls)] + fn default() -> Self { + Self { + reuseaddr: false, + reuseport: false, + first_reuse: false, + } + } +} + +#[cfg(any(windows, target_os = "ios"))] +fn udp_socket(_reuseaddr: bool, _first_reuse: bool) -> io::Result { + let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; + Ok(sock) +} + #[cfg(any(windows, target_os = "ios"))] -fn udp_socket(_reuseaddr: bool) -> io::Result { +fn udp_socket_with_config(_config: SocketConfig) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } #[cfg(not(any(windows, target_os = "ios")))] -fn udp_socket(reuseaddr: bool) -> io::Result { +fn udp_socket(reuseaddr: bool, first_reuse: bool) -> io::Result { + let config = SocketConfig { + reuseaddr, + reuseport: false, + first_reuse, + }; + udp_socket_with_config(config) +} + +#[cfg(not(any(windows, target_os = "ios")))] +fn udp_socket_with_config(config: SocketConfig) -> io::Result { use { nix::sys::socket::{ setsockopt, @@ -399,14 +509,22 @@ fn udp_socket(reuseaddr: bool) -> io::Result { }, std::os::unix::io::AsRawFd, }; + let SocketConfig { + reuseaddr, + mut reuseport, + first_reuse: _, + } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; let sock_fd = sock.as_raw_fd(); + // best effort, i.e. ignore setsockopt() errors, we'll get the failure in caller if reuseaddr { - // best effort, i.e. ignore errors here, we'll get the failure in caller - setsockopt(sock_fd, ReusePort, &true).ok(); setsockopt(sock_fd, ReuseAddr, &true).ok(); + reuseport = true; + } + if reuseport { + setsockopt(sock_fd, ReusePort, &true).ok(); } Ok(sock) @@ -430,12 +548,21 @@ pub fn bind_common_in_range( } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let sock = udp_socket(false)?; + let config = SocketConfig::default(); + bind_in_range_with_config(ip_addr, range, config) +} + +pub fn bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, UdpSocket)> { + let sock = udp_socket_with_config(config.clone())?; for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); - - if sock.bind(&SockAddr::from(addr)).is_ok() { + let bind = sock_bind(&sock, addr, config.clone()); + if bind.is_ok() { let sock: UdpSocket = sock.into(); return Result::Ok((sock.local_addr().unwrap().port(), sock)); } @@ -448,7 +575,7 @@ pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpS } pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - let sock = udp_socket(false)?; + let sock = udp_socket(false, false)?; let addr = SocketAddr::new(ip_addr, 0); match sock.bind(&SockAddr::from(addr)) { Ok(_) => Result::Ok(sock.into()), @@ -459,7 +586,7 @@ pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { } } -// binds many sockets to the same port in a range +// binds many sockets to the same port in a range with port reuse pub fn multi_bind_in_range( ip_addr: IpAddr, range: PortRange, @@ -479,13 +606,24 @@ pub fn multi_bind_in_range( let mut port = 0; let mut error = None; for _ in 0..NUM_TRIES { - port = { - let (port, _) = bind_in_range(ip_addr, range)?; - port - }; // drop the probe, port should be available... briefly. + let mut config = SocketConfig { + reuseaddr: false, + reuseport: true, + first_reuse: true, + }; - for _ in 0..num { - let sock = bind_to(ip_addr, port, true); + // find first available reuse port to bind + let sock = bind_in_range_with_config(ip_addr, range, config.clone()); + if sock.is_err() { + continue; + } + let (found, sock) = sock.unwrap(); + port = found; + sockets.push(sock); + config.first_reuse = false; + // bind the rest sock on the reuse port + for _ in 1..num { + let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { sockets.push(sock); } else { @@ -506,11 +644,23 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result { - let sock = udp_socket(reuseaddr)?; + let sock = udp_socket(reuseaddr, false)?; + let addr = SocketAddr::new(ip_addr, port); + sock.bind(&SockAddr::from(addr)).map(|_| sock.into()) +} + +pub fn bind_to_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config.clone())?; let addr = SocketAddr::new(ip_addr, port); - sock.bind(&SockAddr::from(addr)).map(|_| sock.into()) + let bind = sock_bind(&sock, addr, config); + + bind.map(|_| sock.into()) } // binds both a UdpSocket and a TcpListener @@ -519,18 +669,44 @@ pub fn bind_common( port: u16, reuseaddr: bool, ) -> io::Result<(UdpSocket, TcpListener)> { - let sock = udp_socket(reuseaddr)?; + let config = SocketConfig { + reuseaddr, + reuseport: false, + first_reuse: false, + }; + bind_common_with_config(ip_addr, port, config) +} + +// binds both a UdpSocket and a TcpListener +pub fn bind_common_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result<(UdpSocket, TcpListener)> { + let sock = udp_socket_with_config(config.clone())?; let addr = SocketAddr::new(ip_addr, port); - let sock_addr = SockAddr::from(addr); - sock.bind(&sock_addr) - .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener))) + let bind = sock_bind(&sock, addr, config); + + bind.and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener))) } pub fn bind_two_in_range_with_offset( ip_addr: IpAddr, range: PortRange, offset: u16, +) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { + let sock1_config = SocketConfig::default(); + let sock2_config = SocketConfig::default(); + bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) +} + +pub fn bind_two_in_range_with_offset_and_config( + ip_addr: IpAddr, + range: PortRange, + offset: u16, + sock1_config: SocketConfig, + sock2_config: SocketConfig, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { if range.1.saturating_sub(range.0) < offset { return Err(io::Error::new( @@ -539,9 +715,11 @@ pub fn bind_two_in_range_with_offset( )); } for port in range.0..range.1 { - if let Ok(first_bind) = bind_to(ip_addr, port, false) { + if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) { if range.1.saturating_sub(port) >= offset { - if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) { + if let Ok(second_bind) = + bind_to_with_config(ip_addr, port + offset, sock2_config.clone()) + { return Ok(( (first_bind.local_addr().unwrap().port(), first_bind), (second_bind.local_addr().unwrap().port(), second_bind), @@ -562,8 +740,20 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re let (start, end) = range; let mut tries_left = end - start; let mut rand_port = thread_rng().gen_range(start..end); + + let lookup = |ip_addr, port| -> io::Result<()> { + let config = SocketConfig { + reuseaddr: false, + reuseport: false, + first_reuse: false, + }; + let sock = udp_socket_with_config(config)?; + let addr = SocketAddr::new(ip_addr, port); + sock.bind(&SockAddr::from(addr)) + }; + loop { - match bind_common(ip_addr, rand_port, false) { + match lookup(ip_addr, rand_port) { Ok(_) => { break Ok(rand_port); } @@ -581,6 +771,19 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re } } +pub fn bind_more_with_config( + socket: UdpSocket, + num: usize, + config: SocketConfig, +) -> io::Result> { + let addr = socket.local_addr().unwrap(); + let ip = addr.ip(); + let port = addr.port(); + std::iter::once(Ok(socket)) + .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone()))) + .collect() +} + #[cfg(test)] mod tests { use {super::*, std::net::Ipv4Addr}; @@ -684,8 +887,18 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_to(ip_addr, 2002, true).unwrap(); - let y = bind_to(ip_addr, 2002, true).unwrap(); + let config = SocketConfig { + reuseaddr: true, + reuseport: true, + first_reuse: true, + }; + let config2 = SocketConfig { + reuseaddr: true, + reuseport: true, + first_reuse: false, + }; + let x = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config2).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() @@ -740,6 +953,45 @@ mod tests { bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err(); } + #[test] + fn test_port_binding() { + let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let addr = SocketAddr::new(ip_addr, 3527); + + let do_bind = |reuse1, reuse2, expect| { + let config = SocketConfig { + reuseaddr: false, + reuseport: reuse1, + first_reuse: false, + }; + let sock = udp_socket_with_config(config).unwrap(); + let b1 = sock.bind(&SockAddr::from(addr)); + + let config = SocketConfig { + reuseaddr: false, + reuseport: reuse2, + first_reuse: false, + }; + let sock = udp_socket_with_config(config).unwrap(); + let b2 = sock.bind(&SockAddr::from(addr)); + + assert_eq!((b1.is_ok(), b2.is_ok()), expect); + }; + + // Testing different config for binding two socks + // reuseport (false, false), expect first ok, second err. + do_bind(false, false, (true, false)); + + // reuseport (true, true), both should be ok. + do_bind(true, true, (true, true)); + + // reuseport (true, false), expect first ok, second err. + do_bind(true, false, (true, false)); + + // reuseport (false, true), expect first ok, second err. + do_bind(false, true, (true, false)); + } + #[test] fn test_get_public_ip_addr_none() { solana_logger::setup(); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f3279c33612893..78e0f1277c8d8f 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5314,6 +5314,7 @@ dependencies = [ "bincode", "clap 3.1.6", "crossbeam-channel", + "lazy_static", "log", "nix", "rand 0.8.5", @@ -6319,6 +6320,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", @@ -6334,6 +6336,7 @@ dependencies = [ "rustls", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-sdk", "solana-transaction-metrics-tracker", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 0237fc21d098dc..f7faf2eec9c10d 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -68,7 +68,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (s, exit, keypair) = server_args(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -209,7 +209,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (request_recv_socket, request_recv_exit, keypair) = server_args(); let SpawnServerResult { - endpoint: request_recv_endpoint, + endpoints: request_recv_endpoints, thread: request_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -228,7 +228,7 @@ mod tests { ) .unwrap(); - drop(request_recv_endpoint); + drop(request_recv_endpoints); // Response Receiver: let (response_recv_socket, response_recv_exit, keypair2) = server_args(); let (sender2, receiver2) = unbounded(); @@ -237,7 +237,7 @@ mod tests { let port = response_recv_socket.local_addr().unwrap().port(); let server_addr = SocketAddr::new(addr, port); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -268,6 +268,10 @@ mod tests { key: priv_key, }); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); + drop(response_recv_endpoints); let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 55d0030e734607..46dc8b72b1f626 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } async-channel = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } @@ -28,6 +29,7 @@ rand = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } solana-transaction-metrics-tracker = { workspace = true } @@ -37,6 +39,8 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +serial_test = { workspace = true } +socket2 = { workspace = true } solana-logger = { workspace = true } [lib] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index feb9bd2db65a3e..ef4a566a12c66b 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -12,9 +12,10 @@ use { }, bytes::Bytes, crossbeam_channel::Sender, + futures::{stream::FuturesUnordered, Future, StreamExt as _}, indexmap::map::{Entry, IndexMap}, percentage::Percentage, - quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, + quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, solana_measure::measure::Measure, @@ -35,11 +36,13 @@ use { std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, // CAUTION: be careful not to introduce any awaits while holding an RwLock. sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, + task::Poll, time::{Duration, Instant}, }, tokio::{ @@ -51,6 +54,7 @@ use { // but if we do, the scope of the RwLock must always be a subset of the async Mutex // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to // introduce any other awaits while holding the RwLock. + select, sync::{Mutex, MutexGuard}, task::JoinHandle, time::timeout, @@ -125,20 +129,55 @@ pub fn spawn_server( wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { - info!("Start {name} quic server on {sock:?}"); + spawn_server_multi( + name, + vec![sock], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) + .map(|(mut endpoints, stats, handle)| (endpoints.remove(0), stats, handle)) +} + +#[allow(clippy::too_many_arguments, clippy::type_complexity)] +pub fn spawn_server_multi( + name: &'static str, + sockets: Vec, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result<(Vec, Arc, JoinHandle<()>), QuicServerError> { + info!("Start {name} quic server on {sockets:?}"); let (config, _cert) = configure_server(keypair)?; - let endpoint = Endpoint::new( - EndpointConfig::default(), - Some(config), - sock, - Arc::new(TokioRuntime), - ) - .map_err(QuicServerError::EndpointFailed)?; + let endpoints = sockets + .into_iter() + .map(|sock| { + Endpoint::new( + EndpointConfig::default(), + Some(config.clone()), + sock, + Arc::new(TokioRuntime), + ) + .map_err(QuicServerError::EndpointFailed) + }) + .collect::, _>>()?; let stats = Arc::::default(); let handle = tokio::spawn(run_server( name, - endpoint.clone(), + endpoints.clone(), packet_sender, exit, max_connections_per_peer, @@ -149,13 +188,13 @@ pub fn spawn_server( wait_for_chunk_timeout, coalesce, )); - Ok((endpoint, stats, handle)) + Ok((endpoints, stats, handle)) } #[allow(clippy::too_many_arguments)] async fn run_server( name: &'static str, - incoming: Endpoint, + incoming: Vec, packet_sender: Sender, exit: Arc, max_connections_per_peer: usize, @@ -185,8 +224,37 @@ async fn run_server( stats.clone(), coalesce, )); + + let mut accepts = incoming + .iter() + .enumerate() + .map(|(i, incoming)| { + Box::pin(EndpointAccept { + accept: incoming.accept(), + endpoint: i, + }) + }) + .collect::>(); while !exit.load(Ordering::Relaxed) { - let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; + let timeout_connection = select! { + ready = accepts.next() => { + if let Some((connecting, i)) = ready { + accepts.push( + Box::pin(EndpointAccept { + accept: incoming[i].accept(), + endpoint: i, + } + )); + Ok(connecting) + } else { + // we can't really get here - we never poll an empty FuturesUnordered + continue + } + } + _ = tokio::time::sleep(WAIT_FOR_CONNECTION_TIMEOUT) => { + Err(()) + } + }; if last_datapoint.elapsed().as_secs() >= 5 { stats.report(name); @@ -1196,6 +1264,25 @@ impl ConnectionTable { } } +struct EndpointAccept<'a> { + endpoint: usize, + accept: Accept<'a>, +} + +impl<'a> Future for EndpointAccept<'a> { + type Output = (Option, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { + let i = self.endpoint; + // Safety: + // self is pinned and accept is a field so it can't get moved out. See safety docs of + // map_unchecked_mut. + unsafe { self.map_unchecked_mut(|this| &mut this.accept) } + .poll(cx) + .map(|r| (r, i)) + } +} + #[cfg(test)] pub mod test { use { @@ -1209,6 +1296,10 @@ pub mod test { async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, TransportConfig}, + serial_test::serial, + solana_net_utils::{ + bind_more_with_config, bind_to_with_config, bind_with_any_port, SocketConfig, + }, solana_sdk::{ net::DEFAULT_TPU_COALESCE, quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, @@ -1264,6 +1355,26 @@ pub mod test { config } + fn make_quic_sockets() -> Vec { + const NUM_QUIC_SOCKETS: usize = 10; + + // find an available port. + let port = { + let t = bind_with_any_port("127.0.0.1".parse().unwrap()).unwrap(); + t.local_addr().unwrap().port() + }; + + let mut config = SocketConfig { + reuseaddr: true, + reuseport: true, + first_reuse: true, + }; + let socket = + bind_to_with_config("127.0.0.1".parse().unwrap(), port, config.clone()).unwrap(); + config.first_reuse = false; + bind_more_with_config(socket, NUM_QUIC_SOCKETS - 1, config).unwrap() + } + fn setup_quic_server( option_staked_nodes: Option, max_connections_per_peer: usize, @@ -1274,15 +1385,15 @@ pub mod test { SocketAddr, Arc, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let sockets = make_quic_sockets(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); - let server_address = s.local_addr().unwrap(); + let server_address = sockets[0].local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); - let (_, stats, t) = spawn_server( + let (_, stats, t) = spawn_server_multi( "quic_streamer_test", - s, + sockets, &keypair, sender, exit.clone(), @@ -1465,13 +1576,16 @@ pub mod test { } } + #[serial] #[tokio::test] async fn test_quic_server_exit() { let (t, exit, _receiver, _server_address, _stats) = setup_quic_server(None, 1); exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_timeout() { solana_logger::setup(); @@ -1479,8 +1593,10 @@ pub mod test { check_timeout(receiver, server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_packet_batcher() { solana_logger::setup(); @@ -1528,8 +1644,10 @@ pub mod test { assert_eq!(i, num_packets); exit.store(true, Ordering::Relaxed); handle.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_stream_timeout() { solana_logger::setup(); @@ -1558,8 +1676,10 @@ pub mod test { exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_block_multiple_connections() { solana_logger::setup(); @@ -1567,8 +1687,10 @@ pub mod test { check_block_multiple_connections(server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_connections_on_single_client_endpoint() { solana_logger::setup(); @@ -1626,8 +1748,10 @@ pub mod test { exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_writes() { solana_logger::setup(); @@ -1635,8 +1759,10 @@ pub mod test { check_multiple_writes(receiver, server_address, None).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_staked_connection_removal() { solana_logger::setup(); @@ -1660,8 +1786,10 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_zero_staked_connection_removal() { // In this test, the client has a pubkey, but is not in stake table. @@ -1686,8 +1814,10 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_unstaked_connection_removal() { solana_logger::setup(); @@ -1704,8 +1834,10 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); @@ -1733,8 +1865,10 @@ pub mod test { check_unstaked_node_connect_failure(server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_streams() { solana_logger::setup(); @@ -1768,6 +1902,7 @@ pub mod test { t.await.unwrap(); assert_eq!(stats.total_connections.load(Ordering::Relaxed), 0); assert_eq!(stats.total_new_connections.load(Ordering::Relaxed), 2); + solana_net_utils::clear_used_ports(); } #[test] diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 3b1b6b21adf468..1aacd551722867 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -37,7 +37,7 @@ impl SkipClientVerification { } pub struct SpawnServerResult { - pub endpoint: Endpoint, + pub endpoints: Vec, pub thread: thread::JoinHandle<()>, pub key_updater: Arc, } @@ -117,13 +117,15 @@ pub enum QuicServerError { } pub struct EndpointKeyUpdater { - endpoint: Endpoint, + endpoints: Vec, } impl NotifyKeyUpdate for EndpointKeyUpdater { fn update_key(&self, key: &Keypair) -> Result<(), Box> { let (config, _) = configure_server(key)?; - self.endpoint.set_server_config(Some(config)); + for endpoint in &self.endpoints { + endpoint.set_server_config(Some(config.clone())); + } Ok(()) } } @@ -474,7 +476,38 @@ impl StreamStats { pub fn spawn_server( thread_name: &'static str, metrics_name: &'static str, - sock: UdpSocket, + socket: UdpSocket, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result { + spawn_server_multi( + thread_name, + metrics_name, + vec![socket], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn spawn_server_multi( + thread_name: &'static str, + metrics_name: &'static str, + sockets: Vec, keypair: &Keypair, packet_sender: Sender, exit: Arc, @@ -486,11 +519,11 @@ pub fn spawn_server( coalesce: Duration, ) -> Result { let runtime = rt(format!("{thread_name}Rt")); - let (endpoint, _stats, task) = { + let (endpoints, _stats, task) = { let _guard = runtime.enter(); - crate::nonblocking::quic::spawn_server( + crate::nonblocking::quic::spawn_server_multi( metrics_name, - sock, + sockets, keypair, packet_sender, exit, @@ -511,10 +544,10 @@ pub fn spawn_server( }) .unwrap(); let updater = EndpointKeyUpdater { - endpoint: endpoint.clone(), + endpoints: endpoints.clone(), }; Ok(SpawnServerResult { - endpoint, + endpoints, thread: handle, key_updater: Arc::new(updater), }) @@ -526,6 +559,7 @@ mod test { super::*, crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, crossbeam_channel::unbounded, + serial_test::serial, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, }; @@ -543,7 +577,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -564,13 +598,16 @@ mod test { (t, exit, receiver, server_address) } + #[serial] #[test] fn test_quic_server_exit() { let (t, exit, _receiver, _server_address) = setup_quic_server(); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[test] fn test_quic_timeout() { solana_logger::setup(); @@ -579,8 +616,10 @@ mod test { runtime.block_on(check_timeout(receiver, server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[test] fn test_quic_server_block_multiple_connections() { solana_logger::setup(); @@ -590,8 +629,10 @@ mod test { runtime.block_on(check_block_multiple_connections(server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[test] fn test_quic_server_multiple_streams() { solana_logger::setup(); @@ -602,7 +643,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -625,8 +666,10 @@ mod test { runtime.block_on(check_multiple_streams(receiver, server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[test] fn test_quic_server_multiple_writes() { solana_logger::setup(); @@ -636,8 +679,10 @@ mod test { runtime.block_on(check_multiple_writes(receiver, server_address, None)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } + #[serial] #[test] fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); @@ -648,7 +693,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -671,5 +716,6 @@ mod test { runtime.block_on(check_unstaked_node_connect_failure(server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } } diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 12bbd0b21001c9..f736dfe778f881 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -86,11 +86,11 @@ fn verify_reachable_ports( } if verify_address(&node.info.tpu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu.iter()); - udp_sockets.push(&node.sockets.tpu_quic); + udp_sockets.extend(&node.sockets.tpu_quic); } if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); - udp_sockets.push(&node.sockets.tpu_forwards_quic); + udp_sockets.extend(&node.sockets.tpu_forwards_quic); } if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter()); diff --git a/validator/src/main.rs b/validator/src/main.rs index c8494221d614b0..a1d31f900f59f5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1821,12 +1821,16 @@ pub fn main() { let gossip_addr = SocketAddr::new( gossip_host, value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range(bind_address, (0, 1)).unwrap_or_else( - |err| { - eprintln!("Unable to find an available gossip port: {err}"); - exit(1); - }, - ) + // find an available port. + let get_port = || -> Result<_, std::io::Error> { + let t = solana_net_utils::bind_with_any_port(bind_address)?; + let addr = t.local_addr()?; + Ok(addr.port()) + }; + get_port().unwrap_or_else(|err| { + eprintln!("Unable to find an available gossip port: {err}"); + exit(1); + }) }), );