diff --git a/Cargo.lock b/Cargo.lock index 47b646cd2ae..af5b25b7087 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3128,8 +3128,7 @@ dependencies = [ [[package]] name = "discv5" version = "0.10.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7999df38d0bd8f688212e1a4fae31fd2fea6d218649b9cd7c40bf3ec1318fc" +source = "git+https://github.com/sigp/discv5?rev=7663c00#7663c00ee0837ea98547caaedede95d9d6736f4d" dependencies = [ "aes", "aes-gcm", @@ -8941,6 +8940,7 @@ dependencies = [ "secp256k1 0.30.0", "serde", "smallvec", + "socket2", "thiserror 2.0.18", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 2b58f61a9b7..ce5ca60a515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -581,7 +581,7 @@ tower = "0.5" tower-http = "0.6" # p2p -discv5 = "0.10" +discv5 = { git = "https://github.com/sigp/discv5", rev = "7663c00" } if-addrs = "0.14" # rpc diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index 7c73fd2cebf..0d9f0bc5315 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -47,9 +47,7 @@ use secp256k1::SecretKey; use std::{ cell::RefCell, collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque}, - fmt, - future::poll_fn, - io, + fmt, io, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, pin::Pin, rc::Rc, @@ -243,17 +241,56 @@ impl Discv4 { /// ``` pub async fn bind( local_address: SocketAddr, + local_node_record: NodeRecord, + secret_key: SecretKey, + config: Discv4Config, + ) -> io::Result<(Self, Discv4Service)> { + let socket = Arc::new(UdpSocket::bind(local_address).await?); + trace!(target: "discv4", local_addr=?socket.local_addr(), "opened UDP socket"); + let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer); + + Self::bind_with_socket(socket, Some(tx), rx, local_node_record, secret_key, config) + } + + /// Creates a new `Discv4` instance using a pre-bound shared socket. No receive loop is + /// spawned; instead returns an [`IngressHandler`] that should be used to forward raw packets + /// received by the socket owner (e.g. discv5 unrecognized frames). + pub fn bind_shared( + socket: Arc, + local_node_record: NodeRecord, + secret_key: SecretKey, + config: Discv4Config, + ) -> io::Result<(Self, Discv4Service, IngressHandler)> { + let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer); + let local_id = local_node_record.id; + let (discv4, service) = + Self::bind_with_socket(socket, None, rx, local_node_record, secret_key, config)?; + + let handler = IngressHandler::new(tx, local_id); + + Ok((discv4, service, handler)) + } + + fn bind_with_socket( + socket: Arc, + ingress_tx: Option, + ingress_rx: IngressReceiver, mut local_node_record: NodeRecord, secret_key: SecretKey, config: Discv4Config, ) -> io::Result<(Self, Discv4Service)> { - let socket = UdpSocket::bind(local_address).await?; let local_addr = socket.local_addr()?; local_node_record.udp_port = local_addr.port(); - trace!(target: "discv4", ?local_addr,"opened UDP socket"); - let mut service = - Discv4Service::new(socket, local_addr, local_node_record, secret_key, config); + let mut service = Discv4Service::new( + socket, + ingress_tx, + ingress_rx, + local_addr, + local_node_record, + secret_key, + config, + ); // resolve the external address immediately service.resolve_external_ip(); @@ -520,20 +557,25 @@ pub struct Discv4Service { impl Discv4Service { /// Create a new instance for a bound [`UdpSocket`]. + /// + /// If `ingress_tx` is `Some`, the receive loop is spawned to read from the socket. If `None`, + /// the caller feeds packets into `ingress_rx` externally (shared socket mode). pub(crate) fn new( - socket: UdpSocket, + socket: Arc, + ingress_tx: Option, + ingress_rx: IngressReceiver, local_address: SocketAddr, local_node_record: NodeRecord, secret_key: SecretKey, config: Discv4Config, ) -> Self { - let socket = Arc::new(socket); - let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer); let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer); let mut tasks = JoinSet::<()>::new(); - let udp = Arc::clone(&socket); - tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id)); + if let Some(ingress_tx) = ingress_tx { + let udp = Arc::clone(&socket); + tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id)); + } let udp = Arc::clone(&socket); tasks.spawn(send_loop(udp, egress_rx)); @@ -947,7 +989,7 @@ impl Discv4Service { let key = kad_key(peer_id); match self.kbuckets.entry(&key) { BucketEntry::Present(entry, _) => Some(f(entry.value())), - BucketEntry::Pending(mut entry, _) => Some(f(entry.value())), + BucketEntry::Pending(entry, _) => Some(f(entry.value())), _ => None, } } @@ -973,7 +1015,9 @@ impl Discv4Service { kbucket::Entry::Present(mut entry, _) => { entry.value_mut().update_with_enr(last_enr_seq) } - kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq), + kbucket::Entry::Pending(mut entry, _) => { + entry.value_mut().update_with_enr(last_enr_seq) + } _ => return, }; @@ -1025,8 +1069,8 @@ impl Discv4Service { } kbucket::Entry::Pending(mut entry, mut status) => { // endpoint is now proven - entry.value().establish_proof(); - entry.value().update_with_enr(last_enr_seq); + entry.value_mut().establish_proof(); + entry.value_mut().update_with_enr(last_enr_seq); if !status.is_connected() { status.state = ConnectionState::Connected; @@ -1158,7 +1202,7 @@ impl Discv4Service { } else { is_proven = entry.value().has_endpoint_proof; } - entry.value().update_with_enr(ping.enr_sq) + entry.value_mut().update_with_enr(ping.enr_sq) } kbucket::Entry::Absent(entry) => { let mut node = NodeEntry::new(record); @@ -1388,7 +1432,7 @@ impl Discv4Service { (entry.value().record, id) } kbucket::Entry::Pending(mut entry, _) => { - let id = entry.value().update_with_fork_id(fork_id); + let id = entry.value_mut().update_with_fork_id(fork_id); (entry.value().record, id) } _ => return, @@ -1538,7 +1582,7 @@ impl Discv4Service { } } } - BucketEntry::Pending(mut entry, _) => { + BucketEntry::Pending(entry, _) => { if entry.value().has_endpoint_proof { if entry .value() @@ -1642,7 +1686,7 @@ impl Discv4Service { entry.value().find_node_failures } kbucket::Entry::Pending(mut entry, _) => { - entry.value().inc_failed_request(); + entry.value_mut().inc_failed_request(); entry.value().find_node_failures } _ => continue, @@ -1962,80 +2006,100 @@ const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize; /// Continuously awaits new incoming messages and sends them back through the channel. /// -/// The receive loop enforce primitive rate limiting for ips to prevent message spams from -/// individual IPs +/// The receive loop enforces primitive rate limiting for IPs to prevent message spams from +/// individual IPs. pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_id: PeerId) { - let send = |event: IngressEvent| async { - let _ = tx.send(event).await.map_err(|err| { - debug!( - target: "discv4", - %err, - "failed send incoming packet", - ) - }); - }; - - let mut cache = ReceiveCache::default(); - - // tick at half the rate of the limit - let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2; - let mut interval = tokio::time::interval(Duration::from_secs(tick as u64)); - + let mut handler = IngressHandler::new(tx, local_id); let mut buf = [0; MAX_PACKET_SIZE]; loop { let res = udp.recv_from(&mut buf).await; match res { Err(err) => { debug!(target: "discv4", %err, "Failed to read datagram."); - send(IngressEvent::RecvError(err)).await; + handler.send(IngressEvent::RecvError(err)).await; } Ok((read, remote_addr)) => { - // rate limit incoming packets by IP - if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP { - trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP."); - continue - } + handler.handle_packet(&buf[..read], remote_addr).await; + } + } + } +} - let packet = &buf[..read]; - match Message::decode(packet) { - Ok(packet) => { - if packet.node_id == local_id { - // received our own message - debug!(target: "discv4", ?remote_addr, "Received own packet."); - continue - } +/// Handles decoding, rate-limiting, and deduplication of incoming discv4 packets. +/// +/// Used by both the standalone receive loop and the shared-port mode via +/// [`Discv4::bind_shared`]. +#[derive(Debug)] +pub struct IngressHandler { + tx: IngressSender, + local_id: PeerId, + tick: usize, + tick_interval: Duration, + cache: ReceiveCache, + last_tick: Instant, +} - // skip if we've already received the same packet - if cache.contains_packet(packet.hash) { - debug!(target: "discv4", ?remote_addr, "Received duplicate packet."); - continue - } +impl IngressHandler { + fn new(tx: IngressSender, local_id: PeerId) -> Self { + let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2; + Self { + tx, + local_id, + tick, + tick_interval: Duration::from_secs(tick as u64), + cache: ReceiveCache::default(), + last_tick: Instant::now(), + } + } - send(IngressEvent::Packet(remote_addr, packet)).await; - } - Err(err) => { - trace!(target: "discv4", %err,"Failed to decode packet"); - send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await - } - } - } + async fn send(&self, event: IngressEvent) { + let _ = self.tx.send(event).await.map_err(|err| { + debug!(target: "discv4", %err, "failed send incoming packet"); + }); + } + + /// Handles an incoming raw packet: decodes, rate-limits, deduplicates, and forwards to the + /// discv4 service. Used in shared-port mode to process unrecognized frames from discv5. + pub async fn handle_packet(&mut self, data: &[u8], src: SocketAddr) { + if self.last_tick.elapsed() >= self.tick_interval { + self.cache.tick_ips(self.tick); + self.last_tick = Instant::now(); } - // reset the tracked ips if the interval has passed - if poll_fn(|cx| match interval.poll_tick(cx) { - Poll::Ready(_) => Poll::Ready(true), - Poll::Pending => Poll::Ready(false), - }) - .await - { - cache.tick_ips(tick); + // rate limit incoming packets by IP + if self.cache.inc_ip(src.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP { + trace!(target: "discv4", ?src, "Too many incoming packets from IP."); + return } + + let event = match Message::decode(data) { + Ok(packet) => { + if packet.node_id == self.local_id { + debug!(target: "discv4", ?src, "Received own packet."); + return + } + + if self.cache.contains_packet(packet.hash) { + debug!(target: "discv4", ?src, "Received duplicate packet."); + return + } + + IngressEvent::Packet(src, packet) + } + Err(err) => { + trace!(target: "discv4", %err, "Failed to decode packet"); + IngressEvent::BadPacket(src, err, data.to_vec()) + } + }; + + self.send(event).await; } } /// A cache for received packets and their source address. /// /// This is used to discard duplicated packets and rate limit messages from the same source. +#[derive(Debug)] struct ReceiveCache { /// keeps track of how many messages we've received from a given IP address since the last /// tick. diff --git a/crates/net/discv5/src/config.rs b/crates/net/discv5/src/config.rs index c5677544416..1e07986ea6b 100644 --- a/crates/net/discv5/src/config.rs +++ b/crates/net/discv5/src/config.rs @@ -308,6 +308,18 @@ impl Config { } } + /// Returns a mutable reference to the inner [`discv5::Config`]. This allows overriding + /// the listen config after the config has been built. + pub const fn discv5_config_mut(&mut self) -> &mut discv5::Config { + &mut self.discv5_config + } + + /// Returns `true` if any socket in the discv5 listen config matches the given address. + pub fn has_matching_socket(&self, addr: SocketAddr) -> bool { + ipv4(&self.discv5_config.listen_config).is_some_and(|v4| SocketAddr::V4(v4) == addr) || + ipv6(&self.discv5_config.listen_config).is_some_and(|v6| SocketAddr::V6(v6) == addr) + } + /// Inserts a new boot node to the list of boot nodes. pub fn insert_boot_node(&mut self, boot_node: BootNode) { self.bootstrap_nodes.insert(boot_node); @@ -333,11 +345,11 @@ impl Config { /// socket, if both IPv4 and v6 are configured. This socket will be advertised to peers in the /// local [`Enr`](discv5::enr::Enr). pub fn discovery_socket(&self) -> SocketAddr { - match self.discv5_config.listen_config { - ListenConfig::Ipv4 { ip, port } => (ip, port).into(), - ListenConfig::Ipv6 { ip, port } => (ip, port).into(), - ListenConfig::DualStack { ipv6, ipv6_port, .. } => (ipv6, ipv6_port).into(), - } + // Prefer v6 when both are configured (matches original `DualStack` behavior). + ipv6(&self.discv5_config.listen_config) + .map(SocketAddr::V6) + .or_else(|| ipv4(&self.discv5_config.listen_config).map(SocketAddr::V4)) + .unwrap_or_else(|| SocketAddr::from((std::net::Ipv4Addr::UNSPECIFIED, 0))) } /// Returns the `RLPx` (TCP) socket contained in the [`discv5::Config`]. This socket will be @@ -348,24 +360,32 @@ impl Config { } /// Returns the IPv4 discovery socket if one is configured. -pub const fn ipv4(listen_config: &ListenConfig) -> Option { +pub fn ipv4(listen_config: &ListenConfig) -> Option { match listen_config { ListenConfig::Ipv4 { ip, port } | ListenConfig::DualStack { ipv4: ip, ipv4_port: port, .. } => { Some(SocketAddrV4::new(*ip, *port)) } - ListenConfig::Ipv6 { .. } => None, + ListenConfig::FromSockets { ipv4: Some(s), .. } => match s.local_addr().ok()? { + SocketAddr::V4(addr) => Some(addr), + SocketAddr::V6(_) => None, + }, + _ => None, } } /// Returns the IPv6 discovery socket if one is configured. -pub const fn ipv6(listen_config: &ListenConfig) -> Option { +pub fn ipv6(listen_config: &ListenConfig) -> Option { match listen_config { - ListenConfig::Ipv4 { .. } => None, ListenConfig::Ipv6 { ip, port } | ListenConfig::DualStack { ipv6: ip, ipv6_port: port, .. } => { Some(SocketAddrV6::new(*ip, *port, 0, 0)) } + ListenConfig::FromSockets { ipv6: Some(s), .. } => match s.local_addr().ok()? { + SocketAddr::V6(addr) => Some(addr), + SocketAddr::V4(_) => None, + }, + _ => None, } } diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 83b37e056a3..5472e3c161a 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -18,7 +18,6 @@ use std::{ use ::enr::Enr; use alloy_primitives::bytes::Bytes; -use discv5::ListenConfig; use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper}; use futures::future::join_all; use itertools::Itertools; @@ -247,7 +246,9 @@ impl Discv5 { match update { discv5::Event::SocketUpdated(_) | discv5::Event::TalkRequest(_) | // `Discovered` not unique discovered peers - discv5::Event::Discovered(_) => None, + discv5::Event::Discovered(_) | + // Unrecognized frames are handled separately by the discovery layer + discv5::Event::UnrecognizedFrame(_) => None, discv5::Event::NodeInserted { .. } => { // node has been inserted into kbuckets @@ -472,39 +473,33 @@ pub fn build_local_enr( let Config { discv5_config, fork, tcp_socket, other_enr_kv_pairs, .. } = config; - let socket = match discv5_config.listen_config { - ListenConfig::Ipv4 { ip, port } => { - if ip != Ipv4Addr::UNSPECIFIED { - builder.ip4(ip); - } - builder.udp4(port); - builder.tcp4(tcp_socket.port()); + let socket = { + let v4 = crate::config::ipv4(&discv5_config.listen_config); + let v6 = crate::config::ipv6(&discv5_config.listen_config); - (ip, port).into() - } - ListenConfig::Ipv6 { ip, port } => { - if ip != Ipv6Addr::UNSPECIFIED { - builder.ip6(ip); + if let Some(addr) = v4 { + if *addr.ip() != Ipv4Addr::UNSPECIFIED { + builder.ip4(*addr.ip()); } - builder.udp6(port); - builder.tcp6(tcp_socket.port()); - - (ip, port).into() + builder.udp4(addr.port()); } - ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => { - if ipv4 != Ipv4Addr::UNSPECIFIED { - builder.ip4(ipv4); + if let Some(addr) = v6 { + if *addr.ip() != Ipv6Addr::UNSPECIFIED { + builder.ip6(*addr.ip()); } - builder.udp4(ipv4_port); + builder.udp6(addr.port()); + } + // Advertise tcp4 when v4 is configured, else tcp6. + if v4.is_some() { builder.tcp4(tcp_socket.port()); - - if ipv6 != Ipv6Addr::UNSPECIFIED { - builder.ip6(ipv6); - } - builder.udp6(ipv6_port); - - (ipv6, ipv6_port).into() + } else if v6.is_some() { + builder.tcp6(tcp_socket.port()); } + + // Prefer v6 when both are configured + v6.map(SocketAddr::V6) + .or_else(|| v4.map(SocketAddr::V4)) + .unwrap_or_else(|| SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0))) }; let rlpx_ip_mode = if tcp_socket.is_ipv4() { IpMode::Ip4 } else { IpMode::Ip6 }; @@ -711,6 +706,7 @@ mod test { #![allow(deprecated)] use super::*; use ::enr::{CombinedKey, EnrKey}; + use discv5::ListenConfig; use rand_08::thread_rng; use reth_chainspec::MAINNET; use std::{ diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index b005f0d4a3a..298d93abced 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -50,6 +50,7 @@ reth-ethereum-primitives.workspace = true futures.workspace = true pin-project.workspace = true tokio = { workspace = true, features = ["io-util", "net", "macros", "rt-multi-thread", "time"] } +socket2 = { workspace = true, features = ["all"] } tokio-stream.workspace = true tokio-util = { workspace = true, features = ["codec"] } diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 715194ff6b1..fc5dc54ae27 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -23,7 +23,7 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{net::UdpSocket, sync::mpsc, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, Stream}; use tracing::{debug, trace}; @@ -54,6 +54,9 @@ pub struct Discovery { discv5: Option, /// All KAD table updates from the discv5 service. discv5_updates: Option>, + /// Background task that, in shared-port mode, drains `UnrecognizedFrame`s from discv5 and + /// feeds them into the discv4 ingress so packets advance without polling `Discovery`. + _discv5_forwarder: Option>, /// Handler to interact with the DNS discovery service _dns_discovery: Option, /// Updates from the DNS discovery service. @@ -76,39 +79,138 @@ impl Discovery { discovery_v4_addr: SocketAddr, sk: SecretKey, discv4_config: Option, - discv5_config: Option, // contains discv5 listen address + mut discv5_config: Option, // contains discv5 listen address dns_discovery_config: Option, ) -> Result { // setup discv4 with the discovery address and tcp port let local_enr = NodeRecord::from_secret_key(discovery_v4_addr, &sk).with_tcp_port(tcp_addr.port()); - let discv4_future = async { - let Some(disc_config) = discv4_config else { return Ok((None, None, None)) }; - let (discv4, mut discv4_service) = - Discv4::bind(discovery_v4_addr, local_enr, sk, disc_config).await.map_err( - |err| { + // For IPv6 we set IPV6_V6ONLY=true so an IPv4 sibling socket on the same port doesn't + // clash with the IPv6 one (Linux's default of V6ONLY=0 has IPv6 also claim the IPv4 + // port via mapped addresses), matching how discv5 binds its `DualStack` sockets. + let bind_socket = async |addr: SocketAddr| { + let result = match addr { + SocketAddr::V4(_) => UdpSocket::bind(addr).await, + SocketAddr::V6(_) => { + use socket2::{Domain, Protocol, Socket, Type}; + (|| { + let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?; + socket.set_only_v6(true)?; + socket.set_nonblocking(true)?; + socket.bind(&addr.into())?; + UdpSocket::from_std(socket.into()) + })() + } + }; + result + .map(Arc::new) + .map_err(|err| NetworkError::from_io_error(err, ServiceKind::Discovery(addr))) + }; + + // In shared-port mode, bind the shared socket and start discv4 without its own receive + // loop. Unrecognized frames from discv5 will be forwarded to the ingress handler. + let (discv4, discv4_updates, _discv4_service, discv4_ingress, shared_socket) = + if let Some(config) = discv4_config { + if let Some(discv5_config) = &mut discv5_config && + discv5_config.has_matching_socket(discovery_v4_addr) + { + let socket = bind_socket(discovery_v4_addr).await?; + + let (discv4, mut discv4_service, ingress) = Discv4::bind_shared( + socket.clone(), + local_enr, + sk, + config, + ) + .map_err(|err| { NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_v4_addr)) - }, - )?; - let discv4_updates = discv4_service.update_stream(); - // spawn the service - let discv4_service = discv4_service.spawn(); + })?; + + let discv4_updates = discv4_service.update_stream(); + let discv4_service = discv4_service.spawn(); + debug!(target:"net", ?discovery_v4_addr, "started discovery v4 (shared port)"); + ( + Some(discv4), + Some(discv4_updates), + Some(discv4_service), + Some(ingress), + Some(socket), + ) + } else { + let (discv4, mut discv4_service) = + Discv4::bind(discovery_v4_addr, local_enr, sk, config).await.map_err( + |err| { + NetworkError::from_io_error( + err, + ServiceKind::Discovery(discovery_v4_addr), + ) + }, + )?; + let discv4_updates = discv4_service.update_stream(); + // spawn the service + let discv4_service = discv4_service.spawn(); + + debug!(target:"net", ?discovery_v4_addr, "started discovery v4"); + + (Some(discv4), Some(discv4_updates), Some(discv4_service), None, None) + } + } else { + (None, None, None, None, None) + }; - debug!(target:"net", ?discovery_v4_addr, "started discovery v4"); + // Start discv5, wiring in the shared socket if in shared-port mode. + let (discv5, discv5_updates) = if let Some(mut config) = discv5_config { + if let Some(socket) = shared_socket { + let discv5_cfg = config.discv5_config_mut(); + + // The shared socket covers discv4's address family; bind the opposite family + // only if discv5 was configured for dual-stack. + let (mut ipv4, mut ipv6) = (None, None); + if discovery_v4_addr.is_ipv4() { + ipv4 = Some(socket); + if let Some(addr) = reth_discv5::config::ipv6(&discv5_cfg.listen_config) { + ipv6 = Some(bind_socket(SocketAddr::V6(addr)).await?); + } + } else { + ipv6 = Some(socket); + if let Some(addr) = reth_discv5::config::ipv4(&discv5_cfg.listen_config) { + ipv4 = Some(bind_socket(SocketAddr::V4(addr)).await?); + } + } - Ok((Some(discv4), Some(discv4_updates), Some(discv4_service))) - }; + discv5_cfg.listen_config = discv5::ListenConfig::FromSockets { ipv4, ipv6 }; + } - let discv5_future = async { - let Some(config) = discv5_config else { return Ok::<_, NetworkError>((None, None)) }; let (discv5, discv5_updates) = Discv5::start(&sk, config).await?; - debug!(target:"net", discovery_v5_enr=? discv5.local_enr(), "started discovery v5"); - Ok((Some(discv5), Some(discv5_updates.into()))) + debug!(target:"net", discovery_v5_enr=?discv5.local_enr(), "started discovery v5"); + (Some(discv5), Some(discv5_updates)) + } else { + (None, None) }; - let ((discv4, discv4_updates, _discv4_service), (discv5, discv5_updates)) = - tokio::try_join!(discv4_future, discv5_future)?; + // In shared-port mode, spawn a task that peels `UnrecognizedFrame` events off the discv5 + // update stream and feeds them into discv4's ingress. Other events are forwarded through + // a new channel that `Discovery::poll` reads. This keeps both protocols moving without + // requiring the main `Discovery::poll` loop to be driven for packets to be routed. + let (discv5_updates, _discv5_forwarder) = match (discv4_ingress, discv5_updates) { + (Some(mut ingress), Some(mut updates)) => { + let (tx, rx) = mpsc::channel(updates.max_capacity()); + let handle = tokio::spawn(async move { + while let Some(event) = updates.recv().await { + if let discv5::Event::UnrecognizedFrame(frame) = &event { + ingress.handle_packet(&frame.packet, frame.src_address).await; + continue; + } + if tx.send(event).await.is_err() { + break; + } + } + }); + (Some(ReceiverStream::new(rx)), Some(handle)) + } + (_, updates) => (updates.map(ReceiverStream::new), None), + }; // setup DNS discovery let (_dns_discovery, dns_discovery_updates, _dns_disc_service) = @@ -132,6 +234,7 @@ impl Discovery { _discv4_service, discv5, discv5_updates, + _discv5_forwarder, discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE), queued_events: Default::default(), _dns_disc_service, @@ -309,6 +412,9 @@ impl Drop for Discovery { if let Some(handle) = self._discv4_service.take() { handle.abort(); } + if let Some(handle) = self._discv5_forwarder.take() { + handle.abort(); + } if let Some(handle) = self._dns_disc_service.take() { handle.abort(); } @@ -342,10 +448,11 @@ impl Discovery { }, discv4: Default::default(), discv4_updates: Default::default(), + _discv4_service: Default::default(), + _discv5_forwarder: None, discv5: None, discv5_updates: None, queued_events: Default::default(), - _discv4_service: Default::default(), _dns_discovery: None, dns_discovery_updates: None, _dns_disc_service: None, @@ -487,4 +594,179 @@ mod tests { assert_eq!(1, node_1.discovered_nodes.len()); assert_eq!(1, node_2.discovered_nodes.len()); } + + /// Starts a discovery node with discv4 and discv5 sharing the same UDP port. + async fn start_shared_port_node(port: u16) -> Discovery { + let secret_key = SecretKey::new(&mut rand_08::thread_rng()); + let disc_addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap(); + // Use a non-zero TCP port so the node record isn't filtered out by + // `on_node_record_update` (which drops peers with tcp port == 0). + let tcp_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap(); + + let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build(); + + let discv5_listen_config = discv5::ListenConfig::from(disc_addr); + let discv5_config = reth_discv5::Config::builder(tcp_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + // Both protocols use the same address, triggering shared-port mode + Discovery::new( + tcp_addr, + disc_addr, + secret_key, + Some(discv4_config), + Some(discv5_config), + None, + ) + .await + .expect("should start with shared port") + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_shared_port_setup() { + reth_tracing::init_test_tracing(); + + // Use port 0 so the OS picks a free port + let node = start_shared_port_node(0).await; + + // Both protocols should be active + assert!(node.discv4.is_some(), "discv4 should be running"); + assert!(node.discv5.is_some(), "discv5 should be running"); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_shared_port_discv5_discovery() { + reth_tracing::init_test_tracing(); + + let mut node_1 = start_shared_port_node(0).await; + let mut node_2 = start_shared_port_node(0).await; + + let discv5_enr_1 = node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr()); + let discv5_enr_2 = node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr()); + + let peer_id_1 = enr_to_discv4_id(&discv5_enr_1).unwrap(); + let peer_id_2 = enr_to_discv4_id(&discv5_enr_2).unwrap(); + + // Add node_2's ENR to node_1's discv5 kbuckets and trigger a ping to establish a session. + // send_ping awaits the PONG, so the handshake completes before we poll the Discovery + // stream. The discv5 service runs its own background task. + node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_2.clone()).into()).unwrap(); + node_1 + .discv5 + .as_ref() + .unwrap() + .with_discv5(|discv5| discv5.send_ping(discv5_enr_2)) + .await + .unwrap(); + + // Both SessionEstablished events should now be buffered in the update channels. + // Drive both nodes concurrently to collect them. + let mut event_1 = None; + let mut event_2 = None; + let timeout = tokio::time::sleep(std::time::Duration::from_secs(5)); + tokio::pin!(timeout); + loop { + tokio::select! { + ev = node_1.next(), if event_1.is_none() => { + event_1 = ev; + } + ev = node_2.next(), if event_2.is_none() => { + event_2 = ev; + } + _ = &mut timeout => { + panic!("timed out waiting for discv5 discovery events"); + } + } + if event_1.is_some() && event_2.is_some() { + break; + } + } + + assert!(matches!( + event_1.unwrap(), + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, .. }) + if peer_id == peer_id_2 + )); + assert!(matches!( + event_2.unwrap(), + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, .. }) + if peer_id == peer_id_1 + )); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_shared_port_discv4_discovery() { + reth_tracing::init_test_tracing(); + + let mut node_1 = start_shared_port_node(0).await; + let mut node_2 = start_shared_port_node(0).await; + + let enr_1 = node_1.discv4.as_ref().unwrap().node_record(); + let enr_2 = node_2.discv4.as_ref().unwrap().node_record(); + + // Introduce node_2 to node_1 via discv4 + node_1.add_discv4_node(enr_2); + + // Both nodes should discover each other via discv4 ping/pong + let event_1 = node_1.next().await.unwrap(); + let event_2 = node_2.next().await.unwrap(); + + assert_eq!( + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { + peer_id: enr_2.id, + addr: PeerAddr::new(enr_2.tcp_addr(), Some(enr_2.udp_addr())), + fork_id: None + }), + event_1 + ); + assert_eq!( + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { + peer_id: enr_1.id, + addr: PeerAddr::new(enr_1.tcp_addr(), Some(enr_1.udp_addr())), + fork_id: None + }), + event_2 + ); + } + + /// Verifies that shared-port mode binds correctly when discv5 is configured for dual-stack. + /// On Linux this exercises the IPv6 V6ONLY path: without it, the IPv4 sibling would clash + /// with the IPv6 socket bound to the same port. + #[tokio::test(flavor = "multi_thread")] + async fn test_shared_port_dual_stack() { + reth_tracing::init_test_tracing(); + + // Find a port that's free on the v4 wildcard so we can use it for both v4 and v6. + let probe = UdpSocket::bind("0.0.0.0:0").await.expect("probe bind"); + let port = probe.local_addr().unwrap().port(); + drop(probe); + + let secret_key = SecretKey::new(&mut rand_08::thread_rng()); + let v4_addr: SocketAddr = format!("0.0.0.0:{port}").parse().unwrap(); + let tcp_addr: SocketAddr = "0.0.0.0:30303".parse().unwrap(); + + let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build(); + + let discv5_listen_config = discv5::ListenConfig::DualStack { + ipv4: std::net::Ipv4Addr::UNSPECIFIED, + ipv4_port: port, + ipv6: std::net::Ipv6Addr::UNSPECIFIED, + ipv6_port: port, + }; + let discv5_config = reth_discv5::Config::builder(tcp_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + Discovery::new( + tcp_addr, + v4_addr, + secret_key, + Some(discv4_config), + Some(discv5_config), + None, + ) + .await + .expect("discovery should start with shared port + dual-stack"); + } } diff --git a/crates/net/network/tests/it/startup.rs b/crates/net/network/tests/it/startup.rs index b3471f67e78..46fb714d2aa 100644 --- a/crates/net/network/tests/it/startup.rs +++ b/crates/net/network/tests/it/startup.rs @@ -4,7 +4,7 @@ use std::{ }; use reth_chainspec::MAINNET; -use reth_discv4::{Discv4Config, NatResolver, DEFAULT_DISCOVERY_ADDR, DEFAULT_DISCOVERY_PORT}; +use reth_discv4::{Discv4Config, NatResolver, DEFAULT_DISCOVERY_ADDR}; use reth_network::{ error::{NetworkError, ServiceKind}, Discovery, NetworkConfigBuilder, NetworkManager, @@ -73,27 +73,31 @@ async fn test_discovery_addr_in_use() { } #[tokio::test(flavor = "multi_thread")] -async fn test_discv5_and_discv4_same_socket_fails() { +async fn test_discv5_and_discv4_same_socket_ok() { + // Pick a free port for the shared UDP discovery socket and TCP RLPx listener. + let test_port: u16 = TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind to a port") + .local_addr() + .unwrap() + .port(); + let secret_key = SecretKey::new(&mut rand_08::thread_rng()); let config = NetworkConfigBuilder::eth(secret_key, Runtime::test()) - .listener_port(DEFAULT_DISCOVERY_PORT) + .listener_port(test_port) + .discovery_port(test_port) .discovery_v5( - reth_discv5::Config::builder((DEFAULT_DISCOVERY_ADDR, DEFAULT_DISCOVERY_PORT).into()) - .discv5_config( - discv5::ConfigBuilder::new(discv5::ListenConfig::from_ip( - DEFAULT_DISCOVERY_ADDR, - DEFAULT_DISCOVERY_PORT, - )) - .build(), - ), + reth_discv5::Config::builder((DEFAULT_DISCOVERY_ADDR, test_port).into()).discv5_config( + discv5::ConfigBuilder::new(discv5::ListenConfig::from_ip( + DEFAULT_DISCOVERY_ADDR, + test_port, + )) + .build(), + ), ) .disable_dns_discovery() .build(NoopProvider::default()); - let addr = config.listener_addr; - let result = NetworkManager::new(config).await; - let err = result.err().unwrap(); - - assert!(is_addr_in_use_kind(&err, ServiceKind::Listener(addr)), "{err:?}") + let _network = NetworkManager::new(config).await.expect("shared port discovery should start"); } #[tokio::test(flavor = "multi_thread")] diff --git a/deny.toml b/deny.toml index e54240b4936..7f792c3016f 100644 --- a/deny.toml +++ b/deny.toml @@ -94,4 +94,5 @@ allow-git = [ "https://github.com/alloy-rs/hardforks", "https://github.com/paradigmxyz/jsonrpsee", "https://github.com/DaniPopes/slotmap.git", + "https://github.com/sigp/discv5", ]