diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 2b5a1432057..83b37e056a3 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -225,7 +225,7 @@ impl Discv5 { bootstrap_lookup_interval, bootstrap_lookup_countdown, metrics.clone(), - discv5.clone(), + Arc::downgrade(&discv5), ); Ok(( @@ -573,14 +573,19 @@ pub fn spawn_populate_kbuckets_bg( bootstrap_lookup_interval: u64, bootstrap_lookup_countdown: u64, metrics: Discv5Metrics, - discv5: Arc, + discv5: std::sync::Weak, ) { - let local_node_id = discv5.local_enr().node_id(); let lookup_interval = Duration::from_secs(lookup_interval); let metrics = metrics.discovered_peers; let mut kbucket_index = MAX_KBUCKET_INDEX; let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval); task::spawn(async move { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + let local_node_id = discv5_handle.local_enr().node_id(); + drop(discv5_handle); + // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest // log2distance from local node for i in (0..bootstrap_lookup_countdown).rev() { @@ -593,7 +598,12 @@ pub fn spawn_populate_kbuckets_bg( "starting bootstrap boost lookup query" ); - lookup(target, &discv5, &metrics).await; + { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + lookup(target, &discv5_handle, &metrics).await; + } tokio::time::sleep(pulse_lookup_interval).await; } @@ -610,7 +620,12 @@ pub fn spawn_populate_kbuckets_bg( "starting periodic lookup query" ); - lookup(target, &discv5, &metrics).await; + { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + lookup(target, &discv5_handle, &metrics).await; + } if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX { // try to populate bucket one step closer @@ -698,6 +713,10 @@ mod test { use ::enr::{CombinedKey, EnrKey}; use rand_08::thread_rng; use reth_chainspec::MAINNET; + use std::{ + net::UdpSocket, + time::{Duration, Instant}, + }; use tracing::trace; fn discv5_noop() -> Discv5 { @@ -736,6 +755,61 @@ mod test { Discv5::start(&secret_key, discv5_config).await.expect("should build discv5") } + async fn start_discovery_node_with_key( + secret_key: &SecretKey, + udp_port_discv5: u16, + ) -> Result<(Discv5, mpsc::Receiver), Error> { + let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap(); + let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap(); + + let discv5_listen_config = ListenConfig::from(discv5_addr); + let discv5_config = Config::builder(rlpx_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + Discv5::start(secret_key, discv5_config).await + } + + fn unused_udp_port() -> u16 { + UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port() + } + + async fn wait_for_udp_port_release(port: u16, timeout: Duration) { + let deadline = Instant::now() + timeout; + + loop { + match UdpSocket::bind(("127.0.0.1", port)) { + Ok(socket) => { + drop(socket); + return; + } + Err(err) if Instant::now() < deadline => { + trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release"); + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"), + } + } + } + + #[tokio::test(flavor = "multi_thread")] + async fn discv5_releases_port_on_drop() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut thread_rng()); + let port = unused_udp_port(); + + let (node, updates) = + start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5"); + drop(updates); + drop(node); + + wait_for_udp_port_release(port, Duration::from_secs(1)).await; + + let restarted = start_discovery_node_with_key(&secret_key, port).await; + assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}"); + } + #[tokio::test(flavor = "multi_thread")] async fn discv5() { reth_tracing::init_test_tracing();