Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions vortexor/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ pub struct Cli {
#[arg(long, value_parser = parse_port_range, value_name = "MIN_PORT-MAX_PORT", default_value = get_default_port_range())]
pub dynamic_port_range: (u16, u16),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How many dynamic ports does vortexor need? Maybe it could be easier to just make tpu_address and tpu_forwards_address required arguments and remove the dynamic_port_range argument?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the dynamic_port_range is also used for client socket binds for rpc and websocket. We may add more sockets in the future such as for RPC service of the vortexor for subscription management.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, yes in this case it makes sense to have it to deconflict with agave. Also please make sure that defaults here are not the same as for agave itself, so if this is not provided we do not start accidentally binding on top of agave's default port range=)


/// Optional TPU address to bind to. If not specified, the vortexor will bind to
/// the first available port in the dynamic port range. When this argument is
/// specified, the --bind-address and --dynamic-port-range arguments are ignored.
#[arg(long, value_name = "HOST:PORT")]
pub tpu_address: Option<SocketAddr>,

/// Optional TPU-forward address to bind to. If not specified, the vortexor will bind to
/// the first available port in the dynamic port range after binding the tpu_address.
/// When this argument is specified, the --bind-address and --dynamic-port-range
/// arguments are ignored.
#[arg(long, value_name = "HOST:PORT")]
pub tpu_forward_address: Option<SocketAddr>,

/// Controls the max concurrent connections per IpAddr.
#[arg(long, default_value_t = DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER)]
pub max_connections_per_peer: usize,
Expand Down
38 changes: 33 additions & 5 deletions vortexor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
solana_core::banking_trace::BankingTracer,
solana_logger::redirect_stderr_to_file,
solana_net_utils::{bind_in_range_with_config, SocketConfig},
solana_sdk::{signature::read_keypair_file, signer::Signer},
solana_sdk::{quic::QUIC_PORT_OFFSET, signature::read_keypair_file, signer::Signer},
solana_streamer::streamer::StakedNodes,
solana_vortexor::{
cli::Cli,
Expand All @@ -20,7 +20,7 @@ use {
std::{
collections::HashMap,
env,
net::IpAddr,
net::{IpAddr, SocketAddr},
sync::{atomic::AtomicBool, Arc, RwLock},
time::Duration,
},
Expand Down Expand Up @@ -77,14 +77,21 @@ pub fn main() {
let tpu_coalesce = Duration::from_millis(args.tpu_coalesce_ms);
let dynamic_port_range = args.dynamic_port_range;

let tpu_address = args.tpu_address;
let tpu_forward_address = args.tpu_forward_address;
let max_streams_per_ms = args.max_streams_per_ms;
let exit = Arc::new(AtomicBool::new(false));
// To be linked with the Tpu sigverify and forwarder service
let (tpu_sender, tpu_receiver) = bounded(DEFAULT_CHANNEL_SIZE);
let (tpu_fwd_sender, _tpu_fwd_receiver) = bounded(DEFAULT_CHANNEL_SIZE);

let tpu_sockets =
Vortexor::create_tpu_sockets(*bind_address, dynamic_port_range, num_quic_endpoints);
let tpu_sockets = Vortexor::create_tpu_sockets(
*bind_address,
dynamic_port_range,
tpu_address,
tpu_forward_address,
num_quic_endpoints,
);

let (banking_tracer, _) = BankingTracer::new(
None, // Not interesed in banking tracing
Expand Down Expand Up @@ -125,7 +132,7 @@ pub fn main() {
DEFAULT_SENDER_THREADS_COUNT,
DEFAULT_BATCH_SIZE,
DEFAULT_RECV_TIMEOUT,
destinations,
destinations.clone(),
);

info!("Creating the SigVerifier");
Expand Down Expand Up @@ -156,6 +163,27 @@ pub fn main() {
tpu_sockets.tpu_quic_fwd[0].local_addr()
);

let tpu_address = tpu_sockets.tpu_quic[0].local_addr().unwrap();
let tpu_public_address = SocketAddr::new(
tpu_address.ip(),
tpu_address.port().saturating_sub(QUIC_PORT_OFFSET),
);
let tpu_fwd_address = tpu_sockets.tpu_quic_fwd[0].local_addr().unwrap();
let tpu_fwd_public_address = SocketAddr::new(
tpu_fwd_address.ip(),
tpu_fwd_address.port().saturating_sub(QUIC_PORT_OFFSET),
);

for destination in destinations.read().unwrap().iter() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great UX, thank you!

info!(
"To pair the validator with receiver address {destination} with this \
vortexor, add the following arguments in the validator's start command: \
--tpu-vortexor-receiver-address {destination} \
--public-tpu-address {tpu_public_address} \
--public-tpu-forward-address {tpu_fwd_public_address}",
);
}

let vortexor = Vortexor::create_vortexor(
tpu_sockets,
staked_nodes,
Expand Down
50 changes: 35 additions & 15 deletions vortexor/src/vortexor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
banking_trace::TracedSender, sigverify::TransactionSigVerifier,
sigverify_stage::SigVerifyStage,
},
solana_net_utils::{bind_in_range_with_config, bind_more_with_config, SocketConfig},
solana_net_utils::{multi_bind_in_range_with_config, SocketConfig},
solana_perf::packet::PacketBatch,
solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
Expand All @@ -13,7 +13,7 @@ use {
streamer::StakedNodes,
},
std::{
net::UdpSocket,
net::{SocketAddr, UdpSocket},
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread::{self, JoinHandle},
time::Duration,
Expand Down Expand Up @@ -56,26 +56,27 @@ impl Vortexor {
pub fn create_tpu_sockets(
bind_address: std::net::IpAddr,
dynamic_port_range: (u16, u16),
tpu_address: Option<SocketAddr>,
tpu_forward_address: Option<SocketAddr>,
num_quic_endpoints: usize,
) -> TpuSockets {
let quic_config = SocketConfig::default().reuseport(true);

let (_, tpu_quic) =
bind_in_range_with_config(bind_address, dynamic_port_range, quic_config)
.expect("expected bind to succeed");

let tpu_quic_port = tpu_quic.local_addr().unwrap().port();
let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap();

let (_, tpu_quic_fwd) = bind_in_range_with_config(
let tpu_quic = bind_sockets(
bind_address,
(tpu_quic_port.saturating_add(1), dynamic_port_range.1),
dynamic_port_range,
tpu_address,
num_quic_endpoints,
quic_config,
)
.expect("expected bind to succeed");
);

let tpu_quic_fwd =
bind_more_with_config(tpu_quic_fwd, num_quic_endpoints, quic_config).unwrap();
let tpu_quic_fwd = bind_sockets(
bind_address,
dynamic_port_range,
tpu_forward_address,
num_quic_endpoints,
quic_config,
);

TpuSockets {
tpu_quic,
Expand Down Expand Up @@ -177,3 +178,22 @@ impl Vortexor {
Ok(())
}
}

/// Binds the sockets to the specified address and port range if address is Some.
/// If the address is None, it binds to the specified bind_address and port range.
fn bind_sockets(
bind_address: std::net::IpAddr,
port_range: (u16, u16),
address: Option<SocketAddr>,
num_quic_endpoints: usize,
quic_config: SocketConfig,
) -> Vec<UdpSocket> {
let (bind_address, port_range) = address
.map(|addr| (addr.ip(), (addr.port(), addr.port().saturating_add(1))))
.unwrap_or((bind_address, port_range));

let (_, sockets) =
multi_bind_in_range_with_config(bind_address, port_range, quic_config, num_quic_endpoints)
.expect("expected bind to succeed");
sockets
}
2 changes: 2 additions & 0 deletions vortexor/tests/vortexor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ async fn test_vortexor() {
let tpu_sockets = Vortexor::create_tpu_sockets(
bind_address,
VALIDATOR_PORT_RANGE,
None, // tpu_address
None, // tpu_forward_address
DEFAULT_NUM_QUIC_ENDPOINTS,
);

Expand Down
Loading