Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions net-utils/src/ip_echo_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use tokio_codec::{BytesCodec, Decoder};

pub type IpEchoServer = Runtime;

pub const MAX_PORT_COUNT_PER_MESSAGE: usize = 4;

#[derive(Serialize, Deserialize, Default)]
pub(crate) struct IpEchoServerMessage {
tcp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
udp_ports: [u16; 4], // Fixed size list of ports to avoid vec serde
tcp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
udp_ports: [u16; MAX_PORT_COUNT_PER_MESSAGE], // Fixed size list of ports to avoid vec serde
}

impl IpEchoServerMessage {
Expand Down
235 changes: 184 additions & 51 deletions net-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
use log::*;
use rand::{thread_rng, Rng};
use socket2::{Domain, SockAddr, Socket, Type};
use std::collections::{BTreeMap, BTreeSet};
use std::io::{self, Read, Write};
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::sync::mpsc::channel;
use std::time::Duration;

mod ip_echo_server;
use ip_echo_server::IpEchoServerMessage;
pub use ip_echo_server::{ip_echo_server, IpEchoServer};
pub use ip_echo_server::{ip_echo_server, IpEchoServer, MAX_PORT_COUNT_PER_MESSAGE};

/// A data type representing a public Udp socket
pub struct UdpSocketPair {
Expand Down Expand Up @@ -92,34 +93,36 @@ pub fn get_public_ip_addr(ip_echo_server_addr: &SocketAddr) -> Result<IpAddr, St

// Checks if any of the provided TCP/UDP ports are not reachable by the machine at
// `ip_echo_server_addr`
pub fn verify_reachable_ports(
const DEFAULT_TIMEOUT_SECS: u64 = 5;
const DEFAULT_RETRY_COUNT: usize = 5;

fn do_verify_reachable_ports(
ip_echo_server_addr: &SocketAddr,
tcp_listeners: Vec<(u16, TcpListener)>,
udp_sockets: &[&UdpSocket],
timeout: u64,
udp_retry_count: usize,
) -> bool {
let udp_ports: Vec<_> = udp_sockets
.iter()
.map(|udp_socket| udp_socket.local_addr().unwrap().port())
.collect();

info!(
"Checking that tcp ports {:?} and udp ports {:?} are reachable from {:?}",
tcp_listeners, udp_ports, ip_echo_server_addr
"Checking that tcp ports {:?} from {:?}",
tcp_listeners, ip_echo_server_addr
);

let tcp_ports: Vec<_> = tcp_listeners.iter().map(|(port, _)| *port).collect();
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&tcp_ports, &udp_ports),
IpEchoServerMessage::new(&tcp_ports, &[]),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

let mut ok = true;
let timeout = Duration::from_secs(timeout);

// Wait for a connection to open on each TCP port
for (port, tcp_listener) in tcp_listeners {
let (sender, receiver) = channel();
std::thread::spawn(move || {
let listening_addr = tcp_listener.local_addr().unwrap();
let thread_handle = std::thread::spawn(move || {
debug!("Waiting for incoming connection on tcp/{}", port);
match tcp_listener.incoming().next() {
Some(_) => sender
Expand All @@ -128,7 +131,7 @@ pub fn verify_reachable_ports(
None => warn!("tcp incoming failed"),
}
});
match receiver.recv_timeout(Duration::from_secs(5)) {
match receiver.recv_timeout(timeout) {
Ok(_) => {
info!("tcp/{} is reachable", port);
}
Expand All @@ -137,61 +140,127 @@ pub fn verify_reachable_ports(
"Received no response at tcp/{}, check your port configuration: {}",
port, err
);
// Ugh, std rustc doesn't provide accepting with timeout or restoring original
// nonblocking-status of sockets because of lack of getter, only the setter...
// So, to close the thread cleanly, just connect from here.
// ref: https://github.com/rust-lang/rust/issues/31615
TcpStream::connect_timeout(&listening_addr, timeout).unwrap();
ok = false;
}
}
// ensure to reap the thread
thread_handle.join().unwrap();
}

if !ok {
// No retries for TCP, abort on the first failure
return ok;
}

for _udp_retries in 0..5 {
// Wait for a datagram to arrive at each UDP port
for udp_socket in udp_sockets {
let port = udp_socket.local_addr().unwrap().port();
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
let (sender, receiver) = channel();
std::thread::spawn(move || {
let mut buf = [0; 1];
debug!("Waiting for incoming datagram on udp/{}", port);
match udp_socket.recv(&mut buf) {
Ok(_) => sender
.send(())
.unwrap_or_else(|err| warn!("send failure: {}", err)),
Err(err) => warn!("udp recv failure: {}", err),
}
});
match receiver.recv_timeout(Duration::from_secs(5)) {
Ok(_) => {
info!("udp/{} is reachable", port);
}
Err(err) => {
error!(
"Received no response at udp/{}, check your port configuration: {}",
port, err
);
ok = false;
}
let mut udp_ports: BTreeMap<_, _> = BTreeMap::new();
udp_sockets.iter().for_each(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port();
udp_ports
.entry(port)
.or_insert_with(Vec::new)
.push(udp_socket);
});
let udp_ports: Vec<_> = udp_ports.into_iter().collect();

info!(
"Checking that udp ports {:?} are reachable from {:?}",
udp_ports.iter().map(|(port, _)| port).collect::<Vec<_>>(),
ip_echo_server_addr
);

'outer: for checked_ports_and_sockets in udp_ports.chunks(MAX_PORT_COUNT_PER_MESSAGE) {
ok = false;

for udp_remaining_retry in (0_usize..udp_retry_count).rev() {
let (checked_ports, checked_socket_iter) = (
checked_ports_and_sockets
.iter()
.map(|(port, _)| *port)
.collect::<Vec<_>>(),
checked_ports_and_sockets
.iter()
.map(|(_, sockets)| sockets)
.flatten(),
);

let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&[], &checked_ports),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));

// Spawn threads at once!
let thread_handles: Vec<_> = checked_socket_iter
.map(|udp_socket| {
let port = udp_socket.local_addr().unwrap().port();
let udp_socket = udp_socket.try_clone().expect("Unable to clone udp socket");
std::thread::spawn(move || {
let mut buf = [0; 1];
let original_read_timeout = udp_socket.read_timeout().unwrap();
udp_socket.set_read_timeout(Some(timeout)).unwrap();
let recv_result = udp_socket.recv(&mut buf);
debug!(
"Waited for incoming datagram on udp/{}: {:?}",
port, recv_result
);
udp_socket.set_read_timeout(original_read_timeout).unwrap();
recv_result.map(|_| port).ok()
})
})
.collect();

// Now join threads!
// Separate from the above by collect()-ing as an intermediate step to make the iterator
// eager not lazy so that joining happens here at once after creating bunch of threads
// at once.
let reachable_ports: BTreeSet<_> = thread_handles
.into_iter()
.filter_map(|t| t.join().unwrap())
.collect();

if reachable_ports.len() == checked_ports.len() {
info!(
"checked udp ports: {:?}, reachable udp ports: {:?}",
checked_ports, reachable_ports
);
ok = true;
break;
} else if udp_remaining_retry > 0 {
// Might have lost a UDP packet, retry a couple times
error!(
"checked udp ports: {:?}, reachable udp ports: {:?}",
checked_ports, reachable_ports
);
error!("There are some udp ports with no response!! Retrying...");
} else {
error!("Maximum retry count is reached....");
break 'outer;
}
}
if ok {
break;
}
ok = true;

// Might have lost a UDP packet, retry a couple times
let _ = ip_echo_server_request(
ip_echo_server_addr,
IpEchoServerMessage::new(&[], &udp_ports),
)
.map_err(|err| warn!("ip_echo_server request failed: {}", err));
}

ok
}

/// Checks that the given TCP listeners and UDP sockets are reachable from the
/// machine at `ip_echo_server_addr`.
///
/// This is the public entry point: it forwards all arguments to
/// `do_verify_reachable_ports`, supplying `DEFAULT_TIMEOUT_SECS` as the timeout
/// and `DEFAULT_RETRY_COUNT` as the UDP retry count, and returns that result
/// unchanged.
pub fn verify_reachable_ports(
    ip_echo_server_addr: &SocketAddr,
    tcp_listeners: Vec<(u16, TcpListener)>,
    udp_sockets: &[&UdpSocket],
) -> bool {
    let (timeout_secs, udp_retries) = (DEFAULT_TIMEOUT_SECS, DEFAULT_RETRY_COUNT);

    do_verify_reachable_ports(
        ip_echo_server_addr,
        tcp_listeners,
        udp_sockets,
        timeout_secs,
        udp_retries,
    )
}

pub fn parse_port_or_addr(optstr: Option<&str>, default_addr: SocketAddr) -> SocketAddr {
if let Some(addrstr) = optstr {
if let Ok(port) = addrstr.parse() {
Expand Down Expand Up @@ -512,7 +581,25 @@ mod tests {
}

#[test]
fn test_get_public_ip_addr() {
fn test_get_public_ip_addr_none() {
solana_logger::setup();
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let (_server_port, (server_udp_socket, server_tcp_listener)) =
bind_common_in_range(ip_addr, (3200, 3250)).unwrap();

let _runtime = ip_echo_server(server_tcp_listener);

let server_ip_echo_addr = server_udp_socket.local_addr().unwrap();
assert_eq!(
get_public_ip_addr(&server_ip_echo_addr),
parse_host("127.0.0.1"),
);

assert!(verify_reachable_ports(&server_ip_echo_addr, vec![], &[],));
}

#[test]
fn test_get_public_ip_addr_reachable() {
solana_logger::setup();
let ip_addr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let (_server_port, (server_udp_socket, server_tcp_listener)) =
Expand All @@ -534,4 +621,50 @@ mod tests {
&[&client_udp_socket],
));
}

#[test]
fn test_get_public_ip_addr_tcp_unreachable() {
    solana_logger::setup();

    let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    let port_range = (3200, 3250);

    // Bind the "server" sockets but deliberately never start the ip echo
    // server on them, so nothing will connect back to the client.
    // The listener stays bound (named binding, not `_`) for the whole test.
    let (_echo_port, (echo_udp_socket, _echo_tcp_listener)) =
        bind_common_in_range(bind_ip, port_range).unwrap();
    let echo_addr = echo_udp_socket.local_addr().unwrap();

    let (client_port, (_unused_udp_socket, client_listener)) =
        bind_common_in_range(bind_ip, port_range).unwrap();

    // 2 second timeout, 3 UDP retries; only the TCP port is being checked.
    let reachable = do_verify_reachable_ports(
        &echo_addr,
        vec![(client_port, client_listener)],
        &[],
        2,
        3,
    );
    assert!(!reachable);
}

#[test]
fn test_get_public_ip_addr_udp_unreachable() {
    solana_logger::setup();

    let bind_ip = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    let port_range = (3200, 3250);

    // Bind the "server" sockets but deliberately never start the ip echo
    // server on them, so the reachability check is expected to fail.
    // The listener stays bound (named binding, not `_`) for the whole test.
    let (_echo_port, (echo_udp_socket, _echo_tcp_listener)) =
        bind_common_in_range(bind_ip, port_range).unwrap();
    let echo_addr = echo_udp_socket.local_addr().unwrap();

    let (_client_port, (client_udp, _unused_tcp_listener)) =
        bind_common_in_range(bind_ip, port_range).unwrap();

    // 2 second timeout, 3 UDP retries; only the UDP socket is being checked.
    let udp_under_test = [&client_udp];
    let reachable = do_verify_reachable_ports(&echo_addr, vec![], &udp_under_test, 2, 3);
    assert!(!reachable);
}
}
12 changes: 11 additions & 1 deletion validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,17 @@ pub fn main() {
}

if let Some(ref cluster_entrypoint) = cluster_entrypoint {
let udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair];
let mut udp_sockets = vec![
&node.sockets.gossip,
&node.sockets.repair,
&node.sockets.serve_repair,
];
udp_sockets.extend(node.sockets.tpu.iter());
udp_sockets.extend(node.sockets.tpu_forwards.iter());
udp_sockets.extend(node.sockets.tvu.iter());
udp_sockets.extend(node.sockets.tvu_forwards.iter());
udp_sockets.extend(node.sockets.broadcast.iter());
udp_sockets.extend(node.sockets.retransmit_sockets.iter());

let mut tcp_listeners = vec![];
if !private_rpc {
Expand Down