diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index c690376193f37..cfd1bfdbb382b 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -34,8 +34,8 @@ use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, futur use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use finality_grandpa::{voter, voter_set::VoterSet}; use log::{debug, trace}; -use sc_network::ReputationChange; -use sc_network_gossip::{GossipEngine, Network}; +use sc_network::{NetworkService, ReputationChange}; +use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use parity_scale_codec::{Encode, Decode}; use sp_core::Pair; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; @@ -95,6 +95,30 @@ mod benefit { pub(super) const PER_EQUIVOCATION: i32 = 10; } +/// A handle to the network. +/// +/// Something that provides both the capabilities needed for the `gossip_network::Network` trait as +/// well as the ability to set a fork sync request for a particular block. +pub trait Network: GossipNetwork + Clone + Send + 'static { + /// Notifies the sync service to try and sync the given block from the given + /// peers. + /// + /// If the given vector of peers is empty then the underlying implementation + /// should make a best effort to fetch the block from any peers it is + /// connected to (NOTE: this assumption will change in the future #3629). + fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); +} + +impl Network for Arc> where + B: BlockT, + S: sc_network::specialization::NetworkSpecialization, + H: sc_network::ExHashT, +{ + fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { + NetworkService::set_sync_fork_request(self, peers, hash, number) + } +} + /// Create a unique topic for a round and set-id combo. pub(crate) fn round_topic(round: RoundNumber, set_id: SetIdNumber) -> B::Hash { <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) @@ -106,18 +130,19 @@ pub(crate) fn global_topic(set_id: SetIdNumber) -> B::Hash { } /// Bridge between the underlying network service, gossiping consensus messages and Grandpa -pub(crate) struct NetworkBridge { +pub(crate) struct NetworkBridge> { + service: N, gossip_engine: GossipEngine, validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, } -impl NetworkBridge { +impl> NetworkBridge { /// Create a new NetworkBridge to the given NetworkService. Returns the service /// handle. /// On creation it will register previous rounds' votes with the gossip /// service taken from the VoterSetState. - pub(crate) fn new + Clone + Send + 'static>( + pub(crate) fn new( service: N, config: crate::Config, set_state: crate::environment::SharedVoterSetState, @@ -130,7 +155,7 @@ impl NetworkBridge { ); let validator = Arc::new(validator); - let gossip_engine = GossipEngine::new(service, executor, GRANDPA_ENGINE_ID, validator.clone()); + let gossip_engine = GossipEngine::new(service.clone(), executor, GRANDPA_ENGINE_ID, validator.clone()); { // register all previous votes with the gossip service so that they're @@ -173,7 +198,7 @@ impl NetworkBridge { let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone()); let reporting_job = report_stream.consume(gossip_engine.clone()); - let bridge = NetworkBridge { gossip_engine, validator, neighbor_sender }; + let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender }; let executor = Compat::new(executor); executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(())))) @@ -356,8 +381,13 @@ impl NetworkBridge { /// If the given vector of peers is empty then the underlying implementation /// should make a best effort to fetch the block from any peers it is /// connected to (NOTE: this assumption will change in the future #3629). - pub(crate) fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - self.gossip_engine.set_sync_fork_request(peers, hash, number) + pub(crate) fn set_sync_fork_request( + &self, + peers: Vec, + hash: B::Hash, + number: NumberFor + ) { + Network::set_sync_fork_request(&self.service, peers, hash, number) } } @@ -493,9 +523,10 @@ fn incoming_global( .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) } -impl Clone for NetworkBridge { +impl> Clone for NetworkBridge { fn clone(&self) -> Self { NetworkBridge { + service: self.service.clone(), gossip_engine: self.gossip_engine.clone(), validator: Arc::clone(&self.validator), neighbor_sender: self.neighbor_sender.clone(), diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 4c0223402fa5a..9b5884e857468 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -67,7 +67,9 @@ impl sc_network_gossip::Network for TestNetwork { fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } +} +impl super::Network for TestNetwork { fn set_sync_fork_request( &self, _peers: Vec, @@ -94,7 +96,7 @@ impl sc_network_gossip::ValidatorContext for TestNetwork { } struct Tester { - net_handle: super::NetworkBridge, + net_handle: super::NetworkBridge, gossip_validator: Arc>, events: mpsc::UnboundedReceiver, } diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 998e63b6b514b..77153554c9466 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -56,6 +56,7 @@ use crate::{ use sp_consensus::SelectChain; use crate::authorities::{AuthoritySet, SharedAuthoritySet}; +use crate::communication::Network as NetworkT; use crate::consensus_changes::SharedConsensusChanges; use crate::justification::GrandpaJustification; use crate::until_imported::UntilVoteTargetImported; @@ -376,20 +377,20 @@ impl SharedVoterSetState { } /// The environment we run GRANDPA in. -pub(crate) struct Environment { +pub(crate) struct Environment, RA, SC, VR> { pub(crate) client: Arc>, pub(crate) select_chain: SC, pub(crate) voters: Arc>, pub(crate) config: Config, pub(crate) authority_set: SharedAuthoritySet>, pub(crate) consensus_changes: SharedConsensusChanges>, - pub(crate) network: crate::communication::NetworkBridge, + pub(crate) network: crate::communication::NetworkBridge, pub(crate) set_id: SetId, pub(crate) voter_set_state: SharedVoterSetState, pub(crate) voting_rule: VR, } -impl Environment { +impl, RA, SC, VR> Environment { /// Updates the voter set state using the given closure. The write lock is /// held during evaluation of the closure and the environment's voter set /// state is set to its result if successful. @@ -405,13 +406,14 @@ impl Environment { } } -impl, B, E, RA, SC, VR> +impl, B, E, N, RA, SC, VR> finality_grandpa::Chain> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, + N: NetworkT + 'static + Send, SC: SelectChain + 'static, VR: VotingRule>, RA: Send + Sync, @@ -554,13 +556,14 @@ pub(crate) fn ancestry, E, RA>( Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) } -impl, RA, SC, VR> +impl, N, RA, SC, VR> voter::Environment> -for Environment +for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + 'static + Send + Sync, + N: NetworkT + 'static + Send, RA: 'static + Send + Sync, SC: SelectChain + 'static, VR: VotingRule>, diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index 7d3b26a6328c4..b6745baf69955 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -90,7 +90,6 @@ mod observer; mod until_imported; mod voting_rule; -pub use sc_network_gossip::Network; pub use finality_proof::FinalityProofProvider; pub use justification::GrandpaJustification; pub use light_import::light_block_import; @@ -103,7 +102,7 @@ use aux_schema::PersistentData; use environment::{Environment, VoterSetState}; use import::GrandpaBlockImport; use until_imported::UntilGlobalMessageBlocksImported; -use communication::NetworkBridge; +use communication::{NetworkBridge, Network as NetworkT}; use sp_finality_grandpa::{AuthorityList, AuthorityPair, AuthoritySignature, SetId}; // Re-export these two because it's just so damn convenient. @@ -276,8 +275,9 @@ pub(crate) trait BlockSyncRequester { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor); } -impl BlockSyncRequester for NetworkBridge where +impl BlockSyncRequester for NetworkBridge where Block: BlockT, + Network: NetworkT, { fn set_sync_fork_request(&self, peers: Vec, hash: Block::Hash, number: NumberFor) { NetworkBridge::set_sync_fork_request(self, peers, hash, number) @@ -446,11 +446,11 @@ where )) } -fn global_communication, B, E, RA>( +fn global_communication, B, E, N, RA>( set_id: SetId, voters: &Arc>, client: &Arc>, - network: &NetworkBridge, + network: &NetworkBridge, keystore: &Option, ) -> ( impl Stream< @@ -464,6 +464,7 @@ fn global_communication, B, E, RA>( ) where B: Backend, E: CallExecutor + Send + Sync, + N: NetworkT, RA: Send + Sync, NumberFor: BlockNumberOps, { @@ -548,7 +549,7 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X, Sp>( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, VR: VotingRule> + Clone + 'static, NumberFor: BlockNumberOps, @@ -641,15 +642,16 @@ pub fn run_grandpa_voter, N, RA, SC, VR, X, Sp>( /// Future that powers the voter. #[must_use] -struct VoterWork { +struct VoterWork, RA, SC, VR> { voter: Box>> + Send>, - env: Arc>, + env: Arc>, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl VoterWork +impl VoterWork where Block: BlockT, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -660,7 +662,7 @@ where fn new( client: Arc>, config: Config, - network: NetworkBridge, + network: NetworkBridge, select_chain: SC, voting_rule: VR, persistent_data: PersistentData, @@ -821,9 +823,10 @@ where } } -impl Future for VoterWork +impl Future for VoterWork where Block: BlockT, + N: NetworkT + Sync, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -880,7 +883,7 @@ pub fn run_grandpa, N, RA, SC, VR, X, Sp>( Block::Hash: Ord, B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Sync + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, DigestFor: Encode, @@ -906,7 +909,7 @@ pub fn setup_disabled_grandpa, RA, N>( B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, RA: Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Clone + 'static, { register_finality_tracker_inherent_data_provider( client, diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 350d9d31c0601..2cb3c18045ea3 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -32,10 +32,10 @@ use sp_core::{H256, Blake2Hasher}; use crate::{ global_communication, CommandOrError, CommunicationIn, Config, environment, - LinkHalf, Network, Error, aux_schema::PersistentData, VoterCommand, VoterSetState, + LinkHalf, Error, aux_schema::PersistentData, VoterCommand, VoterSetState, }; use crate::authorities::SharedAuthoritySet; -use crate::communication::NetworkBridge; +use crate::communication::{Network as NetworkT, NetworkBridge}; use crate::consensus_changes::SharedConsensusChanges; use sp_finality_grandpa::AuthorityId; @@ -160,7 +160,7 @@ pub fn run_grandpa_observer, N, RA, SC, Sp>( ) -> ::sp_blockchain::Result + Send + 'static> where B: Backend + 'static, E: CallExecutor + Send + Sync + 'static, - N: Network + Send + Clone + 'static, + N: NetworkT + Send + Clone + 'static, SC: SelectChain + 'static, NumberFor: BlockNumberOps, RA: Send + Sync + 'static, @@ -202,18 +202,19 @@ pub fn run_grandpa_observer, N, RA, SC, Sp>( /// Future that powers the observer. #[must_use] -struct ObserverWork, E, Backend, RA> { +struct ObserverWork, N: NetworkT, E, Backend, RA> { observer: Box>> + Send>, client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, } -impl ObserverWork +impl ObserverWork where B: BlockT, + N: NetworkT, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, @@ -221,7 +222,7 @@ where { fn new( client: Arc>, - network: NetworkBridge, + network: NetworkBridge, persistent_data: PersistentData, keystore: Option, voter_commands_rx: mpsc::UnboundedReceiver>>, @@ -325,9 +326,10 @@ where } } -impl Future for ObserverWork +impl Future for ObserverWork where B: BlockT, + N: NetworkT, NumberFor: BlockNumberOps, RA: 'static + Send + Sync, E: CallExecutor + Send + Sync + 'static, diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 34d2fa6180b23..70b0f78cf3194 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -24,7 +24,7 @@ use sc_network::{Event, ReputationChange}; use futures::{prelude::*, channel::mpsc, compat::Compat01As03, task::SpawnExt as _}; use libp2p::PeerId; use parking_lot::Mutex; -use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::{sync::Arc, time::Duration}; /// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on @@ -234,19 +234,6 @@ impl GossipEngine { pub fn announce(&self, block: B::Hash, associated_data: Vec) { self.inner.lock().context_ext.announce(block, associated_data); } - - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - /// - /// Note: this method isn't strictly related to gossiping and should eventually be moved - /// somewhere else. - pub fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - self.inner.lock().context_ext.set_sync_fork_request(peers, hash, number); - } } impl Clone for GossipEngine { @@ -287,15 +274,10 @@ impl> Context for ContextOverService { trait ContextExt { fn announce(&self, block: B::Hash, associated_data: Vec); - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } impl> ContextExt for ContextOverService { fn announce(&self, block: B::Hash, associated_data: Vec) { Network::announce(&self.network, block, associated_data) } - - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - Network::set_sync_fork_request(&self.network, peers, hash, number) - } } diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index 5459123c41f05..86bc41af4b941 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -60,7 +60,7 @@ pub use self::state_machine::{Validator, ValidatorContext, ValidationResult}; pub use self::state_machine::DiscardAll; use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange}; -use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; +use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; use std::sync::Arc; mod bridge; @@ -93,17 +93,6 @@ pub trait Network { /// Note: this method isn't strictly related to gossiping and should eventually be moved /// somewhere else. fn announce(&self, block: B::Hash, associated_data: Vec); - - /// Notifies the sync service to try and sync the given block from the given - /// peers. - /// - /// If the given vector of peers is empty then the underlying implementation - /// should make a best effort to fetch the block from any peers it is - /// connected to (NOTE: this assumption will change in the future #3629). - /// - /// Note: this method isn't strictly related to gossiping and should eventually be moved - /// somewhere else. - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor); } impl, H: ExHashT> Network for Arc> { @@ -133,8 +122,4 @@ impl, H: ExHashT> Network for Arc) { NetworkService::announce_block(self, block, associated_data) } - - fn set_sync_fork_request(&self, peers: Vec, hash: B::Hash, number: NumberFor) { - NetworkService::set_sync_fork_request(self, peers, hash, number) - } }