diff --git a/vortexor/src/cli.rs b/vortexor/src/cli.rs index e5eedd8f752b1e..7b91887ec44230 100644 --- a/vortexor/src/cli.rs +++ b/vortexor/src/cli.rs @@ -92,6 +92,19 @@ pub struct Cli { #[arg(long, value_parser = parse_port_range, value_name = "MIN_PORT-MAX_PORT", default_value = get_default_port_range())] pub dynamic_port_range: (u16, u16), + /// Optional TPU address to bind to. If not specified, the vortexor will bind to + /// the first available port in the dynamic port range. When this argument is + /// specified, the --bind-address and --dynamic-port-range arguments are ignored. + #[arg(long, value_name = "HOST:PORT")] + pub tpu_address: Option, + + /// Optional TPU-forward address to bind to. If not specified, the vortexor will bind to + /// the first available port in the dynamic port range after binding the tpu_address. + /// When this argument is specified, the --bind-address and --dynamic-port-range + /// arguments are ignored. + #[arg(long, value_name = "HOST:PORT")] + pub tpu_forward_address: Option, + /// Controls the max concurrent connections per IpAddr. #[arg(long, default_value_t = DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER)] pub max_connections_per_peer: usize, diff --git a/vortexor/src/main.rs b/vortexor/src/main.rs index 73b53bbefffaf2..906c7dbedd807f 100644 --- a/vortexor/src/main.rs +++ b/vortexor/src/main.rs @@ -5,7 +5,7 @@ use { solana_core::banking_trace::BankingTracer, solana_logger::redirect_stderr_to_file, solana_net_utils::{bind_in_range_with_config, SocketConfig}, - solana_sdk::{signature::read_keypair_file, signer::Signer}, + solana_sdk::{quic::QUIC_PORT_OFFSET, signature::read_keypair_file, signer::Signer}, solana_streamer::streamer::StakedNodes, solana_vortexor::{ cli::Cli, @@ -20,7 +20,7 @@ use { std::{ collections::HashMap, env, - net::IpAddr, + net::{IpAddr, SocketAddr}, sync::{atomic::AtomicBool, Arc, RwLock}, time::Duration, }, @@ -77,14 +77,21 @@ pub fn main() { let tpu_coalesce = Duration::from_millis(args.tpu_coalesce_ms); let dynamic_port_range = args.dynamic_port_range; + let tpu_address = args.tpu_address; + let tpu_forward_address = args.tpu_forward_address; let max_streams_per_ms = args.max_streams_per_ms; let exit = Arc::new(AtomicBool::new(false)); // To be linked with the Tpu sigverify and forwarder service let (tpu_sender, tpu_receiver) = bounded(DEFAULT_CHANNEL_SIZE); let (tpu_fwd_sender, _tpu_fwd_receiver) = bounded(DEFAULT_CHANNEL_SIZE); - let tpu_sockets = - Vortexor::create_tpu_sockets(*bind_address, dynamic_port_range, num_quic_endpoints); + let tpu_sockets = Vortexor::create_tpu_sockets( + *bind_address, + dynamic_port_range, + tpu_address, + tpu_forward_address, + num_quic_endpoints, + ); let (banking_tracer, _) = BankingTracer::new( None, // Not interesed in banking tracing @@ -125,7 +132,7 @@ pub fn main() { DEFAULT_SENDER_THREADS_COUNT, DEFAULT_BATCH_SIZE, DEFAULT_RECV_TIMEOUT, - destinations, + destinations.clone(), ); info!("Creating the SigVerifier"); @@ -156,6 +163,27 @@ pub fn main() { tpu_sockets.tpu_quic_fwd[0].local_addr() ); + let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap(); + let tpu_public_address = SocketAddr::new( + tpu_address.ip(), + tpu_address.port().saturating_sub(QUIC_PORT_OFFSET), + ); + let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap(); + let tpu_fwd_public_address = SocketAddr::new( + tpu_fwd_address.ip(), + tpu_fwd_address.port().saturating_sub(QUIC_PORT_OFFSET), + ); + + for destination in destinations.read().unwrap().iter() { + info!( + "To pair the validator with receiver address {destination} with this \ + vortexor, add the following arguments in the validator's start command: \ + --tpu-vortexor-receiver-address {destination} \ + --public-tpu-address {tpu_public_address} \ + --public-tpu-forward-address {tpu_fwd_public_address}", + ); + } + let vortexor = Vortexor::create_vortexor( tpu_sockets, staked_nodes, diff --git a/vortexor/src/vortexor.rs b/vortexor/src/vortexor.rs index b2230bc1915d54..996275be2a6a3a 100644 --- a/vortexor/src/vortexor.rs +++ b/vortexor/src/vortexor.rs @@ -4,7 +4,7 @@ use { banking_trace::TracedSender, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, }, - solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig}, + solana_net_utils::{multi_bind_in_range_with_config, SocketConfig}, solana_perf::packet::PacketBatch, solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair}, solana_streamer::{ @@ -13,7 +13,7 @@ use { streamer::StakedNodes, }, std::{ - net::UdpSocket, + net::{SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc, Mutex, RwLock}, thread::{self, JoinHandle}, time::Duration, @@ -56,26 +56,27 @@ impl Vortexor { pub fn create_tpu_sockets( bind_address: std::net::IpAddr, dynamic_port_range: (u16, u16), + tpu_address: Option, + tpu_forward_address: Option, num_quic_endpoints: usize, ) -> TpuSockets { let quic_config = SocketConfig::default().reuseport(true); - let (_, tpu_quic) = - bind_in_range_with_config(bind_address, dynamic_port_range, quic_config) - .expect("expected bind to succeed"); - - let tpu_quic_port = tpu_quic.local_addr().unwrap().port(); - let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap(); - - let (_, tpu_quic_fwd) = bind_in_range_with_config( + let tpu_quic = bind_sockets( bind_address, - (tpu_quic_port.saturating_add(1), dynamic_port_range.1), + dynamic_port_range, + tpu_address, + num_quic_endpoints, quic_config, - ) - .expect("expected bind to succeed"); + ); - let tpu_quic_fwd = - bind_more_with_config(tpu_quic_fwd, num_quic_endpoints, quic_config).unwrap(); + let tpu_quic_fwd = bind_sockets( + bind_address, + dynamic_port_range, + tpu_forward_address, + num_quic_endpoints, + quic_config, + ); TpuSockets { tpu_quic, @@ -177,3 +178,22 @@ impl Vortexor { Ok(()) } } + +/// Binds the sockets to the specified address and port range if address is Some. +/// If the address is None, it binds to the specified bind_address and port range. +fn bind_sockets( + bind_address: std::net::IpAddr, + port_range: (u16, u16), + address: Option, + num_quic_endpoints: usize, + quic_config: SocketConfig, +) -> Vec { + let (bind_address, port_range) = address + .map(|addr| (addr.ip(), (addr.port(), addr.port().saturating_add(1)))) + .unwrap_or((bind_address, port_range)); + + let (_, sockets) = + multi_bind_in_range_with_config(bind_address, port_range, quic_config, num_quic_endpoints) + .expect("expected bind to succeed"); + sockets +} diff --git a/vortexor/tests/vortexor.rs b/vortexor/tests/vortexor.rs index 7cfc694aa6f99c..cbab1edb8e0ad1 100644 --- a/vortexor/tests/vortexor.rs +++ b/vortexor/tests/vortexor.rs @@ -49,6 +49,8 @@ async fn test_vortexor() { let tpu_sockets = Vortexor::create_tpu_sockets( bind_address, VALIDATOR_PORT_RANGE, + None, // tpu_address + None, // tpu_forward_address DEFAULT_NUM_QUIC_ENDPOINTS, );