diff --git a/Cargo.lock b/Cargo.lock index 0dbafc147f6bb..79014184de1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -504,6 +504,7 @@ dependencies = [ "sp-application-crypto", "sp-arithmetic", "sp-blockchain", + "sp-consensus", "sp-core", "sp-keystore", "sp-runtime", @@ -8329,6 +8330,8 @@ dependencies = [ "substrate-test-runtime-client", "tempfile", "thiserror", + "tokio", + "tokio-stream", "unsigned-varint 0.6.0", "void", "zeroize", @@ -10654,13 +10657,14 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b2f3f698253f03119ac0102beaa64f67a67e08074d03a22d18784104543727f" +checksum = "50145484efff8818b5ccd256697f36863f587da82cf8b409c53adf1e840798e3" dependencies = [ "futures-core", "pin-project-lite 0.2.6", "tokio", + "tokio-util", ] [[package]] diff --git a/client/beefy/Cargo.toml b/client/beefy/Cargo.toml index 9761f18d78450..7a666937be449 100644 --- a/client/beefy/Cargo.toml +++ b/client/beefy/Cargo.toml @@ -25,6 +25,7 @@ sp-blockchain = { version = "4.0.0-dev", path = "../../primitives/blockchain" } sp-core = { version = "4.1.0-dev", path = "../../primitives/core" } sp-keystore = { version = "0.10.0", path = "../../primitives/keystore" } sp-runtime = { version = "4.1.0-dev", path = "../../primitives/runtime" } +sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/common" } sc-chain-spec = { version = "4.0.0-dev", path = "../../client/chain-spec" } sc-utils = { version = "4.0.0-dev", path = "../utils" } diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 9b2bf383df8ef..d831c5a65a661 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -26,6 +26,7 @@ use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; +pub use sp_consensus::SyncOracle; use sp_keystore::SyncCryptoStorePtr; use sp_runtime::traits::Block; @@ -112,7 +113,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { /// BEEFY client pub client: Arc, @@ -143,7 +144,7 @@ where BE: Backend, C: Client, C::Api: BeefyApi, - N: GossipNetwork + Clone + Send + 'static, + N: GossipNetwork + Clone + SyncOracle + Send + Sync + 'static, { let BeefyParams { client, @@ -157,6 +158,7 @@ where protocol_name, } = beefy_params; + let sync_oracle = network.clone(); let gossip_validator = Arc::new(gossip::GossipValidator::new()); let gossip_engine = GossipEngine::new(network, protocol_name, gossip_validator.clone(), None); @@ -184,9 +186,10 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, }; - let worker = worker::BeefyWorker::<_, _, _>::new(worker_params); + let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params); worker.run().await } diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 0c7d8d4ffdc9c..62fda45df3542 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -47,10 +47,10 @@ use crate::{ metric_inc, metric_set, metrics::Metrics, notification::{BeefyBestBlockSender, BeefySignedCommitmentSender}, - round, Client, + round, Client, SyncOracle, }; -pub(crate) struct WorkerParams +pub(crate) struct WorkerParams where B: Block, { @@ -63,14 +63,16 @@ where pub gossip_validator: Arc>, pub min_block_delta: u32, pub metrics: Option, + pub sync_oracle: SO, } /// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker +pub(crate) struct BeefyWorker where B: Block, BE: Backend, C: Client, + SO: SyncOracle + Send + Sync + Clone + 'static, { client: Arc, backend: Arc, @@ -91,16 +93,19 @@ where beefy_best_block_sender: BeefyBestBlockSender, /// Validator set id for the last signed commitment last_signed_id: u64, + /// Handle to the sync oracle + sync_oracle: SO, // keep rustc happy _backend: PhantomData, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return a new BEEFY worker instance. /// @@ -108,7 +113,7 @@ where /// BEEFY pallet has been deployed on-chain. /// /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new(worker_params: WorkerParams) -> Self { let WorkerParams { client, backend, @@ -119,6 +124,7 @@ where gossip_validator, min_block_delta, metrics, + sync_oracle, } = worker_params; BeefyWorker { @@ -136,17 +142,19 @@ where best_beefy_block: None, last_signed_id: 0, beefy_best_block_sender, + sync_oracle, _backend: PhantomData, } } } -impl BeefyWorker +impl BeefyWorker where B: Block, BE: Backend, C: Client, C::Api: BeefyApi, + SO: SyncOracle + Send + Sync + Clone + 'static, { /// Return `true`, if we should vote on block `number` fn should_vote_on(&self, number: NumberFor) -> bool { @@ -400,6 +408,10 @@ where )); loop { + if self.sync_oracle.is_major_syncing() { + debug!(target: "beefy", "Waiting for major sync to complete."); + self.sync_oracle.wait_for_major_syncing().await; + } let engine = self.gossip_engine.clone(); let gossip_engine = future::poll_fn(|cx| engine.lock().poll_unpin(cx)); diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 354991b32ba5b..5da6fee3abc87 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -29,6 +29,8 @@ either = "1.5.3" fnv = "1.0.6" fork-tree = { version = "3.0.0", path = "../../utils/fork-tree" } futures = "0.3.9" +tokio = { version = "1.15.0", features = [ "sync" ] } +tokio-stream = { version = "0.1.8", features = [ "sync" ] } futures-timer = "3.0.2" asynchronous-codec = "0.5" hex = "0.4.0" diff --git a/client/network/src/service.rs b/client/network/src/service.rs index b6a1d3c88e7f3..d1ff7d7e15a0e 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -43,7 +43,8 @@ use crate::{ sync::{Status as SyncStatus, SyncState}, NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready, }, - transactions, transport, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus, ReputationChange, + transactions, transport, utils, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus, + ReputationChange, }; use codec::Encode as _; @@ -85,6 +86,7 @@ use std::{ }, task::Poll, }; +use tokio::sync::watch; pub use behaviour::{ IfDisconnected, InboundFailure, OutboundFailure, RequestFailure, ResponseFailure, @@ -113,6 +115,8 @@ pub struct NetworkService { external_addresses: Arc>>, /// Are we actively catching up with the chain? is_major_syncing: Arc, + /// A channel that receives notifications about the major sync state + major_sync_stream: utils::MajorSyncStream>, /// Local copy of the `PeerId` of the local node. local_peer_id: PeerId, /// The `KeyPair` that defines the `PeerId` of the local node. @@ -249,6 +253,7 @@ impl NetworkWorker { let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); + let (major_sync_sender, major_sync_stream) = utils::MajorSyncStream::new(None); // Build the swarm. let client = params.chain.clone(); @@ -415,6 +420,7 @@ impl NetworkWorker { external_addresses: external_addresses.clone(), num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), + major_sync_stream, peerset: peerset_handle, local_peer_id, local_identity, @@ -438,6 +444,7 @@ impl NetworkWorker { external_addresses, num_connected, is_major_syncing, + major_sync_sender: Arc::new(major_sync_sender), network_service: swarm, service, import_queue: params.import_queue, @@ -1277,6 +1284,7 @@ impl NetworkService { } } +#[async_trait::async_trait] impl sp_consensus::SyncOracle for NetworkService { fn is_major_syncing(&mut self) -> bool { Self::is_major_syncing(self) @@ -1285,8 +1293,25 @@ impl sp_consensus::SyncOracle for NetworkServic fn is_offline(&mut self) -> bool { self.num_connected.load(Ordering::Relaxed) == 0 } -} + async fn wait_for_major_syncing(&mut self) -> () { + self.major_sync_stream + .clone() + .filter(|val| { + if let Some(val) = val { + future::ready(!val) + } + // if the stream yields None, we know it's still at the initial value so we discard + // it + else { + future::ready(false) + } + }) + .next() + .await; + } +} +#[async_trait::async_trait] impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for &'a NetworkService { fn is_major_syncing(&mut self) -> bool { NetworkService::is_major_syncing(self) @@ -1295,6 +1320,23 @@ impl<'a, B: BlockT + 'static, H: ExHashT> sp_consensus::SyncOracle for &'a Netwo fn is_offline(&mut self) -> bool { self.num_connected.load(Ordering::Relaxed) == 0 } + + async fn wait_for_major_syncing(&mut self) -> () { + self.major_sync_stream + .clone() + .filter(|val| { + if let Some(val) = val { + future::ready(!val) + } + // if the stream yields None, we know it's still at the initial value so we discard + // it + else { + future::ready(false) + } + }) + .next() + .await; + } } impl sc_consensus::JustificationSyncLink for NetworkService { @@ -1457,6 +1499,8 @@ pub struct NetworkWorker { num_connected: Arc, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. is_major_syncing: Arc, + /// A channel that sends updates about the major sync state + major_sync_sender: Arc>>, /// The network service that can be extracted and shared through the codebase. service: Arc>, /// The *actual* network. @@ -2076,6 +2120,7 @@ impl Future for NetworkWorker { this.tx_handler_controller.set_gossip_enabled(!is_major_syncing); this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); + let _ = this.major_sync_sender.send(Some(is_major_syncing)); if let Some(metrics) = this.metrics.as_ref() { for (proto, buckets) in this.network_service.behaviour_mut().num_entries_per_kbucket() { diff --git a/client/network/src/utils.rs b/client/network/src/utils.rs index d0e61a0d0475d..a378b8f18d2c9 100644 --- a/client/network/src/utils.rs +++ b/client/network/src/utils.rs @@ -16,10 +16,12 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use futures::{stream::unfold, FutureExt, Stream, StreamExt}; +use futures::{ready, stream::unfold, FutureExt, Stream, StreamExt}; use futures_timer::Delay; use linked_hash_set::LinkedHashSet; -use std::{hash::Hash, num::NonZeroUsize, time::Duration}; +use std::{hash::Hash, num::NonZeroUsize, task::Poll, time::Duration}; +use tokio::sync::watch; +use tokio_stream::wrappers::WatchStream; /// Creates a stream that returns a new value every `duration`. pub fn interval(duration: Duration) -> impl Stream + Unpin { @@ -57,6 +59,46 @@ impl LruHashSet { } } +/// A clone-able stream that yields [`T`]. +/// Every instance of the stream will recieve the same data. +/// The stream is implemented on top of a spmc channel +/// see [`tokio::sync::watch::Receiver`] +pub struct MajorSyncStream { + consumer: watch::Receiver, + inner: WatchStream, +} + +impl Clone for MajorSyncStream { + fn clone(&self) -> Self { + let consumer = self.consumer.clone(); + let inner = WatchStream::new(consumer.clone()); + Self { consumer, inner } + } +} + +impl MajorSyncStream { + pub fn new(init: T) -> (watch::Sender, Self) { + let (tx, rx) = watch::channel(init); + let consumer = rx.clone(); + let inner = WatchStream::new(consumer.clone()); + (tx, Self { consumer, inner }) + } +} + +impl Stream for MajorSyncStream { + type Item = T; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match ready!(self.inner.poll_next_unpin(cx)) { + Some(item) => Poll::Ready(Some(item)), + _ => Poll::Ready(None), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/primitives/consensus/common/src/lib.rs b/primitives/consensus/common/src/lib.rs index 492ad83ddf5bd..e0c119f780718 100644 --- a/primitives/consensus/common/src/lib.rs +++ b/primitives/consensus/common/src/lib.rs @@ -234,19 +234,25 @@ pub trait Proposer { /// /// Generally, consensus authoring work isn't undertaken while well behind /// the head of the chain. -pub trait SyncOracle { +#[async_trait::async_trait] +pub trait SyncOracle: Send { /// Whether the synchronization service is undergoing major sync. /// Returns true if so. fn is_major_syncing(&mut self) -> bool; /// Whether the synchronization service is offline. /// Returns true if so. fn is_offline(&mut self) -> bool; + /// A future that completes when major syncing is false + async fn wait_for_major_syncing(&mut self) -> () { + () + } } /// A synchronization oracle for when there is no network. #[derive(Clone, Copy, Debug)] pub struct NoNetwork; +#[async_trait::async_trait] impl SyncOracle for NoNetwork { fn is_major_syncing(&mut self) -> bool { false @@ -256,9 +262,10 @@ impl SyncOracle for NoNetwork { } } +#[async_trait::async_trait] impl SyncOracle for Arc where - T: ?Sized, + T: ?Sized + Sync + Send, for<'r> &'r T: SyncOracle, { fn is_major_syncing(&mut self) -> bool {