From d95269fc803735e833be822bb5a93887b9f414f4 Mon Sep 17 00:00:00 2001 From: romanbrodetski-ai Date: Mon, 30 Mar 2026 13:51:32 +0000 Subject: [PATCH 1/4] fix(discv5): use Weak reference in kbuckets bg task to release port on shutdown The `spawn_populate_kbuckets_bg` function held an `Arc` which prevented the discv5 node from being fully dropped when the `Discv5` handle was dropped. This caused the UDP port to remain bound, making it impossible to restart a node on the same port without waiting for the OS to reclaim it. Switch to `Weak` so the background task gracefully exits when the last strong reference is dropped, allowing the port to be released immediately. --- crates/net/discv5/src/lib.rs | 55 ++++++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 5 deletions(-) diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 2b5a1432057..a9ac9d6234f 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -225,7 +225,7 @@ impl Discv5 { bootstrap_lookup_interval, bootstrap_lookup_countdown, metrics.clone(), - discv5.clone(), + Arc::downgrade(&discv5), ); Ok(( @@ -573,17 +573,25 @@ pub fn spawn_populate_kbuckets_bg( bootstrap_lookup_interval: u64, bootstrap_lookup_countdown: u64, metrics: Discv5Metrics, - discv5: Arc, + discv5: std::sync::Weak, ) { - let local_node_id = discv5.local_enr().node_id(); let lookup_interval = Duration::from_secs(lookup_interval); let metrics = metrics.discovered_peers; let mut kbucket_index = MAX_KBUCKET_INDEX; let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval); task::spawn(async move { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + let local_node_id = discv5_handle.local_enr().node_id(); + drop(discv5_handle); + // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest // log2distance from local node for i in (0..bootstrap_lookup_countdown).rev() { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; let target = discv5::enr::NodeId::random(); trace!(target: "net::discv5", @@ -593,13 +601,16 @@ pub fn spawn_populate_kbuckets_bg( "starting bootstrap boost lookup query" ); - lookup(target, &discv5, &metrics).await; + lookup(target, &discv5_handle, &metrics).await; tokio::time::sleep(pulse_lookup_interval).await; } // initiate regular lookups to populate kbuckets loop { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; // make sure node is connected to each subtree in the network by target // selection (ref kademlia) let target = get_lookup_target(kbucket_index, local_node_id); @@ -610,7 +621,7 @@ pub fn spawn_populate_kbuckets_bg( "starting periodic lookup query" ); - lookup(target, &discv5, &metrics).await; + lookup(target, &discv5_handle, &metrics).await; if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX { // try to populate bucket one step closer @@ -736,6 +747,40 @@ mod test { Discv5::start(&secret_key, discv5_config).await.expect("should build discv5") } + async fn start_discovery_node_with_key( + secret_key: &SecretKey, + udp_port_discv5: u16, + ) -> Result<(Discv5, mpsc::Receiver), Error> { + let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap(); + let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap(); + + let discv5_listen_config = ListenConfig::from(discv5_addr); + let discv5_config = Config::builder(rlpx_addr) + .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build()) + .build(); + + Discv5::start(secret_key, discv5_config).await + } + + #[tokio::test(flavor = "multi_thread")] + async fn discv5_releases_port_on_drop() { + reth_tracing::init_test_tracing(); + + let secret_key = SecretKey::new(&mut thread_rng()); + let port = 30366; + + let (node, updates) = start_discovery_node_with_key(&secret_key, port) + .await + .expect("should start discv5"); + drop(updates); + drop(node); + + tokio::task::yield_now().await; + + let restarted = start_discovery_node_with_key(&secret_key, port).await; + assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}"); + } + #[tokio::test(flavor = "multi_thread")] async fn discv5() { reth_tracing::init_test_tracing(); From 879bd5c1ebb151cf744c48baee4799061dbc3bec Mon Sep 17 00:00:00 2001 From: romanbrodetski-ai Date: Fri, 24 Apr 2026 10:43:05 +0000 Subject: [PATCH 2/4] fix(discv5): address review nits on shutdown cleanup --- crates/net/discv5/src/lib.rs | 51 +++++++++++++++++++++++++++++------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index a9ac9d6234f..58058767c85 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -589,9 +589,6 @@ pub fn spawn_populate_kbuckets_bg( // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest // log2distance from local node for i in (0..bootstrap_lookup_countdown).rev() { - let Some(discv5_handle) = discv5.upgrade() else { - return; - }; let target = discv5::enr::NodeId::random(); trace!(target: "net::discv5", @@ -601,16 +598,18 @@ pub fn spawn_populate_kbuckets_bg( "starting bootstrap boost lookup query" ); - lookup(target, &discv5_handle, &metrics).await; + { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + lookup(target, &discv5_handle, &metrics).await; + } tokio::time::sleep(pulse_lookup_interval).await; } // initiate regular lookups to populate kbuckets loop { - let Some(discv5_handle) = discv5.upgrade() else { - return; - }; // make sure node is connected to each subtree in the network by target // selection (ref kademlia) let target = get_lookup_target(kbucket_index, local_node_id); @@ -621,7 +620,12 @@ pub fn spawn_populate_kbuckets_bg( "starting periodic lookup query" ); - lookup(target, &discv5_handle, &metrics).await; + { + let Some(discv5_handle) = discv5.upgrade() else { + return; + }; + lookup(target, &discv5_handle, &metrics).await; + } if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX { // try to populate bucket one step closer @@ -709,6 +713,11 @@ mod test { use ::enr::{CombinedKey, EnrKey}; use rand_08::thread_rng; use reth_chainspec::MAINNET; + use std::{ + env, + net::UdpSocket, + time::{Duration, Instant}, + }; use tracing::trace; fn discv5_noop() -> Discv5 { @@ -762,12 +771,34 @@ mod test { Discv5::start(secret_key, discv5_config).await } + fn unused_udp_port() -> u16 { + UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port() + } + + async fn wait_for_udp_port_release(port: u16, timeout: Duration) { + let deadline = Instant::now() + timeout; + + loop { + match UdpSocket::bind(("127.0.0.1", port)) { + Ok(socket) => { + drop(socket); + return; + } + Err(err) if Instant::now() < deadline => { + trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release"); + tokio::time::sleep(Duration::from_millis(10)).await; + } + Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"), + } + } + } + #[tokio::test(flavor = "multi_thread")] async fn discv5_releases_port_on_drop() { reth_tracing::init_test_tracing(); let secret_key = SecretKey::new(&mut thread_rng()); - let port = 30366; + let port = unused_udp_port(); let (node, updates) = start_discovery_node_with_key(&secret_key, port) .await @@ -775,7 +806,7 @@ mod test { drop(updates); drop(node); - tokio::task::yield_now().await; + wait_for_udp_port_release(port, Duration::from_secs(1)).await; let restarted = start_discovery_node_with_key(&secret_key, port).await; assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}"); From 453cd5588987c514306dab875e8a603a7865fe06 Mon Sep 17 00:00:00 2001 From: romanbrodetski-ai Date: Fri, 24 Apr 2026 10:44:22 +0000 Subject: [PATCH 3/4] fix(discv5): drop unused test import after rebase --- crates/net/discv5/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 58058767c85..49776f8e2c2 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -714,7 +714,6 @@ mod test { use rand_08::thread_rng; use reth_chainspec::MAINNET; use std::{ - env, net::UdpSocket, time::{Duration, Instant}, }; From 73fb8d069b934b9422f796779090a070662986f0 Mon Sep 17 00:00:00 2001 From: romanbrodetski-ai Date: Fri, 24 Apr 2026 14:12:10 +0000 Subject: [PATCH 4/4] style(discv5): format shutdown test --- crates/net/discv5/src/lib.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/crates/net/discv5/src/lib.rs b/crates/net/discv5/src/lib.rs index 49776f8e2c2..83b37e056a3 100644 --- a/crates/net/discv5/src/lib.rs +++ b/crates/net/discv5/src/lib.rs @@ -799,9 +799,8 @@ mod test { let secret_key = SecretKey::new(&mut thread_rng()); let port = unused_udp_port(); - let (node, updates) = start_discovery_node_with_key(&secret_key, port) - .await - .expect("should start discv5"); + let (node, updates) = + start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5"); drop(updates); drop(node);