From 383d3cfd121593ced2bebd845d66e000bc78ab27 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 6 Jun 2025 18:07:45 +0300 Subject: [PATCH 1/5] DHT bootnodes: rate limit the discovery retry attempts istead of max retry count --- cumulus/client/bootnodes/src/discovery.rs | 55 +++++++++++-------- .../zombienet-sdk/tests/bootnodes/mod.rs | 4 +- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index 456dfb8eb6a2a..245ac6522ca80 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -30,14 +30,18 @@ //! known addresses to the parachain networking. //! 5. If the content provider discovery had completed, all `FIND_NODE` queries finished, and all //! requests over the `/paranode` protocol succeded or failed, but we have not found any -//! bootnode addresses, we repeat the discovery process up to `MAX_DISCOVERY_ATTEMPTS` times. +//! bootnode addresses, we repeat the discovery process after a cooldown period. use crate::{config::MAX_ADDRESSES, schema::Response}; use codec::{CompactRef, Decode, Encode}; use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; use futures::{ - channel::oneshot, future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt, + channel::oneshot, + future::{BoxFuture, Fuse, FusedFuture}, + pin_mut, + stream::FuturesUnordered, + FutureExt, StreamExt, }; use log::{debug, error, info, trace, warn}; use parachains_common::Hash as ParaHash; @@ -49,13 +53,15 @@ use sc_network::{ KademliaKey, Multiaddr, PeerId, ProtocolName, }; use sp_consensus_babe::{Epoch, Randomness}; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; +use tokio::time::{sleep, Sleep}; /// Log target for this file. const LOG_TARGET: &str = "bootnodes::discovery"; -/// Number of discovery attempts before giving up. -const MAX_DISCOVERY_ATTEMPTS: u32 = 5; +/// Delay before retrying discovery in case of failure. Needed to rate-limit the attempts, +/// especially in small testnets where a discovery attempt can be almost instant. +const RETRY_DELAY: Duration = Duration::from_secs(30); /// Parachain bootnode discovery parameters. pub struct BootnodeDiscoveryParams { @@ -94,7 +100,7 @@ pub struct BootnodeDiscovery { >, direct_requests: HashSet, find_node_queries: HashSet, - attempts_left: u32, + start_discovery: Pin>>, succeeded: bool, } @@ -124,7 +130,8 @@ impl BootnodeDiscovery { pending_responses: FuturesUnordered::default(), direct_requests: HashSet::new(), find_node_queries: HashSet::new(), - attempts_left: MAX_DISCOVERY_ATTEMPTS, + // Trigger the discovery immediately on startup. + start_discovery: Box::pin(sleep(Duration::ZERO).fuse()), succeeded: false, } } @@ -171,13 +178,14 @@ impl BootnodeDiscovery { Ok(()) } - /// Start bootnode discovery if needed. Returns `false` if the discovery event loop should be + /// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be /// terminated. - async fn maybe_start_discovery(&mut self) -> RelayChainResult { - // Start discovery if it is not currently in progress. + fn maybe_retry_discovery(&mut self) -> bool { + // Schedule discovery if it is not currently in progress or scheduled. if self.key_being_discovered.is_none() && self.pending_responses.is_empty() && - self.find_node_queries.is_empty() + self.find_node_queries.is_empty() && + self.start_discovery.is_terminated() { // No need to start discovey again if the previous attempt succeeded. if self.succeeded { @@ -185,24 +193,20 @@ impl BootnodeDiscovery { target: LOG_TARGET, "Parachain bootnode discovery on the relay chain DHT succeeded", ); - Ok(false) - } else if self.attempts_left > 0 { - // No discovery in progress and we have attempts left, start discovery. - self.attempts_left -= 1; - self.start_discovery().await?; - Ok(true) + + false } else { - warn!( + debug!( target: LOG_TARGET, - "Failed to discover parachain bootnodes on the relay chain DHT after {} attempts, \ - giving up", - MAX_DISCOVERY_ATTEMPTS, + "Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}", ); - Ok(false) + self.start_discovery = Box::pin(sleep(RETRY_DELAY).fuse()); + + true } } else { // Discovery is already in progress, just continue the event loop. - Ok(true) + true } } @@ -444,11 +448,14 @@ impl BootnodeDiscovery { self.latest_relay_chain_hash = Some(header.hash()); loop { - if !self.maybe_start_discovery().await? { + if !self.maybe_retry_discovery() { return Ok(()); } tokio::select! { + _ = &mut self.start_discovery => { + self.start_discovery().await?; + }, header = import_notification_stream.select_next_some() => { self.latest_relay_chain_hash = Some(header.hash()); }, diff --git a/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs b/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs index 7ee1ae45038a6..2d6f38c91a71f 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs @@ -77,7 +77,9 @@ async fn dht_bootnodes_test() -> Result<(), anyhow::Error> { .wait_metric_with_timeout("substrate_sync_peers", |count| count == 1.0, 300u64) .await?; - let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(30), false); + // In case of initial failure (alpha was first to start) the discovery is retried in 30 seconds, + // so timeout in double that time. + let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(60), false); // Make sure the DHT bootnode discovery was successful. let result = alpha From 9d57e49e0fe2fc0397d4d1021e19d85e729b581b Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 9 Jun 2025 14:30:18 +0300 Subject: [PATCH 2/5] Apply suggestions from code review Co-authored-by: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> --- cumulus/client/bootnodes/src/discovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index 245ac6522ca80..a6e0b97346d9b 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -187,7 +187,7 @@ impl BootnodeDiscovery { self.find_node_queries.is_empty() && self.start_discovery.is_terminated() { - // No need to start discovey again if the previous attempt succeeded. + // No need to start discovery again if the previous attempt succeeded. if self.succeeded { info!( target: LOG_TARGET, From f21451e12d9fb31d75853db88549b2a97f481f01 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 19 Jun 2025 15:12:03 +0300 Subject: [PATCH 3/5] minor: better naming for things --- cumulus/client/bootnodes/src/discovery.rs | 24 ++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index a6e0b97346d9b..24f73c4139e98 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -100,7 +100,7 @@ pub struct BootnodeDiscovery { >, direct_requests: HashSet, find_node_queries: HashSet, - start_discovery: Pin>>, + pending_start_discovery: Pin>>, succeeded: bool, } @@ -131,7 +131,7 @@ impl BootnodeDiscovery { direct_requests: HashSet::new(), find_node_queries: HashSet::new(), // Trigger the discovery immediately on startup. - start_discovery: Box::pin(sleep(Duration::ZERO).fuse()), + pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()), succeeded: false, } } @@ -181,6 +181,7 @@ impl BootnodeDiscovery { /// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be /// terminated. fn maybe_retry_discovery(&mut self) -> bool { +<<<<<<< Updated upstream // Schedule discovery if it is not currently in progress or scheduled. if self.key_being_discovered.is_none() && self.pending_responses.is_empty() && @@ -188,7 +189,19 @@ impl BootnodeDiscovery { self.start_discovery.is_terminated() { // No need to start discovery again if the previous attempt succeeded. +======= + let discovery_in_progress = self.key_being_discovered.is_some() || + !self.pending_responses.is_empty() || + !self.find_node_queries.is_empty(); + let discovery_scheduled = !self.pending_start_discovery.is_terminated(); + + if discovery_in_progress || discovery_scheduled { + // Discovery is already in progress or scheduled, just continue the event loop. + true + } else { +>>>>>>> Stashed changes if self.succeeded { + // No need to start discovey again if the previous attempt succeeded. info!( target: LOG_TARGET, "Parachain bootnode discovery on the relay chain DHT succeeded", @@ -200,13 +213,10 @@ impl BootnodeDiscovery { target: LOG_TARGET, "Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}", ); - self.start_discovery = Box::pin(sleep(RETRY_DELAY).fuse()); + self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse()); true } - } else { - // Discovery is already in progress, just continue the event loop. - true } } @@ -453,7 +463,7 @@ impl BootnodeDiscovery { } tokio::select! { - _ = &mut self.start_discovery => { + _ = &mut self.pending_start_discovery => { self.start_discovery().await?; }, header = import_notification_stream.select_next_some() => { From 74a313fe19d0248aee0bb4afbdcee3bd87a901a4 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 19 Jun 2025 15:18:08 +0300 Subject: [PATCH 4/5] Resolve forgotten merge conflict --- cumulus/client/bootnodes/src/discovery.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index 24f73c4139e98..618730b4cbed5 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -181,15 +181,6 @@ impl BootnodeDiscovery { /// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be /// terminated. fn maybe_retry_discovery(&mut self) -> bool { -<<<<<<< Updated upstream - // Schedule discovery if it is not currently in progress or scheduled. - if self.key_being_discovered.is_none() && - self.pending_responses.is_empty() && - self.find_node_queries.is_empty() && - self.start_discovery.is_terminated() - { - // No need to start discovery again if the previous attempt succeeded. -======= let discovery_in_progress = self.key_being_discovered.is_some() || !self.pending_responses.is_empty() || !self.find_node_queries.is_empty(); @@ -199,7 +190,6 @@ impl BootnodeDiscovery { // Discovery is already in progress or scheduled, just continue the event loop. true } else { ->>>>>>> Stashed changes if self.succeeded { // No need to start discovey again if the previous attempt succeeded. info!( From 44ba4718be033c33b065a6cb7d4bcb464290fefd Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 19 Jun 2025 15:29:10 +0300 Subject: [PATCH 5/5] minor: typo --- cumulus/client/bootnodes/src/discovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index 618730b4cbed5..73b6890ff9bc8 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -191,7 +191,7 @@ impl BootnodeDiscovery { true } else { if self.succeeded { - // No need to start discovey again if the previous attempt succeeded. + // No need to start discovery again if the previous attempt succeeded. info!( target: LOG_TARGET, "Parachain bootnode discovery on the relay chain DHT succeeded",