Skip to content

Commit

Permalink
Get rid of polling in WarpSync (#1265)
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin authored Sep 5, 2023
1 parent adf847a commit 1219444
Show file tree
Hide file tree
Showing 9 changed files with 344 additions and 234 deletions.
2 changes: 1 addition & 1 deletion polkadot/node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ pub fn new_full<OverseerGenerator: OverseerGen>(
}: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
use polkadot_node_network_protocol::request_response::IncomingRequest;
use sc_network_common::sync::warp::WarpSyncParams;
use sc_network_sync::warp::WarpSyncParams;

let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled;
let role = config.role.clone();
Expand Down
3 changes: 1 addition & 2 deletions substrate/bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use sc_client_api::{Backend, BlockBackend};
use sc_consensus_babe::{self, SlotProportion};
use sc_executor::NativeElseWasmExecutor;
use sc_network::{event::Event, NetworkEventStream, NetworkService};
use sc_network_common::sync::warp::WarpSyncParams;
use sc_network_sync::SyncingService;
use sc_network_sync::{warp::WarpSyncParams, SyncingService};
use sc_service::{config::Configuration, error::Error as ServiceError, RpcHandlers, TaskManager};
use sc_statement_store::Store as StatementStore;
use sc_telemetry::{Telemetry, TelemetryWorker};
Expand Down
13 changes: 1 addition & 12 deletions substrate/client/network/common/src/sync/warp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use codec::{Decode, Encode};
use futures::channel::oneshot;
pub use sp_consensus_grandpa::{AuthorityList, SetId};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{fmt, sync::Arc};
use std::fmt;

/// Scale-encoded warp sync proof response.
///
/// A thin wrapper around the encoded bytes: the single public field is the raw
/// `Vec<u8>` payload, which this type does not interpret itself.
pub struct EncodedProof(pub Vec<u8>);
Expand All @@ -30,16 +29,6 @@ pub struct WarpProofRequest<B: BlockT> {
pub begin: B::Hash,
}

/// The different types of warp syncing.
///
/// Each variant carries the input that mode works from: a `WarpSyncProvider`, or a
/// one-shot channel on which the target header is delivered.
pub enum WarpSyncParams<Block: BlockT> {
/// Standard warp sync for the chain.
///
/// Holds a shared (`Arc`), dynamically dispatched `WarpSyncProvider` for `Block`.
WithProvider(Arc<dyn WarpSyncProvider<Block>>),
/// Skip downloading proofs and wait for a header of the state that should be downloaded.
///
/// It is expected that the header provider ensures that the header is trusted.
///
/// Holds the receiving half of a `oneshot` channel that yields the `Block`'s header.
WaitForTarget(oneshot::Receiver<<Block as BlockT>::Header>),
}

/// Proof verification result.
pub enum VerificationResult<Block: BlockT> {
/// Proof is valid, but the target was not reached.
Expand Down
118 changes: 88 additions & 30 deletions substrate/client/network/sync/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ use crate::{
BlockAnnounceValidationResult, BlockAnnounceValidator as BlockAnnounceValidatorStream,
},
service::{self, chain_sync::ToServiceCommand},
warp::WarpSyncParams,
ChainSync, ClientError, SyncingService,
};

use codec::{Decode, Encode};
use futures::{FutureExt, StreamExt};
use futures::{
channel::oneshot,
future::{BoxFuture, Fuse},
FutureExt, StreamExt,
};
use futures_timer::Delay;
use libp2p::PeerId;
use prometheus_endpoint::{
Expand All @@ -47,7 +52,6 @@ use sc_network_common::{
role::Roles,
sync::{
message::{BlockAnnounce, BlockAnnouncesHandshake, BlockState},
warp::WarpSyncParams,
BadPeer, ChainSync as ChainSyncT, ExtendedPeerInfo, SyncEvent,
},
};
Expand All @@ -67,6 +71,9 @@ use std::{
time::{Duration, Instant},
};

/// Log target for this file.
///
/// Passed as `target:` to every `log::*!` invocation in this file so that all
/// messages emitted here can be filtered under a single name.
// `'static` is implied for references in `const` items, so it is not spelled out.
const LOG_TARGET: &str = "sync";

/// Period between two rounds of time-based maintenance (1100 milliseconds).
const TICK_TIMEOUT: Duration = Duration::from_millis(1100);

Expand Down Expand Up @@ -251,6 +258,10 @@ pub struct SyncingEngine<B: BlockT, Client> {
/// The `PeerId`'s of all boot nodes.
boot_node_ids: HashSet<PeerId>,

/// A channel to get target block header if we skip over proofs downloading during warp sync.
warp_sync_target_block_header_rx:
Fuse<BoxFuture<'static, Result<B::Header, oneshot::Canceled>>>,

/// Protocol name used for block announcements
block_announce_protocol_name: ProtocolName,

Expand Down Expand Up @@ -299,7 +310,11 @@ where
let max_blocks_per_request = if net_config.network_config.max_blocks_per_request >
crate::MAX_BLOCKS_IN_RESPONSE as u32
{
log::info!(target: "sync", "clamping maximum blocks per request to {}", crate::MAX_BLOCKS_IN_RESPONSE);
log::info!(
target: LOG_TARGET,
"clamping maximum blocks per request to {}",
crate::MAX_BLOCKS_IN_RESPONSE,
);
crate::MAX_BLOCKS_IN_RESPONSE as u32
} else {
net_config.network_config.max_blocks_per_request
Expand Down Expand Up @@ -352,6 +367,19 @@ where
total.saturating_sub(net_config.network_config.default_peers_set_num_full) as usize
};

// Split warp sync params into warp sync config and a channel to retrieve target block
// header.
let (warp_sync_config, warp_sync_target_block_header_rx) =
warp_sync_params.map_or((None, None), |params| {
let (config, target_block_rx) = params.split();
(Some(config), target_block_rx)
});

// Make sure polling of the target block channel is a no-op if there is no block to
// retrieve.
let warp_sync_target_block_header_rx = warp_sync_target_block_header_rx
.map_or(futures::future::pending().boxed().fuse(), |rx| rx.boxed().fuse());

let (chain_sync, block_announce_config) = ChainSync::new(
mode,
client.clone(),
Expand All @@ -360,7 +388,7 @@ where
roles,
max_parallel_downloads,
max_blocks_per_request,
warp_sync_params,
warp_sync_config,
metrics_registry,
network_service.clone(),
import_queue,
Expand Down Expand Up @@ -404,6 +432,7 @@ where
genesis_hash,
important_peers,
default_peers_set_no_slot_connected_peers: HashSet::new(),
warp_sync_target_block_header_rx,
boot_node_ids,
default_peers_set_no_slot_peers,
default_peers_set_num_full,
Expand All @@ -418,7 +447,7 @@ where
match Metrics::register(r, is_major_syncing.clone()) {
Ok(metrics) => Some(metrics),
Err(err) => {
log::error!(target: "sync", "Failed to register metrics {err:?}");
log::error!(target: LOG_TARGET, "Failed to register metrics {err:?}");
None
},
}
Expand Down Expand Up @@ -510,7 +539,10 @@ where
let peer = match self.peers.get_mut(&peer_id) {
Some(p) => p,
None => {
log::error!(target: "sync", "Received block announce from disconnected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Received block announce from disconnected peer {peer_id}",
);
debug_assert!(false);
return
},
Expand All @@ -536,11 +568,11 @@ where
let header = match self.client.header(hash) {
Ok(Some(header)) => header,
Ok(None) => {
log::warn!(target: "sync", "Trying to announce unknown block: {}", hash);
log::warn!(target: LOG_TARGET, "Trying to announce unknown block: {hash}");
return
},
Err(e) => {
log::warn!(target: "sync", "Error reading block header {}: {}", hash, e);
log::warn!(target: LOG_TARGET, "Error reading block header {hash}: {e}");
return
},
};
Expand All @@ -551,7 +583,7 @@ where
}

let is_best = self.client.info().best_hash == hash;
log::debug!(target: "sync", "Reannouncing block {:?} is_best: {}", hash, is_best);
log::debug!(target: LOG_TARGET, "Reannouncing block {hash:?} is_best: {is_best}");

let data = data
.or_else(|| self.block_announce_data_cache.get(&hash).cloned())
Expand All @@ -560,7 +592,7 @@ where
for (peer_id, ref mut peer) in self.peers.iter_mut() {
let inserted = peer.known_blocks.insert(hash);
if inserted {
log::trace!(target: "sync", "Announcing block {:?} to {}", hash, peer_id);
log::trace!(target: LOG_TARGET, "Announcing block {hash:?} to {peer_id}");
let message = BlockAnnounce {
header: header.clone(),
state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) },
Expand All @@ -575,7 +607,7 @@ where

/// Inform sync about new best imported block.
pub fn new_best_block_imported(&mut self, hash: B::Hash, number: NumberFor<B>) {
log::debug!(target: "sync", "New best block imported {:?}/#{}", hash, number);
log::debug!(target: LOG_TARGET, "New best block imported {hash:?}/#{number}");

self.chain_sync.update_chain_info(&hash, number);
self.network_service.set_notification_handshake(
Expand Down Expand Up @@ -619,7 +651,10 @@ where
// consider it connected or are also all stalled. In order to unstall the node,
// disconnect all peers and allow `ProtocolController` to establish new connections.
if self.last_notification_io.elapsed() > INACTIVITY_EVICT_THRESHOLD {
log::debug!(target: "sync", "syncing has halted due to inactivity, evicting all peers");
log::debug!(
target: LOG_TARGET,
"syncing has halted due to inactivity, evicting all peers",
);

for peer in self.peers.keys() {
self.network_service.report_peer(*peer, rep::INACTIVE_SUBSTREAM);
Expand Down Expand Up @@ -658,7 +693,10 @@ where
ToServiceCommand::JustificationImported(peer_id, hash, number, success) => {
self.chain_sync.on_justification_import(hash, number, success);
if !success {
log::info!(target: "sync", "💔 Invalid justification provided by {} for #{}", peer_id, hash);
log::info!(
target: LOG_TARGET,
"💔 Invalid justification provided by {peer_id} for #{hash}",
);
self.network_service
.disconnect_peer(peer_id, self.block_announce_protocol_name.clone());
self.network_service.report_peer(
Expand Down Expand Up @@ -723,7 +761,7 @@ where
},
Err(()) => {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Failed to register peer {remote:?}: {received_handshake:?}",
);
let _ = tx.send(false);
Expand All @@ -732,7 +770,7 @@ where
sc_network::SyncEvent::NotificationStreamClosed { remote } => {
if self.on_sync_peer_disconnected(remote).is_err() {
log::trace!(
target: "sync",
target: LOG_TARGET,
"Disconnected peer which had earlier been refused by on_sync_peer_connected {}",
remote
);
Expand All @@ -749,9 +787,8 @@ where
}
} else {
log::trace!(
target: "sync",
"Received sync for peer earlier refused by sync layer: {}",
remote
target: LOG_TARGET,
"Received sync for peer earlier refused by sync layer: {remote}",
);
}
}
Expand All @@ -764,6 +801,21 @@ where
}
}

// Retrieve warp sync target block header just before polling `ChainSync`
// to make progress as soon as we receive it.
match self.warp_sync_target_block_header_rx.poll_unpin(cx) {
Poll::Ready(Ok(target)) => {
self.chain_sync.set_warp_sync_target_block(target);
},
Poll::Ready(Err(err)) => {
log::error!(
target: LOG_TARGET,
"Failed to get target block for warp sync. Error: {err:?}",
);
},
Poll::Pending => {},
}

// Drive `ChainSync`.
while let Poll::Ready(()) = self.chain_sync.poll(cx) {}

Expand All @@ -784,9 +836,9 @@ where
pub fn on_sync_peer_disconnected(&mut self, peer_id: PeerId) -> Result<(), ()> {
if let Some(info) = self.peers.remove(&peer_id) {
if self.important_peers.contains(&peer_id) {
log::warn!(target: "sync", "Reserved peer {} disconnected", peer_id);
log::warn!(target: LOG_TARGET, "Reserved peer {peer_id} disconnected");
} else {
log::debug!(target: "sync", "{} disconnected", peer_id);
log::debug!(target: LOG_TARGET, "{peer_id} disconnected");
}

if !self.default_peers_set_no_slot_connected_peers.remove(&peer_id) &&
Expand All @@ -798,7 +850,7 @@ where
},
None => {
log::error!(
target: "sync",
target: LOG_TARGET,
"trying to disconnect an inbound node which is not counted as inbound"
);
debug_assert!(false);
Expand Down Expand Up @@ -828,10 +880,13 @@ where
sink: NotificationsSink,
inbound: bool,
) -> Result<(), ()> {
log::trace!(target: "sync", "New peer {} {:?}", peer_id, status);
log::trace!(target: LOG_TARGET, "New peer {peer_id} {status:?}");

if self.peers.contains_key(&peer_id) {
log::error!(target: "sync", "Called on_sync_peer_connected with already connected peer {}", peer_id);
log::error!(
target: LOG_TARGET,
"Called on_sync_peer_connected with already connected peer {peer_id}",
);
debug_assert!(false);
return Err(())
}
Expand All @@ -841,23 +896,23 @@ where

if self.important_peers.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Reserved peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
status.genesis_hash,
);
} else if self.boot_node_ids.contains(&peer_id) {
log::error!(
target: "sync",
target: LOG_TARGET,
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
peer_id,
self.genesis_hash,
status.genesis_hash,
);
} else {
log::debug!(
target: "sync",
target: LOG_TARGET,
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
Expand All @@ -874,7 +929,10 @@ where
status.roles.is_full() &&
inbound && self.num_in_peers == self.max_in_peers
{
log::debug!(target: "sync", "All inbound slots have been consumed, rejecting {peer_id}");
log::debug!(
target: LOG_TARGET,
"All inbound slots have been consumed, rejecting {peer_id}",
);
return Err(())
}

Expand All @@ -884,15 +942,15 @@ where
self.default_peers_set_no_slot_connected_peers.len() +
this_peer_reserved_slot
{
log::debug!(target: "sync", "Too many full nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many full nodes, rejecting {peer_id}");
return Err(())
}

if status.roles.is_light() &&
(self.peers.len() - self.chain_sync.num_peers()) >= self.default_peers_set_num_light
{
// Make sure that not all slots are occupied by light clients.
log::debug!(target: "sync", "Too many light nodes, rejecting {}", peer_id);
log::debug!(target: LOG_TARGET, "Too many light nodes, rejecting {peer_id}");
return Err(())
}

Expand Down Expand Up @@ -921,7 +979,7 @@ where
None
};

log::debug!(target: "sync", "Connected {}", peer_id);
log::debug!(target: LOG_TARGET, "Connected {peer_id}");

self.peers.insert(peer_id, peer);

Expand Down
Loading

0 comments on commit 1219444

Please sign in to comment.