diff --git a/cumulus/client/bootnodes/src/discovery.rs b/cumulus/client/bootnodes/src/discovery.rs index 456dfb8eb6a2a..73b6890ff9bc8 100644 --- a/cumulus/client/bootnodes/src/discovery.rs +++ b/cumulus/client/bootnodes/src/discovery.rs @@ -30,14 +30,18 @@ //! known addresses to the parachain networking. //! 5. If the content provider discovery had completed, all `FIND_NODE` queries finished, and all //! requests over the `/paranode` protocol succeded or failed, but we have not found any -//! bootnode addresses, we repeat the discovery process up to `MAX_DISCOVERY_ATTEMPTS` times. +//! bootnode addresses, we repeat the discovery process after a cooldown period. use crate::{config::MAX_ADDRESSES, schema::Response}; use codec::{CompactRef, Decode, Encode}; use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; use futures::{ - channel::oneshot, future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt, + channel::oneshot, + future::{BoxFuture, Fuse, FusedFuture}, + pin_mut, + stream::FuturesUnordered, + FutureExt, StreamExt, }; use log::{debug, error, info, trace, warn}; use parachains_common::Hash as ParaHash; @@ -49,13 +53,15 @@ use sc_network::{ KademliaKey, Multiaddr, PeerId, ProtocolName, }; use sp_consensus_babe::{Epoch, Randomness}; -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration}; +use tokio::time::{sleep, Sleep}; /// Log target for this file. const LOG_TARGET: &str = "bootnodes::discovery"; -/// Number of discovery attempts before giving up. -const MAX_DISCOVERY_ATTEMPTS: u32 = 5; +/// Delay before retrying discovery in case of failure. Needed to rate-limit the attempts, +/// especially in small testnets where a discovery attempt can be almost instant. +const RETRY_DELAY: Duration = Duration::from_secs(30); /// Parachain bootnode discovery parameters. pub struct BootnodeDiscoveryParams { @@ -94,7 +100,7 @@ pub struct BootnodeDiscovery { >, direct_requests: HashSet, find_node_queries: HashSet, - attempts_left: u32, + pending_start_discovery: Pin>>, succeeded: bool, } @@ -124,7 +130,8 @@ impl BootnodeDiscovery { pending_responses: FuturesUnordered::default(), direct_requests: HashSet::new(), find_node_queries: HashSet::new(), - attempts_left: MAX_DISCOVERY_ATTEMPTS, + // Trigger the discovery immediately on startup. + pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()), succeeded: false, } } @@ -171,38 +178,35 @@ impl BootnodeDiscovery { Ok(()) } - /// Start bootnode discovery if needed. Returns `false` if the discovery event loop should be + /// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be /// terminated. - async fn maybe_start_discovery(&mut self) -> RelayChainResult { - // Start discovery if it is not currently in progress. - if self.key_being_discovered.is_none() && - self.pending_responses.is_empty() && - self.find_node_queries.is_empty() - { - // No need to start discovey again if the previous attempt succeeded. + fn maybe_retry_discovery(&mut self) -> bool { + let discovery_in_progress = self.key_being_discovered.is_some() || + !self.pending_responses.is_empty() || + !self.find_node_queries.is_empty(); + let discovery_scheduled = !self.pending_start_discovery.is_terminated(); + + if discovery_in_progress || discovery_scheduled { + // Discovery is already in progress or scheduled, just continue the event loop. + true + } else { if self.succeeded { + // No need to start discovery again if the previous attempt succeeded. info!( target: LOG_TARGET, "Parachain bootnode discovery on the relay chain DHT succeeded", ); - Ok(false) - } else if self.attempts_left > 0 { - // No discovery in progress and we have attempts left, start discovery. - self.attempts_left -= 1; - self.start_discovery().await?; - Ok(true) + + false } else { - warn!( + debug!( target: LOG_TARGET, - "Failed to discover parachain bootnodes on the relay chain DHT after {} attempts, \ - giving up", - MAX_DISCOVERY_ATTEMPTS, + "Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}", ); - Ok(false) + self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse()); + + true } - } else { - // Discovery is already in progress, just continue the event loop. - Ok(true) } } @@ -444,11 +448,14 @@ impl BootnodeDiscovery { self.latest_relay_chain_hash = Some(header.hash()); loop { - if !self.maybe_start_discovery().await? { + if !self.maybe_retry_discovery() { return Ok(()); } tokio::select! { + _ = &mut self.pending_start_discovery => { + self.start_discovery().await?; + }, header = import_notification_stream.select_next_some() => { self.latest_relay_chain_hash = Some(header.hash()); }, diff --git a/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs b/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs index 7ee1ae45038a6..2d6f38c91a71f 100644 --- a/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs +++ b/cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs @@ -77,7 +77,9 @@ async fn dht_bootnodes_test() -> Result<(), anyhow::Error> { .wait_metric_with_timeout("substrate_sync_peers", |count| count == 1.0, 300u64) .await?; - let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(30), false); + // In case of initial failure (alpha was first to start) the discovery is retried in 30 seconds, + // so timeout in double that time. + let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(60), false); // Make sure the DHT bootnode discovery was successful. let result = alpha