Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use futures03::{compat::Compat, stream::StreamExt, future::FutureExt as _, futur
use finality_grandpa::Message::{Prevote, Precommit, PrimaryPropose};
use finality_grandpa::{voter, voter_set::VoterSet};
use log::{debug, trace};
use sc_network::ReputationChange;
use sc_network_gossip::{GossipEngine, Network};
use sc_network::{NetworkService, ReputationChange};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork};
use parity_scale_codec::{Encode, Decode};
use sp_core::Pair;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor};
Expand Down Expand Up @@ -95,6 +95,30 @@ mod benefit {
pub(super) const PER_EQUIVOCATION: i32 = 10;
}

/// A handle to the network.
///
/// Something that provides both the capabilities needed for the `gossip_network::Network` trait as
/// well as the ability to set a fork sync request for a particular block.
pub trait Network<Block: BlockT>: GossipNetwork<Block> + Clone + Send + 'static {
/// Notifies the sync service to try and sync the given block from the given
/// peers.
///
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}

impl<B, S, H> Network<B> for Arc<NetworkService<B, S, H>> where
B: BlockT,
S: sc_network::specialization::NetworkSpecialization<B>,
H: sc_network::ExHashT,
{
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
NetworkService::set_sync_fork_request(self, peers, hash, number)
}
}

/// Create a unique topic for a round and set-id combo.
pub(crate) fn round_topic<B: BlockT>(round: RoundNumber, set_id: SetIdNumber) -> B::Hash {
<<B::Header as HeaderT>::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes())
Expand All @@ -106,18 +130,19 @@ pub(crate) fn global_topic<B: BlockT>(set_id: SetIdNumber) -> B::Hash {
}

