Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions cumulus/client/bootnodes/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@
//! known addresses to the parachain networking.
//! 5. If the content provider discovery had completed, all `FIND_NODE` queries finished, and all
//! requests over the `/paranode` protocol succeeded or failed, but we have not found any
//! bootnode addresses, we repeat the discovery process up to `MAX_DISCOVERY_ATTEMPTS` times.
//! bootnode addresses, we repeat the discovery process after a cooldown period.

use crate::{config::MAX_ADDRESSES, schema::Response};
use codec::{CompactRef, Decode, Encode};
use cumulus_primitives_core::{relay_chain::Hash as RelayHash, ParaId};
use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult};
use futures::{
channel::oneshot, future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt,
channel::oneshot,
future::{BoxFuture, Fuse, FusedFuture},
pin_mut,
stream::FuturesUnordered,
FutureExt, StreamExt,
};
use log::{debug, error, info, trace, warn};
use parachains_common::Hash as ParaHash;
Expand All @@ -49,13 +53,15 @@ use sc_network::{
KademliaKey, Multiaddr, PeerId, ProtocolName,
};
use sp_consensus_babe::{Epoch, Randomness};
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
use tokio::time::{sleep, Sleep};

/// Log target for this file.
const LOG_TARGET: &str = "bootnodes::discovery";

/// Number of discovery attempts before giving up.
const MAX_DISCOVERY_ATTEMPTS: u32 = 5;
/// Delay before retrying discovery in case of failure. Needed to rate-limit the attempts,
/// especially in small testnets where a discovery attempt can be almost instant.
const RETRY_DELAY: Duration = Duration::from_secs(30);

/// Parachain bootnode discovery parameters.
pub struct BootnodeDiscoveryParams {
Expand Down Expand Up @@ -94,7 +100,7 @@ pub struct BootnodeDiscovery {
>,
direct_requests: HashSet<PeerId>,
find_node_queries: HashSet<PeerId>,
attempts_left: u32,
pending_start_discovery: Pin<Box<Fuse<Sleep>>>,
succeeded: bool,
}

Expand Down Expand Up @@ -124,7 +130,8 @@ impl BootnodeDiscovery {
pending_responses: FuturesUnordered::default(),
direct_requests: HashSet::new(),
find_node_queries: HashSet::new(),
attempts_left: MAX_DISCOVERY_ATTEMPTS,
// Trigger the discovery immediately on startup.
pending_start_discovery: Box::pin(sleep(Duration::ZERO).fuse()),
succeeded: false,
}
}
Expand Down Expand Up @@ -171,38 +178,35 @@ impl BootnodeDiscovery {
Ok(())
}

/// Start bootnode discovery if needed. Returns `false` if the discovery event loop should be
/// Schedule bootnode discovery if needed. Returns `false` if the discovery event loop should be
/// terminated.
async fn maybe_start_discovery(&mut self) -> RelayChainResult<bool> {
// Start discovery if it is not currently in progress.
if self.key_being_discovered.is_none() &&
self.pending_responses.is_empty() &&
self.find_node_queries.is_empty()
{
// No need to start discovey again if the previous attempt succeeded.
fn maybe_retry_discovery(&mut self) -> bool {
let discovery_in_progress = self.key_being_discovered.is_some() ||
!self.pending_responses.is_empty() ||
!self.find_node_queries.is_empty();
let discovery_scheduled = !self.pending_start_discovery.is_terminated();

if discovery_in_progress || discovery_scheduled {
// Discovery is already in progress or scheduled, just continue the event loop.
true
} else {
if self.succeeded {
// No need to start discovery again if the previous attempt succeeded.
info!(
target: LOG_TARGET,
"Parachain bootnode discovery on the relay chain DHT succeeded",
Comment thread
lexnv marked this conversation as resolved.
);
Ok(false)
} else if self.attempts_left > 0 {
// No discovery in progress and we have attempts left, start discovery.
self.attempts_left -= 1;
self.start_discovery().await?;
Ok(true)

false
} else {
warn!(
debug!(
target: LOG_TARGET,
"Failed to discover parachain bootnodes on the relay chain DHT after {} attempts, \
giving up",
MAX_DISCOVERY_ATTEMPTS,
"Retrying parachain bootnode discovery on the relay chain DHT in {RETRY_DELAY:?}",
);
Ok(false)
self.pending_start_discovery = Box::pin(sleep(RETRY_DELAY).fuse());

true
}
} else {
// Discovery is already in progress, just continue the event loop.
Ok(true)
}
}

Expand Down Expand Up @@ -444,11 +448,14 @@ impl BootnodeDiscovery {
self.latest_relay_chain_hash = Some(header.hash());

loop {
if !self.maybe_start_discovery().await? {
if !self.maybe_retry_discovery() {
return Ok(());
}

tokio::select! {
_ = &mut self.pending_start_discovery => {
self.start_discovery().await?;
},
header = import_notification_stream.select_next_some() => {
self.latest_relay_chain_hash = Some(header.hash());
},
Expand Down
4 changes: 3 additions & 1 deletion cumulus/zombienet/zombienet-sdk/tests/bootnodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ async fn dht_bootnodes_test() -> Result<(), anyhow::Error> {
.wait_metric_with_timeout("substrate_sync_peers", |count| count == 1.0, 300u64)
.await?;

let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(30), false);
// In case of initial failure (alpha was first to start) the discovery is retried in 30 seconds,
// so use a timeout of double that time.
let log_line_options = LogLineCountOptions::new(|n| n == 1, Duration::from_secs(60), false);

// Make sure the DHT bootnode discovery was successful.
let result = alpha
Expand Down