Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 79 additions & 5 deletions crates/net/discv5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ impl Discv5 {
bootstrap_lookup_interval,
bootstrap_lookup_countdown,
metrics.clone(),
discv5.clone(),
Arc::downgrade(&discv5),
);

Ok((
Expand Down Expand Up @@ -573,14 +573,19 @@ pub fn spawn_populate_kbuckets_bg(
bootstrap_lookup_interval: u64,
bootstrap_lookup_countdown: u64,
metrics: Discv5Metrics,
discv5: Arc<discv5::Discv5>,
discv5: std::sync::Weak<discv5::Discv5>,
) {
let local_node_id = discv5.local_enr().node_id();
let lookup_interval = Duration::from_secs(lookup_interval);
let metrics = metrics.discovered_peers;
let mut kbucket_index = MAX_KBUCKET_INDEX;
let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
task::spawn(async move {
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
let local_node_id = discv5_handle.local_enr().node_id();
drop(discv5_handle);

// make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
// log2distance from local node
for i in (0..bootstrap_lookup_countdown).rev() {
Expand All @@ -593,7 +598,12 @@ pub fn spawn_populate_kbuckets_bg(
"starting bootstrap boost lookup query"
);

lookup(target, &discv5, &metrics).await;
{
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
lookup(target, &discv5_handle, &metrics).await;
}

tokio::time::sleep(pulse_lookup_interval).await;
}
Expand All @@ -610,7 +620,12 @@ pub fn spawn_populate_kbuckets_bg(
"starting periodic lookup query"
);

lookup(target, &discv5, &metrics).await;
{
let Some(discv5_handle) = discv5.upgrade() else {
return;
};
lookup(target, &discv5_handle, &metrics).await;
}

if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
// try to populate bucket one step closer
Expand Down Expand Up @@ -698,6 +713,10 @@ mod test {
use ::enr::{CombinedKey, EnrKey};
use rand_08::thread_rng;
use reth_chainspec::MAINNET;
use std::{
net::UdpSocket,
time::{Duration, Instant},
};
use tracing::trace;

fn discv5_noop() -> Discv5 {
Expand Down Expand Up @@ -736,6 +755,61 @@ mod test {
Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
}

async fn start_discovery_node_with_key(
secret_key: &SecretKey,
udp_port_discv5: u16,
) -> Result<(Discv5, mpsc::Receiver<discv5::Event>), Error> {
let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();

let discv5_listen_config = ListenConfig::from(discv5_addr);
let discv5_config = Config::builder(rlpx_addr)
.discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
.build();

Discv5::start(secret_key, discv5_config).await
}

fn unused_udp_port() -> u16 {
UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port()
}

async fn wait_for_udp_port_release(port: u16, timeout: Duration) {
let deadline = Instant::now() + timeout;

loop {
match UdpSocket::bind(("127.0.0.1", port)) {
Ok(socket) => {
drop(socket);
return;
}
Err(err) if Instant::now() < deadline => {
trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release");
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"),
}
}
}

#[tokio::test(flavor = "multi_thread")]
async fn discv5_releases_port_on_drop() {
reth_tracing::init_test_tracing();

let secret_key = SecretKey::new(&mut thread_rng());
let port = unused_udp_port();

let (node, updates) =
start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5");
drop(updates);
drop(node);

wait_for_udp_port_release(port, Duration::from_secs(1)).await;

let restarted = start_discovery_node_with_key(&secret_key, port).await;
assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}");
}

#[tokio::test(flavor = "multi_thread")]
async fn discv5() {
reth_tracing::init_test_tracing();
Expand Down
Loading