From 326ab03472a115d052ba67ea8ca803ac2871cce7 Mon Sep 17 00:00:00 2001 From: Trent Nelson Date: Fri, 5 Apr 2024 02:36:44 -0600 Subject: [PATCH 01/17] net-utils: support SO_REUSEPORT --- net-utils/src/lib.rs | 127 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 117 insertions(+), 10 deletions(-) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 2d1b6249f3fcb1..8ea956b18f6506 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -384,14 +384,45 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } +#[derive(Clone, Debug)] +pub struct SocketConfig { + pub reuseaddr: bool, + pub reuseport: bool, +} + +impl Default for SocketConfig { + #[allow(clippy::derivable_impls)] + fn default() -> Self { + Self { + reuseaddr: false, + reuseport: false, + } + } +} + #[cfg(any(windows, target_os = "ios"))] fn udp_socket(_reuseaddr: bool) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } +#[cfg(any(windows, target_os = "ios"))] +fn udp_socket_with_config(config: SocketConfig) -> io::Result { + let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; + Ok(sock) +} + #[cfg(not(any(windows, target_os = "ios")))] fn udp_socket(reuseaddr: bool) -> io::Result { + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + udp_socket_with_config(config) +} + +#[cfg(not(any(windows, target_os = "ios")))] +fn udp_socket_with_config(config: SocketConfig) -> io::Result { use { nix::sys::socket::{ setsockopt, @@ -399,14 +430,21 @@ fn udp_socket(reuseaddr: bool) -> io::Result { }, std::os::unix::io::AsRawFd, }; + let SocketConfig { + reuseaddr, + mut reuseport, + } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; let sock_fd = sock.as_raw_fd(); + // best effort, i.e. ignore setsockopt() errors, we'll get the failure in caller if reuseaddr { - // best effort, i.e. ignore errors here, we'll get the failure in caller - setsockopt(sock_fd, ReusePort, &true).ok(); setsockopt(sock_fd, ReuseAddr, &true).ok(); + reuseport = true; + } + if reuseport { + setsockopt(sock_fd, ReusePort, &true).ok(); } Ok(sock) @@ -430,7 +468,16 @@ pub fn bind_common_in_range( } pub fn bind_in_range(ip_addr: IpAddr, range: PortRange) -> io::Result<(u16, UdpSocket)> { - let sock = udp_socket(false)?; + let config = SocketConfig::default(); + bind_in_range_with_config(ip_addr, range, config) +} + +pub fn bind_in_range_with_config( + ip_addr: IpAddr, + range: PortRange, + config: SocketConfig, +) -> io::Result<(u16, UdpSocket)> { + let sock = udp_socket_with_config(config)?; for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); @@ -484,8 +531,12 @@ pub fn multi_bind_in_range( port }; // drop the probe, port should be available... briefly. + let config = SocketConfig { + reuseaddr: true, + reuseport: true, + }; for _ in 0..num { - let sock = bind_to(ip_addr, port, true); + let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { sockets.push(sock); } else { @@ -506,7 +557,19 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result { - let sock = udp_socket(reuseaddr)?; + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + bind_to_with_config(ip_addr, port, config) +} + +pub fn bind_to_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result { + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); @@ -519,7 +582,20 @@ pub fn bind_common( port: u16, reuseaddr: bool, ) -> io::Result<(UdpSocket, TcpListener)> { - let sock = udp_socket(reuseaddr)?; + let config = SocketConfig { + reuseaddr, + reuseport: false, + }; + bind_common_with_config(ip_addr, port, config) +} + +// binds both a UdpSocket and a TcpListener +pub fn bind_common_with_config( + ip_addr: IpAddr, + port: u16, + config: SocketConfig, +) -> io::Result<(UdpSocket, TcpListener)> { + let sock = udp_socket_with_config(config)?; let addr = SocketAddr::new(ip_addr, port); let sock_addr = SockAddr::from(addr); @@ -531,6 +607,18 @@ pub fn bind_two_in_range_with_offset( ip_addr: IpAddr, range: PortRange, offset: u16, +) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { + let sock1_config = SocketConfig::default(); + let sock2_config = SocketConfig::default(); + bind_two_in_range_with_offset_and_config(ip_addr, range, offset, sock1_config, sock2_config) +} + +pub fn bind_two_in_range_with_offset_and_config( + ip_addr: IpAddr, + range: PortRange, + offset: u16, + sock1_config: SocketConfig, + sock2_config: SocketConfig, ) -> io::Result<((u16, UdpSocket), (u16, UdpSocket))> { if range.1.saturating_sub(range.0) < offset { return Err(io::Error::new( @@ -539,9 +627,11 @@ pub fn bind_two_in_range_with_offset( )); } for port in range.0..range.1 { - if let Ok(first_bind) = bind_to(ip_addr, port, false) { + if let Ok(first_bind) = bind_to_with_config(ip_addr, port, sock1_config.clone()) { if range.1.saturating_sub(port) >= offset { - if let Ok(second_bind) = bind_to(ip_addr, port + offset, false) { + if let Ok(second_bind) = + bind_to_with_config(ip_addr, port + offset, sock2_config.clone()) + { return Ok(( (first_bind.local_addr().unwrap().port(), first_bind), (second_bind.local_addr().unwrap().port(), second_bind), @@ -581,6 +671,19 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re } } +pub fn bind_more_with_config( + socket: UdpSocket, + num: usize, + config: SocketConfig, +) -> io::Result> { + let addr = socket.local_addr().unwrap(); + let ip = addr.ip(); + let port = addr.port(); + std::iter::once(Ok(socket)) + .chain((1..num).map(|_| bind_to_with_config(ip, port, config.clone()))) + .collect() +} + #[cfg(test)] mod tests { use {super::*, std::net::Ipv4Addr}; @@ -684,8 +787,12 @@ mod tests { let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); assert_eq!(bind_in_range(ip_addr, (2000, 2001)).unwrap().0, 2000); let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); - let x = bind_to(ip_addr, 2002, true).unwrap(); - let y = bind_to(ip_addr, 2002, true).unwrap(); + let config = SocketConfig { + reuseaddr: true, + reuseport: true, + }; + let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() From d25934a044b268e66e3b6c24db25e5589a3eae21 Mon Sep 17 00:00:00 2001 From: Alessandro Decina Date: Tue, 2 Apr 2024 11:45:28 +0000 Subject: [PATCH 02/17] tpu: use multiple quic endpoints --- Cargo.lock | 2 + client/src/connection_cache.rs | 5 +- core/src/tpu.rs | 16 ++-- gossip/src/cluster_info.rs | 103 +++++++++++++++++++--- programs/sbf/Cargo.lock | 1 + quic-client/tests/quic_client.rs | 12 ++- streamer/Cargo.toml | 2 + streamer/src/nonblocking/quic.rs | 144 +++++++++++++++++++++++++++---- streamer/src/quic.rs | 57 +++++++++--- validator/src/bootstrap.rs | 4 +- 10 files changed, 290 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1ba69acd0787f5..22cdcb099eac05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7273,6 +7273,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", @@ -7286,6 +7287,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "socket2 0.5.6", "solana-logger", "solana-measure", "solana-metrics", diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index a94bc7cd3d8ca8..e9cec25bb2ff9c 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -256,7 +256,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -275,6 +275,9 @@ mod tests { ) .unwrap(); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); let connection_cache = ConnectionCache::new_with_client_options( "connection_cache_test", 1, // connection_pool_size diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 640caf64544d45..8c114b0f62a7f9 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -34,7 +34,9 @@ use { solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, - quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS}, + quic::{ + spawn_server_multi, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS, + }, streamer::StakedNodes, }, solana_turbine::broadcast_stage::{BroadcastStage, BroadcastStageType}, @@ -57,8 +59,8 @@ pub struct TpuSockets { pub transaction_forwards: Vec, pub vote: Vec, pub broadcast: Vec, - pub transactions_quic: UdpSocket, - pub transactions_forwards_quic: UdpSocket, + pub transactions_quic: Vec, + pub transactions_forwards_quic: Vec, } pub struct Tpu { @@ -149,10 +151,10 @@ impl Tpu { let (non_vote_sender, non_vote_receiver) = banking_tracer.create_channel_non_vote(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_quic_t, key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpu", "quic_streamer_tpu", transactions_quic_sockets, @@ -169,10 +171,10 @@ impl Tpu { .unwrap(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: tpu_forwards_quic_t, key_updater: forwards_key_updater, - } = spawn_server( + } = spawn_server_multi( "solQuicTpuFwd", "quic_streamer_tpu_forwards", transactions_forwards_quic_sockets, diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 7d737d313eeae7..79c148e454b433 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -54,8 +54,9 @@ use { solana_ledger::shred::Shred, solana_measure::measure::Measure, solana_net_utils::{ - bind_common, bind_common_in_range, bind_in_range, bind_two_in_range_with_offset, - find_available_port_in_range, multi_bind_in_range, PortRange, + bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, + bind_more_with_config, bind_two_in_range_with_offset_and_config, + find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, }, solana_perf::{ data_budget::DataBudget, @@ -2782,8 +2783,8 @@ pub struct Sockets { pub serve_repair: UdpSocket, pub serve_repair_quic: UdpSocket, pub ancestor_hashes_requests: UdpSocket, - pub tpu_quic: UdpSocket, - pub tpu_forwards_quic: UdpSocket, + pub tpu_quic: Vec, + pub tpu_forwards_quic: Vec, } pub struct NodeConfig { @@ -2794,6 +2795,8 @@ pub struct NodeConfig { pub public_tpu_forwards_addr: Option, } +const QUIC_ENDPOINTS: usize = 10; + #[derive(Debug)] pub struct Node { pub info: ContactInfo, @@ -2811,15 +2814,45 @@ impl Node { let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(localhost_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + localhost_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2906,7 +2939,19 @@ impl Node { } } fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) { - bind_in_range(bind_ip_addr, port_range).expect("Failed to bind") + let config = SocketConfig { + reuseaddr: false, + reuseport: false, + }; + Self::bind_with_config(bind_ip_addr, port_range, config) + } + + fn bind_with_config( + bind_ip_addr: IpAddr, + port_range: PortRange, + config: SocketConfig, + ) -> (u16, UdpSocket) { + bind_in_range_with_config(bind_ip_addr, port_range, config).expect("Failed to bind") } pub fn new_single_bind( @@ -2919,10 +2964,36 @@ impl Node { Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr); let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range); let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range); + let udp_config = SocketConfig { + reuseaddr: false, + reuseport: false, + }; + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config.clone(), + quic_config.clone(), + ) + .unwrap(); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = - bind_two_in_range_with_offset(bind_ip_addr, port_range, QUIC_PORT_OFFSET).unwrap(); + bind_two_in_range_with_offset_and_config( + bind_ip_addr, + port_range, + QUIC_PORT_OFFSET, + udp_config, + quic_config.clone(), + ) + .unwrap(); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3004,21 +3075,31 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); - let (_tpu_port_quic, tpu_quic) = Self::bind( + let quic_config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), + quic_config.clone(), ); + let tpu_quic = + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); - let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind( + let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( bind_ip_addr, ( tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), + quic_config.clone(), ); + let tpu_forwards_quic = + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index f3279c33612893..89c788736e9499 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6319,6 +6319,7 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "futures 0.3.30", "futures-util", "histogram", "indexmap 2.2.5", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 0237fc21d098dc..f7faf2eec9c10d 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -68,7 +68,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (s, exit, keypair) = server_args(); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -209,7 +209,7 @@ mod tests { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let (request_recv_socket, request_recv_exit, keypair) = server_args(); let SpawnServerResult { - endpoint: request_recv_endpoint, + endpoints: request_recv_endpoints, thread: request_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -228,7 +228,7 @@ mod tests { ) .unwrap(); - drop(request_recv_endpoint); + drop(request_recv_endpoints); // Response Receiver: let (response_recv_socket, response_recv_exit, keypair2) = server_args(); let (sender2, receiver2) = unbounded(); @@ -237,7 +237,7 @@ mod tests { let port = response_recv_socket.local_addr().unwrap().port(); let server_addr = SocketAddr::new(addr, port); let SpawnServerResult { - endpoint: response_recv_endpoint, + endpoints: mut response_recv_endpoints, thread: response_recv_thread, key_updater: _, } = solana_streamer::quic::spawn_server( @@ -268,6 +268,10 @@ mod tests { key: priv_key, }); + let response_recv_endpoint = response_recv_endpoints + .pop() + .expect("at least one endpoint"); + drop(response_recv_endpoints); let endpoint = QuicLazyInitializedEndpoint::new(client_certificate, Some(response_recv_endpoint)); let request_sender = diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 55d0030e734607..aa3f04c3996a06 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } async-channel = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } +futures = { workspace = true } futures-util = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } @@ -37,6 +38,7 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +socket2 = { workspace = true } solana-logger = { workspace = true } [lib] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index feb9bd2db65a3e..39997fa3aab555 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -12,9 +12,10 @@ use { }, bytes::Bytes, crossbeam_channel::Sender, + futures::{stream::FuturesUnordered, Future, StreamExt as _}, indexmap::map::{Entry, IndexMap}, percentage::Percentage, - quinn::{Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, + quinn::{Accept, Connecting, Connection, Endpoint, EndpointConfig, TokioRuntime, VarInt}, quinn_proto::VarIntBoundsExceeded, rand::{thread_rng, Rng}, solana_measure::measure::Measure, @@ -35,11 +36,13 @@ use { std::{ iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, + pin::Pin, // CAUTION: be careful not to introduce any awaits while holding an RwLock. sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, Arc, RwLock, }, + task::Poll, time::{Duration, Instant}, }, tokio::{ @@ -51,6 +54,7 @@ use { // but if we do, the scope of the RwLock must always be a subset of the async Mutex // (i.e. lock order is always async Mutex -> RwLock). Also, be careful not to // introduce any other awaits while holding the RwLock. + select, sync::{Mutex, MutexGuard}, task::JoinHandle, time::timeout, @@ -125,20 +129,55 @@ pub fn spawn_server( wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result<(Endpoint, Arc, JoinHandle<()>), QuicServerError> { - info!("Start {name} quic server on {sock:?}"); + spawn_server_multi( + name, + vec![sock], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) + .map(|(mut endpoints, stats, handle)| (endpoints.remove(0), stats, handle)) +} + +#[allow(clippy::too_many_arguments, clippy::type_complexity)] +pub fn spawn_server_multi( + name: &'static str, + sockets: Vec, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result<(Vec, Arc, JoinHandle<()>), QuicServerError> { + info!("Start {name} quic server on {sockets:?}"); let (config, _cert) = configure_server(keypair)?; - let endpoint = Endpoint::new( - EndpointConfig::default(), - Some(config), - sock, - Arc::new(TokioRuntime), - ) - .map_err(QuicServerError::EndpointFailed)?; + let endpoints = sockets + .into_iter() + .map(|sock| { + Endpoint::new( + EndpointConfig::default(), + Some(config.clone()), + sock, + Arc::new(TokioRuntime), + ) + .map_err(QuicServerError::EndpointFailed) + }) + .collect::, _>>()?; let stats = Arc::::default(); let handle = tokio::spawn(run_server( name, - endpoint.clone(), + endpoints.clone(), packet_sender, exit, max_connections_per_peer, @@ -149,13 +188,13 @@ pub fn spawn_server( wait_for_chunk_timeout, coalesce, )); - Ok((endpoint, stats, handle)) + Ok((endpoints, stats, handle)) } #[allow(clippy::too_many_arguments)] async fn run_server( name: &'static str, - incoming: Endpoint, + incoming: Vec, packet_sender: Sender, exit: Arc, max_connections_per_peer: usize, @@ -185,8 +224,37 @@ async fn run_server( stats.clone(), coalesce, )); + + let mut accepts = incoming + .iter() + .enumerate() + .map(|(i, incoming)| { + Box::pin(EndpointAccept { + accept: incoming.accept(), + endpoint: i, + }) + }) + .collect::>(); while !exit.load(Ordering::Relaxed) { - let timeout_connection = timeout(WAIT_FOR_CONNECTION_TIMEOUT, incoming.accept()).await; + let timeout_connection = select! { + ready = accepts.next() => { + if let Some((connecting, i)) = ready { + accepts.push( + Box::pin(EndpointAccept { + accept: incoming[i].accept(), + endpoint: i, + } + )); + Ok(connecting) + } else { + // we can't really get here - we never poll an empty FuturesUnordered + continue + } + } + _ = tokio::time::sleep(WAIT_FOR_CONNECTION_TIMEOUT) => { + Err(()) + } + }; if last_datapoint.elapsed().as_secs() >= 5 { stats.report(name); @@ -1196,6 +1264,25 @@ impl ConnectionTable { } } +struct EndpointAccept<'a> { + endpoint: usize, + accept: Accept<'a>, +} + +impl<'a> Future for EndpointAccept<'a> { + type Output = (Option, usize); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { + let i = self.endpoint; + // Safety: + // self is pinned and accept is a field so it can't get moved out. See safety docs of + // map_unchecked_mut. + unsafe { self.map_unchecked_mut(|this| &mut this.accept) } + .poll(cx) + .map(|r| (r, i)) + } +} + #[cfg(test)] pub mod test { use { @@ -1209,13 +1296,18 @@ pub mod test { async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, TransportConfig}, + socket2, solana_sdk::{ net::DEFAULT_TPU_COALESCE, quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, signature::Keypair, signer::Signer, }, - std::collections::HashMap, + std::{ + collections::HashMap, + os::fd::{FromRawFd, IntoRawFd}, + str::FromStr as _, + }, tokio::time::sleep, }; @@ -1274,15 +1366,29 @@ pub mod test { SocketAddr, Arc, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let sockets = (0..10) + .map(|_| { + let sock = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + ) + .unwrap(); + sock.set_reuse_port(true).unwrap(); + sock.bind(&SocketAddr::from_str("127.0.0.1:42069").unwrap().into()) + .unwrap(); + unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } + }) + .collect::>(); + let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); - let server_address = s.local_addr().unwrap(); + let server_address = sockets[0].local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); - let (_, stats, t) = spawn_server( - "quic_streamer_test", - s, + let (_, stats, t) = spawn_server_multi( + "one-million-sol", + sockets, &keypair, sender, exit.clone(), diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 3b1b6b21adf468..483f045b727199 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -37,7 +37,7 @@ impl SkipClientVerification { } pub struct SpawnServerResult { - pub endpoint: Endpoint, + pub endpoints: Vec, pub thread: thread::JoinHandle<()>, pub key_updater: Arc, } @@ -117,13 +117,15 @@ pub enum QuicServerError { } pub struct EndpointKeyUpdater { - endpoint: Endpoint, + endpoints: Vec, } impl NotifyKeyUpdate for EndpointKeyUpdater { fn update_key(&self, key: &Keypair) -> Result<(), Box> { let (config, _) = configure_server(key)?; - self.endpoint.set_server_config(Some(config)); + for endpoint in &self.endpoints { + endpoint.set_server_config(Some(config.clone())); + } Ok(()) } } @@ -474,7 +476,38 @@ impl StreamStats { pub fn spawn_server( thread_name: &'static str, metrics_name: &'static str, - sock: UdpSocket, + socket: UdpSocket, + keypair: &Keypair, + packet_sender: Sender, + exit: Arc, + max_connections_per_peer: usize, + staked_nodes: Arc>, + max_staked_connections: usize, + max_unstaked_connections: usize, + wait_for_chunk_timeout: Duration, + coalesce: Duration, +) -> Result { + spawn_server_multi( + thread_name, + metrics_name, + vec![socket], + keypair, + packet_sender, + exit, + max_connections_per_peer, + staked_nodes, + max_staked_connections, + max_unstaked_connections, + wait_for_chunk_timeout, + coalesce, + ) +} + +#[allow(clippy::too_many_arguments)] +pub fn spawn_server_multi( + thread_name: &'static str, + metrics_name: &'static str, + sockets: Vec, keypair: &Keypair, packet_sender: Sender, exit: Arc, @@ -486,11 +519,11 @@ pub fn spawn_server( coalesce: Duration, ) -> Result { let runtime = rt(format!("{thread_name}Rt")); - let (endpoint, _stats, task) = { + let (endpoints, _stats, task) = { let _guard = runtime.enter(); - crate::nonblocking::quic::spawn_server( + crate::nonblocking::quic::spawn_server_multi( metrics_name, - sock, + sockets, keypair, packet_sender, exit, @@ -511,10 +544,10 @@ pub fn spawn_server( }) .unwrap(); let updater = EndpointKeyUpdater { - endpoint: endpoint.clone(), + endpoints: endpoints.clone(), }; Ok(SpawnServerResult { - endpoint, + endpoints, thread: handle, key_updater: Arc::new(updater), }) @@ -543,7 +576,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -602,7 +635,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( @@ -648,7 +681,7 @@ mod test { let server_address = s.local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); let SpawnServerResult { - endpoint: _, + endpoints: _, thread: t, key_updater: _, } = spawn_server( diff --git a/validator/src/bootstrap.rs b/validator/src/bootstrap.rs index 12bbd0b21001c9..f736dfe778f881 100644 --- a/validator/src/bootstrap.rs +++ b/validator/src/bootstrap.rs @@ -86,11 +86,11 @@ fn verify_reachable_ports( } if verify_address(&node.info.tpu(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu.iter()); - udp_sockets.push(&node.sockets.tpu_quic); + udp_sockets.extend(&node.sockets.tpu_quic); } if verify_address(&node.info.tpu_forwards(Protocol::UDP).ok()) { udp_sockets.extend(node.sockets.tpu_forwards.iter()); - udp_sockets.push(&node.sockets.tpu_forwards_quic); + udp_sockets.extend(&node.sockets.tpu_forwards_quic); } if verify_address(&node.info.tpu_vote().ok()) { udp_sockets.extend(node.sockets.tpu_vote.iter()); From b49a8ea24646189ee5c97118521b46185aade348 Mon Sep 17 00:00:00 2001 From: Stephen Akridge Date: Fri, 5 Apr 2024 19:16:26 +0200 Subject: [PATCH 03/17] Fix clippy and PR comments --- Cargo.lock | 1 + gossip/src/cluster_info.rs | 6 +----- net-utils/src/lib.rs | 2 +- programs/sbf/Cargo.lock | 1 + streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 36 +++++++++++++------------------- 6 files changed, 19 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 22cdcb099eac05..78ce399d156041 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7291,6 +7291,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-sdk", "solana-transaction-metrics-tracker", diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 79c148e454b433..5fdd3709e74575 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2831,10 +2831,6 @@ impl Node { quic_config.clone(), ) .unwrap(); - let quic_config = SocketConfig { - reuseaddr: false, - reuseport: true, - }; let tpu_quic = bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = @@ -3099,7 +3095,7 @@ impl Node { quic_config.clone(), ); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 8ea956b18f6506..b4d47a3c416803 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -407,7 +407,7 @@ fn udp_socket(_reuseaddr: bool) -> io::Result { } #[cfg(any(windows, target_os = "ios"))] -fn udp_socket_with_config(config: SocketConfig) -> io::Result { +fn udp_socket_with_config(_config: SocketConfig) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 89c788736e9499..286fc9f9ace536 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6335,6 +6335,7 @@ dependencies = [ "rustls", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-sdk", "solana-transaction-metrics-tracker", diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index aa3f04c3996a06..b884e811c231ec 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -29,6 +29,7 @@ rand = { workspace = true } rustls = { workspace = true, features = ["dangerous_configuration"] } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-sdk = { workspace = true } solana-transaction-metrics-tracker = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 39997fa3aab555..29a0855e39fa52 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1296,18 +1296,14 @@ pub mod test { async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, TransportConfig}, - socket2, + solana_net_utils::{bind_more_with_config, bind_to_with_config, SocketConfig}, solana_sdk::{ net::DEFAULT_TPU_COALESCE, quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, signature::Keypair, signer::Signer, }, - std::{ - collections::HashMap, - os::fd::{FromRawFd, IntoRawFd}, - str::FromStr as _, - }, + std::collections::HashMap, tokio::time::sleep, }; @@ -1356,6 +1352,16 @@ pub mod test { config } + fn make_quic_sockets() -> Vec { + const NUM_QUIC_SOCKETS: usize = 10; + let config = SocketConfig { + reuseaddr: false, + reuseport: true, + }; + let socket = bind_to_with_config("127.0.0.1".parse().unwrap(), 0, config.clone()).unwrap(); + bind_more_with_config(socket, NUM_QUIC_SOCKETS - 1, config).unwrap() + } + fn setup_quic_server( option_staked_nodes: Option, max_connections_per_peer: usize, @@ -1366,28 +1372,14 @@ pub mod test { SocketAddr, Arc, ) { - let sockets = (0..10) - .map(|_| { - let sock = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ) - .unwrap(); - sock.set_reuse_port(true).unwrap(); - sock.bind(&SocketAddr::from_str("127.0.0.1:42069").unwrap().into()) - .unwrap(); - unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } - }) - .collect::>(); - + let sockets = make_quic_sockets(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); let server_address = sockets[0].local_addr().unwrap(); let staked_nodes = Arc::new(RwLock::new(option_staked_nodes.unwrap_or_default())); let (_, stats, t) = spawn_server_multi( - "one-million-sol", + "quic_streamer_test", sockets, &keypair, sender, From 8612fbfacda3f5ea143314c60d4bcd0cd66e57c4 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sat, 6 Apr 2024 14:25:20 +0000 Subject: [PATCH 04/17] keep track of ports and reused_port for sock binding --- Cargo.lock | 1 + net-utils/Cargo.toml | 1 + net-utils/src/lib.rs | 76 +++++++++++++++++++++++++++++++++++------ programs/sbf/Cargo.lock | 1 + 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78ce399d156041..3fa06b3e7b87e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6533,6 +6533,7 @@ dependencies = [ "bincode", "clap 3.2.23", "crossbeam-channel", + "lazy_static", "log", "nix 0.26.4", "rand 0.8.5", diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index 3486b30bbb9cda..590368f21a63c8 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -13,6 +13,7 @@ edition = { workspace = true } bincode = { workspace = true } clap = { version = "3.1.5", features = ["cargo"] } crossbeam-channel = { workspace = true } +lazy_static = { workspace = true } log = { workspace = true } nix = { workspace = true } rand = { workspace = true } diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index b4d47a3c416803..9f67fcccd932c3 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -1,5 +1,7 @@ //! The `net_utils` module assists with networking #![allow(clippy::arithmetic_side_effects)] +#[macro_use] +extern crate lazy_static; use { crossbeam_channel::unbounded, log::*, @@ -9,7 +11,7 @@ use { collections::{BTreeMap, HashSet}, io::{self, Read, Write}, net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket}, - sync::{Arc, RwLock}, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }, url::Url, @@ -384,6 +386,55 @@ pub fn is_host_port(string: String) -> Result<(), String> { parse_host_port(&string).map(|_| ()) } +lazy_static! { + static ref ALL_USED_PORTS: Mutex> = Mutex::new(Vec::new()); + static ref REUSED_PORTS: Mutex> = Mutex::new(Vec::new()); +} + +#[allow(clippy::collapsible_else_if)] +fn sock_bind(sock: &Socket, addr: SocketAddr, reuse: bool) -> io::Result<()> { + let port = addr.port(); + let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); + let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); + if reuse { + if !reused_ports_lock.contains(&port) { + // bind for a reuse_port for the first time + if all_used_ports_lock.contains(&port) { + // this port is already binded. + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been binded before", port), + )) + } else { + // this port is not yet binded. + let r = sock.bind(&SockAddr::from(addr)); + if r.is_ok() { + all_used_ports_lock.push(port); + reused_ports_lock.push(port); + } + r + } + } else { + // rebind to a reused port + sock.bind(&SockAddr::from(addr)) + } + } else { + if all_used_ports_lock.contains(&port) { + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been binded before", port), + )) + } else { + let r = sock.bind(&SockAddr::from(addr)); + + if r.is_ok() { + all_used_ports_lock.push(port); + } + r + } + } +} + #[derive(Clone, Debug)] pub struct SocketConfig { pub reuseaddr: bool, @@ -477,12 +528,12 @@ pub fn bind_in_range_with_config( range: PortRange, config: SocketConfig, ) -> io::Result<(u16, UdpSocket)> { - let sock = udp_socket_with_config(config)?; + let sock = udp_socket_with_config(config.clone())?; for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); - - if sock.bind(&SockAddr::from(addr)).is_ok() { + let bind = sock_bind(&sock, addr, config.reuseport); + if bind.is_ok() { let sock: UdpSocket = sock.into(); return Result::Ok((sock.local_addr().unwrap().port(), sock)); } @@ -497,7 +548,8 @@ pub fn bind_in_range_with_config( pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { let sock = udp_socket(false)?; let addr = SocketAddr::new(ip_addr, 0); - match sock.bind(&SockAddr::from(addr)) { + let bind = sock_bind(&sock, addr, false); + match bind { Ok(_) => Result::Ok(sock.into()), Err(err) => Err(io::Error::new( io::ErrorKind::Other, @@ -569,11 +621,13 @@ pub fn bind_to_with_config( port: u16, config: SocketConfig, ) -> io::Result { - let sock = udp_socket_with_config(config)?; + let sock = udp_socket_with_config(config.clone())?; let addr = SocketAddr::new(ip_addr, port); - sock.bind(&SockAddr::from(addr)).map(|_| sock.into()) + let bind = sock_bind(&sock, addr, config.reuseport); + + bind.map(|_| sock.into()) } // binds both a UdpSocket and a TcpListener @@ -595,12 +649,12 @@ pub fn bind_common_with_config( port: u16, config: SocketConfig, ) -> io::Result<(UdpSocket, TcpListener)> { - let sock = udp_socket_with_config(config)?; + let sock = udp_socket_with_config(config.clone())?; let addr = SocketAddr::new(ip_addr, port); - let sock_addr = SockAddr::from(addr); - sock.bind(&sock_addr) - .and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener))) + let bind = sock_bind(&sock, addr, config.reuseport); + + bind.and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener))) } pub fn bind_two_in_range_with_offset( diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 286fc9f9ace536..78e0f1277c8d8f 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5314,6 +5314,7 @@ dependencies = [ "bincode", "clap 3.1.6", "crossbeam-channel", + "lazy_static", "log", "nix", "rand 0.8.5", From 814a10e29df513045917dcb162548160e3490e15 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sat, 6 Apr 2024 17:08:57 +0000 Subject: [PATCH 05/17] add first_reuse to socket config to fix test_snapshots_restart_validity --- gossip/src/cluster_info.rs | 21 ++++-- net-utils/src/lib.rs | 108 ++++++++++++++++++++----------- streamer/src/nonblocking/quic.rs | 4 +- 3 files changed, 91 insertions(+), 42 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 5fdd3709e74575..52db944e5fb2f3 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2817,10 +2817,12 @@ impl Node { let udp_config = SocketConfig { reuseaddr: false, reuseport: true, + first_reuse: true, }; let quic_config = SocketConfig { reuseaddr: false, reuseport: true, + first_reuse: true, }; let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( @@ -2831,8 +2833,11 @@ impl Node { quic_config.clone(), ) .unwrap(); + + let mut quic_config_more = quic_config.clone(); + quic_config_more.first_reuse = false; let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config_more.clone()).unwrap(); let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); @@ -2848,7 +2853,8 @@ impl Node { ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config_more.clone()) + .unwrap(); let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); @@ -2938,6 +2944,7 @@ impl Node { let config = SocketConfig { reuseaddr: false, reuseport: false, + first_reuse: false, }; Self::bind_with_config(bind_ip_addr, port_range, config) } @@ -2963,10 +2970,12 @@ impl Node { let udp_config = SocketConfig { reuseaddr: false, reuseport: false, + first_reuse: false, }; let quic_config = SocketConfig { reuseaddr: false, reuseport: true, + first_reuse: true, }; let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) = bind_two_in_range_with_offset_and_config( @@ -2977,8 +2986,10 @@ impl Node { quic_config.clone(), ) .unwrap(); + let mut quic_config_more = quic_config.clone(); + quic_config_more.first_reuse = false; let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config_more.clone()).unwrap(); let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( bind_ip_addr, @@ -2989,7 +3000,8 @@ impl Node { ) .unwrap(); let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); + bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config_more.clone()) + .unwrap(); let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range); let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range); let (_, repair) = Self::bind(bind_ip_addr, port_range); @@ -3074,6 +3086,7 @@ impl Node { let quic_config = SocketConfig { reuseaddr: false, reuseport: true, + first_reuse: true, }; let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( bind_ip_addr, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 9f67fcccd932c3..9ba64a9b0613c4 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -392,37 +392,56 @@ lazy_static! { } #[allow(clippy::collapsible_else_if)] -fn sock_bind(sock: &Socket, addr: SocketAddr, reuse: bool) -> io::Result<()> { +fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Result<()> { let port = addr.port(); let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); - if reuse { - if !reused_ports_lock.contains(&port) { + + if config.reuseport { + // binding reuse port + if config.first_reuse { // bind for a reuse_port for the first time if all_used_ports_lock.contains(&port) { - // this port is already binded. + // this port is already bounded. Err(io::Error::new( io::ErrorKind::Other, - format!("{} has already been binded before", port), + format!("{} has already been bound before", port), )) } else { - // this port is not yet binded. - let r = sock.bind(&SockAddr::from(addr)); - if r.is_ok() { - all_used_ports_lock.push(port); - reused_ports_lock.push(port); + if reused_ports_lock.contains(&port) { + // this port is already bounded before as reused ports. + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been bound before as resued_port", port), + )) + } else { + // this port is not yet bounded. + let r = sock.bind(&SockAddr::from(addr)); + if r.is_ok() { + all_used_ports_lock.push(port); + reused_ports_lock.push(port); + } + r } - r } } else { - // rebind to a reused port - sock.bind(&SockAddr::from(addr)) + // binding to existing reuse port + if reused_ports_lock.contains(&port) { + sock.bind(&SockAddr::from(addr)) + } else { + // existing reuse port not found + Err(io::Error::new( + io::ErrorKind::Other, + format!("reuse_port {} has not been bound before", port), + )) + } } } else { + // binding non reuse port if all_used_ports_lock.contains(&port) { Err(io::Error::new( io::ErrorKind::Other, - format!("{} has already been binded before", port), + format!("{} has already been bound before", port), )) } else { let r = sock.bind(&SockAddr::from(addr)); @@ -439,6 +458,7 @@ fn sock_bind(sock: &Socket, addr: SocketAddr, reuse: bool) -> io::Result<()> { pub struct SocketConfig { pub reuseaddr: bool, pub reuseport: bool, + pub first_reuse: bool, } impl Default for SocketConfig { @@ -447,12 +467,13 @@ impl Default for SocketConfig { Self { reuseaddr: false, reuseport: false, + first_reuse: false, } } } #[cfg(any(windows, target_os = "ios"))] -fn udp_socket(_reuseaddr: bool) -> io::Result { +fn udp_socket(_reuseaddr: bool, _first_reuse: bool) -> io::Result { let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; Ok(sock) } @@ -464,10 +485,11 @@ fn udp_socket_with_config(_config: SocketConfig) -> io::Result { } #[cfg(not(any(windows, target_os = "ios")))] -fn udp_socket(reuseaddr: bool) -> io::Result { +fn udp_socket(reuseaddr: bool, first_reuse: bool) -> io::Result { let config = SocketConfig { reuseaddr, reuseport: false, + first_reuse, }; udp_socket_with_config(config) } @@ -484,6 +506,7 @@ fn udp_socket_with_config(config: SocketConfig) -> io::Result { let SocketConfig { reuseaddr, mut reuseport, + first_reuse: _, } = config; let sock = Socket::new(Domain::IPV4, Type::DGRAM, None)?; @@ -532,7 +555,7 @@ pub fn bind_in_range_with_config( for port in range.0..range.1 { let addr = SocketAddr::new(ip_addr, port); - let bind = sock_bind(&sock, addr, config.reuseport); + let bind = sock_bind(&sock, addr, config.clone()); if bind.is_ok() { let sock: UdpSocket = sock.into(); return Result::Ok((sock.local_addr().unwrap().port(), sock)); @@ -546,10 +569,9 @@ pub fn bind_in_range_with_config( } pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { - let sock = udp_socket(false)?; + let sock = udp_socket(false, false)?; let addr = SocketAddr::new(ip_addr, 0); - let bind = sock_bind(&sock, addr, false); - match bind { + match sock.bind(&SockAddr::from(addr)) { Ok(_) => Result::Ok(sock.into()), Err(err) => Err(io::Error::new( io::ErrorKind::Other, @@ -558,7 +580,7 @@ pub fn bind_with_any_port(ip_addr: IpAddr) -> io::Result { } } -// binds many sockets to the same port in a range +// binds many sockets to the same port in a range with port reuse pub fn multi_bind_in_range( ip_addr: IpAddr, range: PortRange, @@ -578,16 +600,23 @@ pub fn multi_bind_in_range( let mut port = 0; let mut error = None; for _ in 0..NUM_TRIES { - port = { - let (port, _) = bind_in_range(ip_addr, range)?; - port - }; // drop the probe, port should be available... briefly. - - let config = SocketConfig { + let mut config = SocketConfig { reuseaddr: true, reuseport: true, + first_reuse: true, }; - for _ in 0..num { + + // find first available reuse port to bind + let sock = bind_in_range_with_config(ip_addr, range, config.clone()); + if sock.is_err() { + continue; + } + let (found, sock) = sock.unwrap(); + port = found; + sockets.push(sock); + config.first_reuse = false; + // bind the rest sock on the reuse port + for _ in 1..num { let sock = bind_to_with_config(ip_addr, port, config.clone()); if let Ok(sock) = sock { sockets.push(sock); @@ -609,11 +638,9 @@ pub fn multi_bind_in_range( } pub fn bind_to(ip_addr: IpAddr, port: u16, reuseaddr: bool) -> io::Result { - let config = SocketConfig { - reuseaddr, - reuseport: false, - }; - bind_to_with_config(ip_addr, port, config) + let sock = udp_socket(reuseaddr, false)?; + let addr = SocketAddr::new(ip_addr, port); + sock.bind(&SockAddr::from(addr)).map(|_| sock.into()) } pub fn bind_to_with_config( @@ -625,7 +652,7 @@ pub fn bind_to_with_config( let addr = SocketAddr::new(ip_addr, port); - let bind = sock_bind(&sock, addr, config.reuseport); + let bind = sock_bind(&sock, addr, config); bind.map(|_| sock.into()) } @@ -639,6 +666,7 @@ pub fn bind_common( let config = SocketConfig { reuseaddr, reuseport: false, + first_reuse: false, }; bind_common_with_config(ip_addr, port, config) } @@ -652,7 +680,7 @@ pub fn bind_common_with_config( let sock = udp_socket_with_config(config.clone())?; let addr = SocketAddr::new(ip_addr, port); - let bind = sock_bind(&sock, addr, config.reuseport); + let bind = sock_bind(&sock, addr, config); bind.and_then(|_| TcpListener::bind(addr).map(|listener| (sock.into(), listener))) } @@ -844,9 +872,15 @@ mod tests { let config = SocketConfig { reuseaddr: true, reuseport: true, + first_reuse: true, + }; + let config2 = SocketConfig { + reuseaddr: true, + reuseport: true, + first_reuse: false, }; - let x = bind_to_with_config(ip_addr, 2002, config.clone()).unwrap(); - let y = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let x = bind_to_with_config(ip_addr, 2002, config).unwrap(); + let y = bind_to_with_config(ip_addr, 2002, config2).unwrap(); assert_eq!( x.local_addr().unwrap().port(), y.local_addr().unwrap().port() diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 29a0855e39fa52..629ef599c9aa59 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1354,11 +1354,13 @@ pub mod test { fn make_quic_sockets() -> Vec { const NUM_QUIC_SOCKETS: usize = 10; - let config = SocketConfig { + let mut config = SocketConfig { reuseaddr: false, reuseport: true, + first_reuse: true, }; let socket = bind_to_with_config("127.0.0.1".parse().unwrap(), 0, config.clone()).unwrap(); + config.first_reuse = false; bind_more_with_config(socket, NUM_QUIC_SOCKETS - 1, config).unwrap() } From 123d74556debf402ebe824fb9d2d4eea55ffddee Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sat, 6 Apr 2024 20:07:46 +0000 Subject: [PATCH 06/17] clippy --- net-utils/src/lib.rs | 27 ++++++++++++--------------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 9ba64a9b0613c4..4dd8441c590280 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -391,7 +391,6 @@ lazy_static! { static ref REUSED_PORTS: Mutex> = Mutex::new(Vec::new()); } -#[allow(clippy::collapsible_else_if)] fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Result<()> { let port = addr.port(); let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); @@ -407,22 +406,20 @@ fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Resul io::ErrorKind::Other, format!("{} has already been bound before", port), )) + } else if reused_ports_lock.contains(&port) { + // this port is already bounded before as reused ports. + Err(io::Error::new( + io::ErrorKind::Other, + format!("{} has already been bound before as resued_port", port), + )) } else { - if reused_ports_lock.contains(&port) { - // this port is already bounded before as reused ports. - Err(io::Error::new( - io::ErrorKind::Other, - format!("{} has already been bound before as resued_port", port), - )) - } else { - // this port is not yet bounded. - let r = sock.bind(&SockAddr::from(addr)); - if r.is_ok() { - all_used_ports_lock.push(port); - reused_ports_lock.push(port); - } - r + // this port is not yet bounded. + let r = sock.bind(&SockAddr::from(addr)); + if r.is_ok() { + all_used_ports_lock.push(port); + reused_ports_lock.push(port); } + r } } else { // binding to existing reuse port From 6e0c27ae75ce0abcd3d9b6e50596dd60d3abb441 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sat, 6 Apr 2024 21:06:57 +0000 Subject: [PATCH 07/17] fix multi-port quic binding --- gossip/src/cluster_info.rs | 23 ++++++++--------------- 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 52db944e5fb2f3..d709ecddc69ad2 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3083,32 +3083,25 @@ impl Node { let (tpu_port, tpu_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind"); - let quic_config = SocketConfig { - reuseaddr: false, - reuseport: true, - first_reuse: true, - }; - let (_tpu_port_quic, tpu_quic) = Self::bind_with_config( + let (_tpu_port_quic, tpu_quic) = multi_bind_in_range( bind_ip_addr, (tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1), - quic_config.clone(), - ); - let tpu_quic = - bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap(); + QUIC_ENDPOINTS, + ) + .expect("tpu_quic multi_bind"); let (tpu_forwards_port, tpu_forwards_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind"); - let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config( + let (_tpu_forwards_port_quic, tpu_forwards_quic) = multi_bind_in_range( bind_ip_addr, ( tpu_forwards_port + QUIC_PORT_OFFSET, tpu_forwards_port + QUIC_PORT_OFFSET + 1, ), - quic_config.clone(), - ); - let tpu_forwards_quic = - bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap(); + QUIC_ENDPOINTS, + ) + .expect("tpu_forward_quic multi_bind"); let (tpu_vote_port, tpu_vote_sockets) = multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind"); From 5ee7c8b370941d5e57e0c3169d9938aec8542ee1 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 00:41:00 +0000 Subject: [PATCH 08/17] fix streamer tests --- Cargo.lock | 1 + net-utils/src/lib.rs | 1 + streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 29 ++++++++++++++++++++++++++--- streamer/src/quic.rs | 7 +++++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3fa06b3e7b87e8..11107c72e97c6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7288,6 +7288,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rustls", + "serial_test", "socket2 0.5.6", "solana-logger", "solana-measure", diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 4dd8441c590280..4a8634a69c973a 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -393,6 +393,7 @@ lazy_static! { fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Result<()> { let port = addr.port(); + assert_ne!(port, 0, "don't use sock_bind for any port. "); let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index b884e811c231ec..46dc8b72b1f626 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -39,6 +39,7 @@ x509-parser = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +serial_test = { workspace = true } socket2 = { workspace = true } solana-logger = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 629ef599c9aa59..fd37c0714ad40a 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1296,7 +1296,10 @@ pub mod test { async_channel::unbounded as async_unbounded, crossbeam_channel::{unbounded, Receiver}, quinn::{ClientConfig, IdleTimeout, TransportConfig}, - solana_net_utils::{bind_more_with_config, bind_to_with_config, SocketConfig}, + serial_test::serial, + solana_net_utils::{ + bind_more_with_config, bind_to_with_config, bind_with_any_port, SocketConfig, + }, solana_sdk::{ net::DEFAULT_TPU_COALESCE, quic::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, @@ -1354,12 +1357,20 @@ pub mod test { fn make_quic_sockets() -> Vec { const NUM_QUIC_SOCKETS: usize = 10; + + // find an available port. + let port = { + let t = bind_with_any_port("127.0.0.1".parse().unwrap()).unwrap(); + t.local_addr().unwrap().port() + }; + let mut config = SocketConfig { - reuseaddr: false, + reuseaddr: true, reuseport: true, first_reuse: true, }; - let socket = bind_to_with_config("127.0.0.1".parse().unwrap(), 0, config.clone()).unwrap(); + let socket = + bind_to_with_config("127.0.0.1".parse().unwrap(), port, config.clone()).unwrap(); config.first_reuse = false; bind_more_with_config(socket, NUM_QUIC_SOCKETS - 1, config).unwrap() } @@ -1565,6 +1576,7 @@ pub mod test { } } + #[serial] #[tokio::test] async fn test_quic_server_exit() { let (t, exit, _receiver, _server_address, _stats) = setup_quic_server(None, 1); @@ -1572,6 +1584,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_timeout() { solana_logger::setup(); @@ -1581,6 +1594,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_packet_batcher() { solana_logger::setup(); @@ -1630,6 +1644,7 @@ pub mod test { handle.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_stream_timeout() { solana_logger::setup(); @@ -1660,6 +1675,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_server_block_multiple_connections() { solana_logger::setup(); @@ -1669,6 +1685,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_connections_on_single_client_endpoint() { solana_logger::setup(); @@ -1728,6 +1745,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_writes() { solana_logger::setup(); @@ -1737,6 +1755,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_server_staked_connection_removal() { solana_logger::setup(); @@ -1762,6 +1781,7 @@ pub mod test { assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); } + #[serial] #[tokio::test] async fn test_quic_server_zero_staked_connection_removal() { // In this test, the client has a pubkey, but is not in stake table. @@ -1788,6 +1808,7 @@ pub mod test { assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); } + #[serial] #[tokio::test] async fn test_quic_server_unstaked_connection_removal() { solana_logger::setup(); @@ -1806,6 +1827,7 @@ pub mod test { assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); } + #[serial] #[tokio::test] async fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); @@ -1835,6 +1857,7 @@ pub mod test { t.await.unwrap(); } + #[serial] #[tokio::test] async fn test_quic_server_multiple_streams() { solana_logger::setup(); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 483f045b727199..affa03ab3b0cac 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -559,6 +559,7 @@ mod test { super::*, crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT}, crossbeam_channel::unbounded, + serial_test::serial, solana_sdk::net::DEFAULT_TPU_COALESCE, std::net::SocketAddr, }; @@ -597,6 +598,7 @@ mod test { (t, exit, receiver, server_address) } + #[serial] #[test] fn test_quic_server_exit() { let (t, exit, _receiver, _server_address) = setup_quic_server(); @@ -604,6 +606,7 @@ mod test { t.join().unwrap(); } + #[serial] #[test] fn test_quic_timeout() { solana_logger::setup(); @@ -614,6 +617,7 @@ mod test { t.join().unwrap(); } + #[serial] #[test] fn test_quic_server_block_multiple_connections() { solana_logger::setup(); @@ -625,6 +629,7 @@ mod test { t.join().unwrap(); } + #[serial] #[test] fn test_quic_server_multiple_streams() { solana_logger::setup(); @@ -660,6 +665,7 @@ mod test { t.join().unwrap(); } + #[serial] #[test] fn test_quic_server_multiple_writes() { solana_logger::setup(); @@ -671,6 +677,7 @@ mod test { t.join().unwrap(); } + #[serial] #[test] fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); From 7615d35f34bd62944d1c3075a69840f3b407dabf Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 02:06:20 +0000 Subject: [PATCH 09/17] fix default gossip port --- validator/src/main.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/validator/src/main.rs b/validator/src/main.rs index c8494221d614b0..a1d31f900f59f5 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1821,12 +1821,16 @@ pub fn main() { let gossip_addr = SocketAddr::new( gossip_host, value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range(bind_address, (0, 1)).unwrap_or_else( - |err| { - eprintln!("Unable to find an available gossip port: {err}"); - exit(1); - }, - ) + // find an available port. + let get_port = || -> Result<_, std::io::Error> { + let t = solana_net_utils::bind_with_any_port(bind_address)?; + let addr = t.local_addr()?; + Ok(addr.port()) + }; + get_port().unwrap_or_else(|err| { + eprintln!("Unable to find an available gossip port: {err}"); + exit(1); + }) }), ); From 9a83ec51b9f4941ef73e2ea63d53e2d7626d9127 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 02:53:34 +0000 Subject: [PATCH 10/17] fix gossip spy port --- gossip/src/main.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/gossip/src/main.rs b/gossip/src/main.rs index 1f31195f431d36..9d725d2c3aba30 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -236,11 +236,16 @@ fn process_spy(matches: &ArgMatches, socket_addr_space: SocketAddrSpace) -> std: let gossip_addr = SocketAddr::new( gossip_host, value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { - solana_net_utils::find_available_port_in_range( - IpAddr::V4(Ipv4Addr::UNSPECIFIED), - (0, 1), - ) - .expect("unable to find an available gossip port") + // find an available port. + let get_port = || -> Result<_, std::io::Error> { + let t = solana_net_utils::bind_with_any_port(bind_address)?; + let addr = t.local_addr()?; + Ok(addr.port()) + }; + get_port().unwrap_or_else(|err| { + eprintln!("Unable to find an available gossip port: {err}"); + exit(1); + }) }), ); let discover_timeout = Duration::from_secs(timeout.unwrap_or(u64::MAX)); From 2c60c82790c6eb5f34ad663c82af11adb9098e4c Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 03:24:49 +0000 Subject: [PATCH 11/17] test_optimistic_confirmation_violation_without_tower require port stay the same in restart. Add clear ports to fix this. --- Cargo.lock | 1 + local-cluster/Cargo.toml | 1 + local-cluster/tests/local_cluster.rs | 2 ++ net-utils/src/lib.rs | 8 ++++++++ 4 files changed, 12 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 11107c72e97c6e..fc829de2bcbe02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6426,6 +6426,7 @@ dependencies = [ "solana-gossip", "solana-ledger", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index 07b30030295e52..9f51f75da0e51d 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -23,6 +23,7 @@ solana-entry = { workspace = true } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 44032aeeb4d38b..3db7af74d5fbf5 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -3339,6 +3339,8 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b } } + solana_net_utils::clear_used_ports(); + // Step 3: // Run validator C only to make it produce and vote on its own fork. info!("Restart validator C again!!!"); diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 4a8634a69c973a..7033965b75537a 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -391,6 +391,14 @@ lazy_static! { static ref REUSED_PORTS: Mutex> = Mutex::new(Vec::new()); } +pub fn clear_used_ports() { + let mut all_used_ports_lock = ALL_USED_PORTS.lock().unwrap(); + let mut reused_ports_lock = REUSED_PORTS.lock().unwrap(); + + all_used_ports_lock.clear(); + reused_ports_lock.clear(); +} + fn sock_bind(sock: &Socket, addr: SocketAddr, config: SocketConfig) -> io::Result<()> { let port = addr.port(); assert_ne!(port, 0, "don't use sock_bind for any port. "); From 66f9243e64b79d23b14433f159796513cc7c88cf Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 03:38:55 +0000 Subject: [PATCH 12/17] clean up ports after each test --- streamer/src/nonblocking/quic.rs | 12 ++++++++++++ streamer/src/quic.rs | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index fd37c0714ad40a..ef4a566a12c66b 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1582,6 +1582,7 @@ pub mod test { let (t, exit, _receiver, _server_address, _stats) = setup_quic_server(None, 1); exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1592,6 +1593,7 @@ pub mod test { check_timeout(receiver, server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1642,6 +1644,7 @@ pub mod test { assert_eq!(i, num_packets); exit.store(true, Ordering::Relaxed); handle.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1673,6 +1676,7 @@ pub mod test { exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1683,6 +1687,7 @@ pub mod test { check_block_multiple_connections(server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1743,6 +1748,7 @@ pub mod test { exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1753,6 +1759,7 @@ pub mod test { check_multiple_writes(receiver, server_address, None).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1779,6 +1786,7 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1806,6 +1814,7 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1825,6 +1834,7 @@ pub mod test { ); assert_eq!(stats.connection_removed.load(Ordering::Relaxed), 1); assert_eq!(stats.connection_remove_failed.load(Ordering::Relaxed), 0); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1855,6 +1865,7 @@ pub mod test { check_unstaked_node_connect_failure(server_address).await; exit.store(true, Ordering::Relaxed); t.await.unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -1891,6 +1902,7 @@ pub mod test { t.await.unwrap(); assert_eq!(stats.total_connections.load(Ordering::Relaxed), 0); assert_eq!(stats.total_new_connections.load(Ordering::Relaxed), 2); + solana_net_utils::clear_used_ports(); } #[test] diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index affa03ab3b0cac..1aacd551722867 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -604,6 +604,7 @@ mod test { let (t, exit, _receiver, _server_address) = setup_quic_server(); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -615,6 +616,7 @@ mod test { runtime.block_on(check_timeout(receiver, server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -627,6 +629,7 @@ mod test { runtime.block_on(check_block_multiple_connections(server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -663,6 +666,7 @@ mod test { runtime.block_on(check_multiple_streams(receiver, server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -675,6 +679,7 @@ mod test { runtime.block_on(check_multiple_writes(receiver, server_address, None)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } #[serial] @@ -711,5 +716,6 @@ mod test { runtime.block_on(check_unstaked_node_connect_failure(server_address)); exit.store(true, Ordering::Relaxed); t.join().unwrap(); + solana_net_utils::clear_used_ports(); } } From cf1e69c0fdc06ee15c5a17e3c88340cc1315e3e9 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 03:43:30 +0000 Subject: [PATCH 13/17] fix build --- gossip/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gossip/src/main.rs b/gossip/src/main.rs index 9d725d2c3aba30..3bab06ee811d80 100644 --- a/gossip/src/main.rs +++ b/gossip/src/main.rs @@ -238,7 +238,7 @@ fn process_spy(matches: &ArgMatches, socket_addr_space: SocketAddrSpace) -> std: value_t!(matches, "gossip_port", u16).unwrap_or_else(|_| { // find an available port. let get_port = || -> Result<_, std::io::Error> { - let t = solana_net_utils::bind_with_any_port(bind_address)?; + let t = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))?; let addr = t.local_addr()?; Ok(addr.port()) }; From acc9b7231bd0d3b895f0fd57883c3de27ce4f820 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 03:43:59 +0000 Subject: [PATCH 14/17] no reuse of ipaddr --- net-utils/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 7033965b75537a..494afceab72949 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -607,7 +607,7 @@ pub fn multi_bind_in_range( let mut error = None; for _ in 0..NUM_TRIES { let mut config = SocketConfig { - reuseaddr: true, + reuseaddr: false, reuseport: true, first_reuse: true, }; From f6b1bcda19369e9fd15b445664fd696e2a282d38 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 13:39:09 +0000 Subject: [PATCH 15/17] fix find port --- gossip/src/cluster_info.rs | 4 +++- net-utils/src/lib.rs | 14 +++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index d709ecddc69ad2..8f6fe09ffec1b3 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -3707,7 +3707,9 @@ mod tests { VALIDATOR_PORT_RANGE.1 + (2 * MINIMUM_VALIDATOR_PORT_RANGE_WIDTH), ); let ip = IpAddr::V4(Ipv4Addr::LOCALHOST); - let port = bind_in_range(ip, port_range).expect("Failed to bind").0; + let port = + find_available_port_in_range(ip, port_range).expect("Failed to find available port"); + let config = NodeConfig { gossip_addr: socketaddr!(Ipv4Addr::LOCALHOST, port), port_range, diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 494afceab72949..c7b2ed702f3eb0 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -740,8 +740,20 @@ pub fn find_available_port_in_range(ip_addr: IpAddr, range: PortRange) -> io::Re let (start, end) = range; let mut tries_left = end - start; let mut rand_port = thread_rng().gen_range(start..end); + + let lookup = |ip_addr, port| -> io::Result<()> { + let config = SocketConfig { + reuseaddr: false, + reuseport: false, + first_reuse: false, + }; + let sock = udp_socket_with_config(config)?; + let addr = SocketAddr::new(ip_addr, port); + sock.bind(&SockAddr::from(addr)) + }; + loop { - match bind_common(ip_addr, rand_port, false) { + match lookup(ip_addr, rand_port) { Ok(_) => { break Ok(rand_port); } From ec6999d0e2fa3b87bdbc041994fb15eb7ddee6e6 Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Sun, 7 Apr 2024 14:00:19 +0000 Subject: [PATCH 16/17] clear used port when localcluster drop --- local-cluster/src/local_cluster.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 3d8df638fbbb81..3ce2d1c386e82a 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -1019,5 +1019,6 @@ impl Cluster for LocalCluster { impl Drop for LocalCluster { fn drop(&mut self) { self.close(); + solana_net_utils::clear_used_ports(); } } From 56e3063abde3682ef1a81e15ec9fea87a3d58ede Mon Sep 17 00:00:00 2001 From: HaoranYi Date: Mon, 8 Apr 2024 15:52:33 +0000 Subject: [PATCH 17/17] Add a test for port bind --- net-utils/src/lib.rs | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index c7b2ed702f3eb0..fbdc8f632d887a 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -953,6 +953,45 @@ mod tests { bind_common_in_range(ip_addr, (port, port + 1)).unwrap_err(); } + #[test] + fn test_port_binding() { + let ip_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED); + let addr = SocketAddr::new(ip_addr, 3527); + + let do_bind = |reuse1, reuse2, expect| { + let config = SocketConfig { + reuseaddr: false, + reuseport: reuse1, + first_reuse: false, + }; + let sock = udp_socket_with_config(config).unwrap(); + let b1 = sock.bind(&SockAddr::from(addr)); + + let config = SocketConfig { + reuseaddr: false, + reuseport: reuse2, + first_reuse: false, + }; + let sock = udp_socket_with_config(config).unwrap(); + let b2 = sock.bind(&SockAddr::from(addr)); + + assert_eq!((b1.is_ok(), b2.is_ok()), expect); + }; + + // Testing different config for binding two socks + // reuseport (false, false), expect first ok, second err. + do_bind(false, false, (true, false)); + + // reuseport (true, true), both should be ok. + do_bind(true, true, (true, true)); + + // reuseport (true, false), expect first ok, second err. + do_bind(true, false, (true, false)); + + // reuseport (false, true), expect first ok, second err. + do_bind(false, true, (true, false)); + } + #[test] fn test_get_public_ip_addr_none() { solana_logger::setup();