Skip to content

Commit 45c1069

Browse files
authored
update socket config and create builder pattern (anza-xyz#3929)
* update socket config and create builder pattern * reuse new code for deprecated methods * add default config for readwrite socket * leave default buffer size to OS * switch to unified socket config by reuseport * remove cfg gating and add derive copy
1 parent 7740b82 commit 45c1069

File tree

10 files changed

+273
-140
lines changed

10 files changed

+273
-140
lines changed

bench-streamer/src/main.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use {
44
clap::{crate_description, crate_name, Arg, Command},
55
crossbeam_channel::unbounded,
6-
solana_net_utils::bind_to_unspecified,
6+
solana_net_utils::{bind_to_unspecified, SocketConfig},
77
solana_streamer::{
88
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
99
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
@@ -95,9 +95,10 @@ fn main() -> Result<()> {
9595
let mut read_channels = Vec::new();
9696
let mut read_threads = Vec::new();
9797
let recycler = PacketBatchRecycler::default();
98-
let (_port, read_sockets) = solana_net_utils::multi_bind_in_range(
98+
let (_port, read_sockets) = solana_net_utils::multi_bind_in_range_with_config(
9999
ip_addr,
100100
(port, port + num_sockets as u16),
101+
SocketConfig::default().reuseport(true),
101102
num_sockets,
102103
)
103104
.unwrap();

connection-cache/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ rayon = { workspace = true }
2222
solana-keypair = { workspace = true }
2323
solana-measure = { workspace = true }
2424
solana-metrics = { workspace = true }
25+
solana-net-utils = { workspace = true }
2526
solana-time-utils = { workspace = true }
2627
solana-transaction-error = { workspace = true }
2728
thiserror = { workspace = true }

connection-cache/src/connection_cache.rs

+11-4
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ mod tests {
514514
async_trait::async_trait,
515515
rand::{Rng, SeedableRng},
516516
rand_chacha::ChaChaRng,
517+
solana_net_utils::SocketConfig,
517518
solana_transaction_error::TransportResult,
518519
std::{
519520
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
@@ -571,8 +572,11 @@ mod tests {
571572
fn default() -> Self {
572573
Self {
573574
udp_socket: Arc::new(
574-
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
575-
.expect("Unable to bind to UDP socket"),
575+
solana_net_utils::bind_with_any_port_with_config(
576+
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
577+
SocketConfig::default(),
578+
)
579+
.expect("Unable to bind to UDP socket"),
576580
),
577581
}
578582
}
@@ -582,8 +586,11 @@ mod tests {
582586
fn new() -> Result<Self, ClientError> {
583587
Ok(Self {
584588
udp_socket: Arc::new(
585-
solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED))
586-
.map_err(Into::<ClientError>::into)?,
589+
solana_net_utils::bind_with_any_port_with_config(
590+
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
591+
SocketConfig::default(),
592+
)
593+
.map_err(Into::<ClientError>::into)?,
587594
),
588595
})
589596
}

gossip/src/cluster_info.rs

+105-69
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,10 @@ use {
5454
solana_ledger::shred::Shred,
5555
solana_measure::measure::Measure,
5656
solana_net_utils::{
57-
bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
58-
bind_more_with_config, bind_to_localhost, bind_to_unspecified,
57+
bind_common_in_range_with_config, bind_common_with_config, bind_in_range,
58+
bind_in_range_with_config, bind_more_with_config, bind_to_localhost, bind_to_unspecified,
5959
bind_two_in_range_with_offset_and_config, find_available_port_in_range,
60-
multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
60+
multi_bind_in_range_with_config, PortRange, SocketConfig, VALIDATOR_PORT_RANGE,
6161
},
6262
solana_perf::{
6363
data_budget::DataBudget,
@@ -2614,21 +2614,20 @@ impl Node {
26142614
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
26152615
let port_range = (1024, 65535);
26162616

2617-
let udp_config = SocketConfig { reuseport: false };
2618-
let quic_config = SocketConfig { reuseport: true };
2617+
let udp_config = SocketConfig::default();
2618+
let quic_config = SocketConfig::default().reuseport(true);
26192619
let ((_tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
26202620
bind_two_in_range_with_offset_and_config(
26212621
localhost_ip_addr,
26222622
port_range,
26232623
QUIC_PORT_OFFSET,
2624-
udp_config.clone(),
2625-
quic_config.clone(),
2624+
udp_config,
2625+
quic_config,
26262626
)
26272627
.unwrap();
2628-
let tpu_quic =
2629-
bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap();
2628+
let tpu_quic = bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config).unwrap();
26302629
let (gossip_port, (gossip, ip_echo)) =
2631-
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
2630+
bind_common_in_range_with_config(localhost_ip_addr, port_range, udp_config).unwrap();
26322631
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
26332632
let tvu = bind_to_localhost().unwrap();
26342633
let tvu_quic = bind_to_localhost().unwrap();
@@ -2637,13 +2636,12 @@ impl Node {
26372636
localhost_ip_addr,
26382637
port_range,
26392638
QUIC_PORT_OFFSET,
2640-
udp_config.clone(),
2641-
quic_config.clone(),
2639+
udp_config,
2640+
quic_config,
26422641
)
26432642
.unwrap();
26442643
let tpu_forwards_quic =
2645-
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone())
2646-
.unwrap();
2644+
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap();
26472645
let tpu_vote = bind_to_localhost().unwrap();
26482646
let tpu_vote_quic = bind_to_localhost().unwrap();
26492647
let tpu_vote_quic =
@@ -2732,21 +2730,19 @@ impl Node {
27322730
port_range: PortRange,
27332731
bind_ip_addr: IpAddr,
27342732
) -> (u16, (UdpSocket, TcpListener)) {
2733+
let config = SocketConfig::default();
27352734
if gossip_addr.port() != 0 {
27362735
(
27372736
gossip_addr.port(),
2738-
bind_common(bind_ip_addr, gossip_addr.port()).unwrap_or_else(|e| {
2739-
panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e)
2740-
}),
2737+
bind_common_with_config(bind_ip_addr, gossip_addr.port(), config).unwrap_or_else(
2738+
|e| panic!("gossip_addr bind_to port {}: {}", gossip_addr.port(), e),
2739+
),
27412740
)
27422741
} else {
2743-
bind_common_in_range(bind_ip_addr, port_range).expect("Failed to bind")
2742+
bind_common_in_range_with_config(bind_ip_addr, port_range, config)
2743+
.expect("Failed to bind")
27442744
}
27452745
}
2746-
fn bind(bind_ip_addr: IpAddr, port_range: PortRange) -> (u16, UdpSocket) {
2747-
let config = SocketConfig { reuseport: false };
2748-
Self::bind_with_config(bind_ip_addr, port_range, config)
2749-
}
27502746

27512747
fn bind_with_config(
27522748
bind_ip_addr: IpAddr,
@@ -2764,49 +2760,65 @@ impl Node {
27642760
) -> Self {
27652761
let (gossip_port, (gossip, ip_echo)) =
27662762
Self::get_gossip_port(gossip_addr, port_range, bind_ip_addr);
2767-
let (tvu_port, tvu) = Self::bind(bind_ip_addr, port_range);
2768-
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
2769-
let udp_config = SocketConfig { reuseport: false };
2770-
let quic_config = SocketConfig { reuseport: true };
2763+
2764+
let socket_config = SocketConfig::default();
2765+
let socket_config_reuseport = SocketConfig::default().reuseport(true);
2766+
let (tvu_port, tvu) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2767+
let (tvu_quic_port, tvu_quic) =
2768+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
27712769
let ((tpu_port, tpu), (_tpu_quic_port, tpu_quic)) =
27722770
bind_two_in_range_with_offset_and_config(
27732771
bind_ip_addr,
27742772
port_range,
27752773
QUIC_PORT_OFFSET,
2776-
udp_config.clone(),
2777-
quic_config.clone(),
2774+
socket_config,
2775+
socket_config_reuseport,
27782776
)
27792777
.unwrap();
27802778
let tpu_quic: Vec<UdpSocket> =
2781-
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap();
2779+
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, socket_config_reuseport)
2780+
.unwrap();
2781+
27822782
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
27832783
bind_two_in_range_with_offset_and_config(
27842784
bind_ip_addr,
27852785
port_range,
27862786
QUIC_PORT_OFFSET,
2787-
udp_config,
2788-
quic_config.clone(),
2787+
socket_config,
2788+
socket_config_reuseport,
27892789
)
27902790
.unwrap();
27912791
let tpu_forwards_quic = bind_more_with_config(
27922792
tpu_forwards_quic,
27932793
DEFAULT_QUIC_ENDPOINTS,
2794-
quic_config.clone(),
2794+
socket_config_reuseport,
27952795
)
27962796
.unwrap();
2797-
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
2798-
let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);
2799-
let tpu_vote_quic: Vec<UdpSocket> =
2800-
bind_more_with_config(tpu_vote_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();
2801-
2802-
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
2803-
let (_, repair) = Self::bind(bind_ip_addr, port_range);
2804-
let (_, repair_quic) = Self::bind(bind_ip_addr, port_range);
2805-
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
2806-
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
2807-
let (_, broadcast) = Self::bind(bind_ip_addr, port_range);
2808-
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
2809-
let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range);
2797+
2798+
let (tpu_vote_port, tpu_vote) =
2799+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2800+
let (tpu_vote_quic_port, tpu_vote_quic) =
2801+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2802+
let tpu_vote_quic: Vec<UdpSocket> = bind_more_with_config(
2803+
tpu_vote_quic,
2804+
DEFAULT_QUIC_ENDPOINTS,
2805+
socket_config_reuseport,
2806+
)
2807+
.unwrap();
2808+
2809+
let (_, retransmit_socket) =
2810+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2811+
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2812+
let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2813+
let (serve_repair_port, serve_repair) =
2814+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2815+
let (serve_repair_quic_port, serve_repair_quic) =
2816+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2817+
let (_, broadcast) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2818+
let (_, ancestor_hashes_requests) =
2819+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2820+
let (_, ancestor_hashes_requests_quic) =
2821+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
28102822

28112823
let rpc_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
28122824
let rpc_pubsub_port = find_available_port_in_range(bind_ip_addr, port_range).unwrap();
@@ -2882,62 +2894,86 @@ impl Node {
28822894
let (gossip_port, (gossip, ip_echo)) =
28832895
Self::get_gossip_port(&gossip_addr, port_range, bind_ip_addr);
28842896

2885-
let (tvu_port, tvu_sockets) =
2886-
multi_bind_in_range(bind_ip_addr, port_range, num_tvu_sockets.get())
2887-
.expect("tvu multi_bind");
2888-
let (tvu_quic_port, tvu_quic) = Self::bind(bind_ip_addr, port_range);
2897+
let socket_config = SocketConfig::default();
2898+
let socket_config_reuseport = SocketConfig::default().reuseport(true);
2899+
2900+
let (tvu_port, tvu_sockets) = multi_bind_in_range_with_config(
2901+
bind_ip_addr,
2902+
port_range,
2903+
socket_config_reuseport,
2904+
num_tvu_sockets.get(),
2905+
)
2906+
.expect("tvu multi_bind");
2907+
2908+
let (tvu_quic_port, tvu_quic) =
2909+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2910+
28892911
let (tpu_port, tpu_sockets) =
2890-
multi_bind_in_range(bind_ip_addr, port_range, 32).expect("tpu multi_bind");
2912+
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 32)
2913+
.expect("tpu multi_bind");
28912914

2892-
let quic_config = SocketConfig { reuseport: true };
28932915
let (_tpu_port_quic, tpu_quic) = Self::bind_with_config(
28942916
bind_ip_addr,
28952917
(tpu_port + QUIC_PORT_OFFSET, tpu_port + QUIC_PORT_OFFSET + 1),
2896-
quic_config.clone(),
2918+
socket_config_reuseport,
28972919
);
28982920
let tpu_quic =
2899-
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap();
2921+
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), socket_config_reuseport)
2922+
.unwrap();
29002923

29012924
let (tpu_forwards_port, tpu_forwards_sockets) =
2902-
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
2925+
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 8)
2926+
.expect("tpu_forwards multi_bind");
29032927

29042928
let (_tpu_forwards_port_quic, tpu_forwards_quic) = Self::bind_with_config(
29052929
bind_ip_addr,
29062930
(
29072931
tpu_forwards_port + QUIC_PORT_OFFSET,
29082932
tpu_forwards_port + QUIC_PORT_OFFSET + 1,
29092933
),
2910-
quic_config.clone(),
2934+
socket_config_reuseport,
29112935
);
29122936
let tpu_forwards_quic = bind_more_with_config(
29132937
tpu_forwards_quic,
29142938
num_quic_endpoints.get(),
2915-
quic_config.clone(),
2939+
socket_config_reuseport,
29162940
)
29172941
.unwrap();
29182942

29192943
let (tpu_vote_port, tpu_vote_sockets) =
2920-
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
2944+
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 1)
2945+
.expect("tpu_vote multi_bind");
29212946

2922-
let (tpu_vote_quic_port, tpu_vote_quic) = Self::bind(bind_ip_addr, port_range);
2947+
let (tpu_vote_quic_port, tpu_vote_quic) =
2948+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
29232949

2924-
let tpu_vote_quic =
2925-
bind_more_with_config(tpu_vote_quic, num_quic_endpoints.get(), quic_config.clone())
2926-
.unwrap();
2950+
let tpu_vote_quic = bind_more_with_config(
2951+
tpu_vote_quic,
2952+
num_quic_endpoints.get(),
2953+
socket_config_reuseport,
2954+
)
2955+
.unwrap();
29272956

29282957
let (_, retransmit_sockets) =
2929-
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("retransmit multi_bind");
2958+
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 8)
2959+
.expect("retransmit multi_bind");
2960+
2961+
let (_, repair) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2962+
let (_, repair_quic) = Self::bind_with_config(bind_ip_addr, port_range, socket_config);
29302963

2931-
let (_, repair) = Self::bind(bind_ip_addr, port_range);
2932-
let (_, repair_quic) = Self::bind(bind_ip_addr, port_range);
2933-
let (serve_repair_port, serve_repair) = Self::bind(bind_ip_addr, port_range);
2934-
let (serve_repair_quic_port, serve_repair_quic) = Self::bind(bind_ip_addr, port_range);
2964+
let (serve_repair_port, serve_repair) =
2965+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2966+
let (serve_repair_quic_port, serve_repair_quic) =
2967+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
29352968

29362969
let (_, broadcast) =
2937-
multi_bind_in_range(bind_ip_addr, port_range, 4).expect("broadcast multi_bind");
2970+
multi_bind_in_range_with_config(bind_ip_addr, port_range, socket_config_reuseport, 4)
2971+
.expect("broadcast multi_bind");
29382972

2939-
let (_, ancestor_hashes_requests) = Self::bind(bind_ip_addr, port_range);
2940-
let (_, ancestor_hashes_requests_quic) = Self::bind(bind_ip_addr, port_range);
2973+
let (_, ancestor_hashes_requests) =
2974+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
2975+
let (_, ancestor_hashes_requests_quic) =
2976+
Self::bind_with_config(bind_ip_addr, port_range, socket_config);
29412977

29422978
let mut info = ContactInfo::new(
29432979
*pubkey,

0 commit comments

Comments
 (0)