/// Bridge between the underlying network service, gossiping consensus messages and Grandpa
pub(crate) struct NetworkBridge<B: BlockT> {
pub(crate) struct NetworkBridge<B: BlockT, N: Network<B>> {
service: N,
gossip_engine: GossipEngine<B>,
validator: Arc<GossipValidator<B>>,
neighbor_sender: periodic::NeighborPacketSender<B>,
}

impl<B: BlockT> NetworkBridge<B> {
impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
/// Create a new NetworkBridge to the given NetworkService. Returns the service
/// handle.
/// On creation it will register previous rounds' votes with the gossip
/// service taken from the VoterSetState.
pub(crate) fn new<N: Network<B> + Clone + Send + 'static>(
pub(crate) fn new(
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
Expand All @@ -130,7 +155,7 @@ impl<B: BlockT> NetworkBridge<B> {
);

let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new(service, executor, GRANDPA_ENGINE_ID, validator.clone());
let gossip_engine = GossipEngine::new(service.clone(), executor, GRANDPA_ENGINE_ID, validator.clone());

{
// register all previous votes with the gossip service so that they're
Expand Down Expand Up @@ -173,7 +198,7 @@ impl<B: BlockT> NetworkBridge<B> {
let (rebroadcast_job, neighbor_sender) = periodic::neighbor_packet_worker(gossip_engine.clone());
let reporting_job = report_stream.consume(gossip_engine.clone());

let bridge = NetworkBridge { gossip_engine, validator, neighbor_sender };
let bridge = NetworkBridge { service, gossip_engine, validator, neighbor_sender };

let executor = Compat::new(executor);
executor.execute(Box::new(rebroadcast_job.select(on_exit.clone().map(Ok).compat()).then(|_| Ok(()))))
Expand Down Expand Up @@ -356,8 +381,13 @@ impl<B: BlockT> NetworkBridge<B> {
/// If the given vector of peers is empty then the underlying implementation
/// should make a best effort to fetch the block from any peers it is
/// connected to (NOTE: this assumption will change in the future #3629).
pub(crate) fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: B::Hash, number: NumberFor<B>) {
self.gossip_engine.set_sync_fork_request(peers, hash, number)
pub(crate) fn set_sync_fork_request(
&self,
peers: Vec<sc_network::PeerId>,
hash: B::Hash,
number: NumberFor<B>
) {
Network::set_sync_fork_request(&self.service, peers, hash, number)
}
}

Expand Down Expand Up @@ -493,9 +523,10 @@ fn incoming_global<B: BlockT>(
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")))
}

impl<B: BlockT> Clone for NetworkBridge<B> {
impl<B: BlockT, N: Network<B>> Clone for NetworkBridge<B, N> {
fn clone(&self) -> Self {
NetworkBridge {
service: self.service.clone(),
gossip_engine: self.gossip_engine.clone(),
validator: Arc::clone(&self.validator),
neighbor_sender: self.neighbor_sender.clone(),
Expand Down
4 changes: 3 additions & 1 deletion client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ impl sc_network_gossip::Network<Block> for TestNetwork {
fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
}
}

impl super::Network<Block> for TestNetwork {
fn set_sync_fork_request(
&self,
_peers: Vec<sc_network::PeerId>,
Expand All @@ -94,7 +96,7 @@ impl sc_network_gossip::ValidatorContext<Block> for TestNetwork {
}

struct Tester {
net_handle: super::NetworkBridge<Block>,
net_handle: super::NetworkBridge<Block, TestNetwork>,
gossip_validator: Arc<GossipValidator<Block>>,
events: mpsc::UnboundedReceiver<Event>,
}
Expand Down
17 changes: 10 additions & 7 deletions client/finality-grandpa/src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::{
use sp_consensus::SelectChain;

use crate::authorities::{AuthoritySet, SharedAuthoritySet};
use crate::communication::Network as NetworkT;
use crate::consensus_changes::SharedConsensusChanges;
use crate::justification::GrandpaJustification;
use crate::until_imported::UntilVoteTargetImported;
Expand Down Expand Up @@ -376,20 +377,20 @@ impl<Block: BlockT> SharedVoterSetState<Block> {
}

/// The environment we run GRANDPA in.
pub(crate) struct Environment<B, E, Block: BlockT, RA, SC, VR> {
pub(crate) struct Environment<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
pub(crate) client: Arc<Client<B, E, Block, RA>>,
pub(crate) select_chain: SC,
pub(crate) voters: Arc<VoterSet<AuthorityId>>,
pub(crate) config: Config,
pub(crate) authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
pub(crate) consensus_changes: SharedConsensusChanges<Block::Hash, NumberFor<Block>>,
pub(crate) network: crate::communication::NetworkBridge<Block>,
pub(crate) network: crate::communication::NetworkBridge<Block, N>,
pub(crate) set_id: SetId,
pub(crate) voter_set_state: SharedVoterSetState<Block>,
pub(crate) voting_rule: VR,
}

impl<B, E, Block: BlockT, RA, SC, VR> Environment<B, E, Block, RA, SC, VR> {
impl<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> Environment<B, E, Block, N, RA, SC, VR> {
/// Updates the voter set state using the given closure. The write lock is
/// held during evaluation of the closure and the environment's voter set
/// state is set to its result if successful.
Expand All @@ -405,13 +406,14 @@ impl<B, E, Block: BlockT, RA, SC, VR> Environment<B, E, Block, RA, SC, VR> {
}
}

impl<Block: BlockT<Hash=H256>, B, E, RA, SC, VR>
impl<Block: BlockT<Hash=H256>, B, E, N, RA, SC, VR>
finality_grandpa::Chain<Block::Hash, NumberFor<Block>>
for Environment<B, E, Block, RA, SC, VR>
for Environment<B, E, Block, N, RA, SC, VR>
where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: NetworkT<Block> + 'static + Send,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>>,
RA: Send + Sync,
Expand Down Expand Up @@ -554,13 +556,14 @@ pub(crate) fn ancestry<B, Block: BlockT<Hash=H256>, E, RA>(
Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
}

impl<B, E, Block: BlockT<Hash=H256>, RA, SC, VR>
impl<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR>
voter::Environment<Block::Hash, NumberFor<Block>>
for Environment<B, E, Block, RA, SC, VR>
for Environment<B, E, Block, N, RA, SC, VR>
where
Block: 'static,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + 'static + Send + Sync,
N: NetworkT<Block> + 'static + Send,
RA: 'static + Send + Sync,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>>,
Expand Down
29 changes: 16 additions & 13 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ mod observer;
mod until_imported;
mod voting_rule;

pub use sc_network_gossip::Network;
pub use finality_proof::FinalityProofProvider;
pub use justification::GrandpaJustification;
pub use light_import::light_block_import;
Expand All @@ -103,7 +102,7 @@ use aux_schema::PersistentData;
use environment::{Environment, VoterSetState};
use import::GrandpaBlockImport;
use until_imported::UntilGlobalMessageBlocksImported;
use communication::NetworkBridge;
use communication::{NetworkBridge, Network as NetworkT};
use sp_finality_grandpa::{AuthorityList, AuthorityPair, AuthoritySignature, SetId};

// Re-export these two because it's just so damn convenient.
Expand Down Expand Up @@ -276,8 +275,9 @@ pub(crate) trait BlockSyncRequester<Block: BlockT> {
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>);
}

impl<Block> BlockSyncRequester<Block> for NetworkBridge<Block> where
impl<Block, Network> BlockSyncRequester<Block> for NetworkBridge<Block, Network> where
Block: BlockT,
Network: NetworkT<Block>,
{
fn set_sync_fork_request(&self, peers: Vec<sc_network::PeerId>, hash: Block::Hash, number: NumberFor<Block>) {
NetworkBridge::set_sync_fork_request(self, peers, hash, number)
Expand Down Expand Up @@ -446,11 +446,11 @@ where
))
}

fn global_communication<Block: BlockT<Hash=H256>, B, E, RA>(
fn global_communication<Block: BlockT<Hash=H256>, B, E, N, RA>(
set_id: SetId,
voters: &Arc<VoterSet<AuthorityId>>,
client: &Arc<Client<B, E, Block, RA>>,
network: &NetworkBridge<Block>,
network: &NetworkBridge<Block, N>,
keystore: &Option<KeyStorePtr>,
) -> (
impl Stream<
Expand All @@ -464,6 +464,7 @@ fn global_communication<Block: BlockT<Hash=H256>, B, E, RA>(
) where
B: Backend<Block, Blake2Hasher>,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync,
N: NetworkT<Block>,
RA: Send + Sync,
NumberFor<Block>: BlockNumberOps,
{
Expand Down Expand Up @@ -548,7 +549,7 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Clone + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
SC: SelectChain<Block> + 'static,
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
NumberFor<Block>: BlockNumberOps,
Expand Down Expand Up @@ -641,15 +642,16 @@ pub fn run_grandpa_voter<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(

/// Future that powers the voter.
#[must_use]
struct VoterWork<B, E, Block: BlockT, RA, SC, VR> {
struct VoterWork<B, E, Block: BlockT, N: NetworkT<Block>, RA, SC, VR> {
voter: Box<dyn Future<Item = (), Error = CommandOrError<Block::Hash, NumberFor<Block>>> + Send>,
env: Arc<Environment<B, E, Block, RA, SC, VR>>,
env: Arc<Environment<B, E, Block, N, RA, SC, VR>>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<Block::Hash, NumberFor<Block>>>,
}

impl<B, E, Block, RA, SC, VR> VoterWork<B, E, Block, RA, SC, VR>
impl<B, E, Block, N, RA, SC, VR> VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT<Hash=H256>,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
Expand All @@ -660,7 +662,7 @@ where
fn new(
client: Arc<Client<B, E, Block, RA>>,
config: Config,
network: NetworkBridge<Block>,
network: NetworkBridge<Block, N>,
select_chain: SC,
voting_rule: VR,
persistent_data: PersistentData<Block>,
Expand Down Expand Up @@ -821,9 +823,10 @@ where
}
}

impl<B, E, Block, RA, SC, VR> Future for VoterWork<B, E, Block, RA, SC, VR>
impl<B, E, Block, N, RA, SC, VR> Future for VoterWork<B, E, Block, N, RA, SC, VR>
where
Block: BlockT<Hash=H256>,
N: NetworkT<Block> + Sync,
NumberFor<Block>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
Expand Down Expand Up @@ -880,7 +883,7 @@ pub fn run_grandpa<B, E, Block: BlockT<Hash=H256>, N, RA, SC, VR, X, Sp>(
Block::Hash: Ord,
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Clone + 'static,
N: NetworkT<Block> + Send + Sync + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
DigestFor<Block>: Encode,
Expand All @@ -906,7 +909,7 @@ pub fn setup_disabled_grandpa<B, E, Block: BlockT<Hash=H256>, RA, N>(
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
RA: Send + Sync + 'static,
N: Network<Block> + Send + Clone + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
{
register_finality_tracker_inherent_data_provider(
client,
Expand Down
18 changes: 10 additions & 8 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use sp_core::{H256, Blake2Hasher};

use crate::{
global_communication, CommandOrError, CommunicationIn, Config, environment,
LinkHalf, Network, Error, aux_schema::PersistentData, VoterCommand, VoterSetState,
LinkHalf, Error, aux_schema::PersistentData, VoterCommand, VoterSetState,
};
use crate::authorities::SharedAuthoritySet;
use crate::communication::NetworkBridge;
use crate::communication::{Network as NetworkT, NetworkBridge};
use crate::consensus_changes::SharedConsensusChanges;
use sp_finality_grandpa::AuthorityId;

Expand Down Expand Up @@ -160,7 +160,7 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC, Sp>(
) -> ::sp_blockchain::Result<impl Future<Item=(),Error=()> + Send + 'static> where
B: Backend<Block, Blake2Hasher> + 'static,
E: CallExecutor<Block, Blake2Hasher> + Send + Sync + 'static,
N: Network<Block> + Send + Clone + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
RA: Send + Sync + 'static,
Expand Down Expand Up @@ -202,26 +202,27 @@ pub fn run_grandpa_observer<B, E, Block: BlockT<Hash=H256>, N, RA, SC, Sp>(

/// Future that powers the observer.
#[must_use]
struct ObserverWork<B: BlockT<Hash=H256>, E, Backend, RA> {
struct ObserverWork<B: BlockT<Hash=H256>, N: NetworkT<B>, E, Backend, RA> {
observer: Box<dyn Future<Item = (), Error = CommandOrError<B::Hash, NumberFor<B>>> + Send>,
client: Arc<Client<Backend, E, B, RA>>,
network: NetworkBridge<B>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
keystore: Option<sc_keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
}

impl<B, E, Bk, RA> ObserverWork<B, E, Bk, RA>
impl<B, N, E, Bk, RA> ObserverWork<B, N, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: NetworkT<B>,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
Bk: Backend<B, Blake2Hasher> + 'static,
{
fn new(
client: Arc<Client<Bk, E, B, RA>>,
network: NetworkBridge<B>,
network: NetworkBridge<B, N>,
persistent_data: PersistentData<B>,
keystore: Option<sc_keystore::KeyStorePtr>,
voter_commands_rx: mpsc::UnboundedReceiver<VoterCommand<B::Hash, NumberFor<B>>>,
Expand Down Expand Up @@ -325,9 +326,10 @@ where
}
}

impl<B, E, Bk, RA> Future for ObserverWork<B, E, Bk, RA>
impl<B, N, E, Bk, RA> Future for ObserverWork<B, N, E, Bk, RA>
where
B: BlockT<Hash=H256>,
N: NetworkT<B>,
NumberFor<B>: BlockNumberOps,
RA: 'static + Send + Sync,
E: CallExecutor<B, Blake2Hasher> + Send + Sync + 'static,
Expand Down
Loading