diff --git a/network/Cargo.toml b/network/Cargo.toml index 0718898dab14..21387dce3ce2 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -14,6 +14,7 @@ polkadot-primitives = { path = "../primitives" } polkadot-erasure-coding = { path = "../erasure-coding" } codec = { package = "parity-scale-codec", version = "1.1.0", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } +sc-network-gossip = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } futures = "0.3.1" diff --git a/network/src/gossip.rs b/network/src/gossip.rs index 77d964e2a91a..b135267071dc 100644 --- a/network/src/gossip.rs +++ b/network/src/gossip.rs @@ -51,10 +51,10 @@ use sp_runtime::{generic::BlockId, traits::{ProvideRuntimeApi, BlakeTwo256, Hash as HashT}}; use sp_blockchain::Error as ClientError; -use sc_network::{config::Roles, PeerId, ReputationChange}; -use sc_network::consensus_gossip::{ - self as network_gossip, ValidationResult as GossipValidationResult, - ValidatorContext, MessageIntent, ConsensusMessage, +use sc_network::{config::Roles, Context, PeerId, ReputationChange}; +use sc_network_gossip::{ + ValidationResult as GossipValidationResult, + ValidatorContext, MessageIntent, }; use polkadot_validation::{SignedStatement}; use polkadot_primitives::{Block, Hash}; @@ -68,11 +68,12 @@ use std::collections::HashMap; use std::sync::Arc; use arrayvec::ArrayVec; +use futures03::{prelude::*, compat::Compat}; use parking_lot::RwLock; use log::warn; use super::PolkadotNetworkService; -use crate::router::attestation_topic; +use crate::{GossipMessageStream, NetworkService, PolkadotProtocol, router::attestation_topic}; use attestation::{View as AttestationView, PeerData as AttestationPeerData}; use message_routing::{View as MessageRoutingView}; @@ -133,7 +134,7 @@ mod cost { } /// A gossip message. -#[derive(Encode, Decode, Clone)] +#[derive(Encode, Decode, Clone, PartialEq)] pub enum GossipMessage { /// A packet sent to a neighbor but not relayed. #[codec(index = "1")] @@ -151,15 +152,6 @@ pub enum GossipMessage { ErasureChunk(ErasureChunkMessage), } -impl GossipMessage { - fn to_consensus_message(&self) -> ConsensusMessage { - ConsensusMessage { - data: self.encode(), - engine_id: POLKADOT_ENGINE_ID, - } - } -} - impl From for GossipMessage { fn from(packet: NeighborPacket) -> Self { GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet)) @@ -179,7 +171,7 @@ impl From for GossipMessage { } /// A gossip message containing a statement. -#[derive(Encode, Decode, Clone)] +#[derive(Encode, Decode, Clone, PartialEq)] pub struct GossipStatement { /// The block hash of the relay chain being referred to. In context, this should /// be a leaf. @@ -200,7 +192,7 @@ impl GossipStatement { /// A gossip message containing one erasure chunk of a candidate block. /// For each chunk of block erasure encoding one of this messages is constructed. -#[derive(Encode, Decode, Clone, Debug)] +#[derive(Encode, Decode, Clone, Debug, PartialEq)] pub struct ErasureChunkMessage { /// The chunk itself. pub chunk: PrimitiveChunk, @@ -221,7 +213,7 @@ impl From for GossipMessage { /// These are all the messages posted from one parachain to another during the /// execution of a single parachain block. Since this parachain block may have been /// included in many forks of the relay chain, there is no relay-chain leaf parameter. -#[derive(Encode, Decode, Clone)] +#[derive(Encode, Decode, Clone, PartialEq)] pub struct GossipParachainMessages { /// The root of the message queue. pub queue_root: Hash, @@ -241,7 +233,7 @@ impl GossipParachainMessages { } /// A versioned neighbor message. -#[derive(Encode, Decode, Clone)] +#[derive(Encode, Decode, Clone, PartialEq)] pub enum VersionedNeighborPacket { #[codec(index = "1")] V1(NeighborPacket), @@ -249,13 +241,13 @@ pub enum VersionedNeighborPacket { /// Contains information on which chain heads the peer is /// accepting messages for. -#[derive(Encode, Decode, Clone)] +#[derive(Encode, Decode, Clone, PartialEq)] pub struct NeighborPacket { chain_heads: Vec, } /// whether a block is known. -#[derive(Clone, Copy)] +#[derive(Clone, Copy, PartialEq)] pub enum Known { /// The block is a known leaf. Leaf, @@ -318,6 +310,7 @@ impl ChainContext for (F, P) where pub fn register_validator( service: Arc, chain: C, + executor: &impl futures03::task::Spawn, ) -> RegisteredMessageValidator { let s = service.clone(); @@ -338,19 +331,26 @@ pub fn register_validator( }); let gossip_side = validator.clone(); - service.with_gossip(|gossip, ctx| - gossip.register_validator(ctx, POLKADOT_ENGINE_ID, gossip_side) + let gossip_engine = sc_network_gossip::GossipEngine::new( + service.clone(), + executor, + POLKADOT_ENGINE_ID, + gossip_side ); - RegisteredMessageValidator { inner: validator as _ } + RegisteredMessageValidator { + inner: validator as _, + service: Some(service), + gossip_engine: Some(gossip_engine), + } } #[derive(PartialEq)] enum NewLeafAction { // (who, message) - TargetedMessage(PeerId, ConsensusMessage), + TargetedMessage(PeerId, GossipMessage), // (topic, message) - Multicast(Hash, ConsensusMessage), + Multicast(Hash, GossipMessage), } /// Actions to take after noting a new block-DAG leaf. @@ -365,15 +365,14 @@ impl NewLeafActions { /// Perform the queued actions, feeding into gossip. pub fn perform( self, - gossip: &mut dyn crate::GossipService, - ctx: &mut dyn sc_network::Context, + gossip: &dyn crate::NetworkService, ) { for action in self.actions { match action { NewLeafAction::TargetedMessage(who, message) - => gossip.send_message(ctx, &who, message), + => gossip.send_message(who, message), NewLeafAction::Multicast(topic, message) - => gossip.multicast(ctx, &topic, message), + => gossip.gossip_message(topic, message), } } } @@ -385,6 +384,10 @@ impl NewLeafActions { #[derive(Clone)] pub struct RegisteredMessageValidator { inner: Arc>, + // Note: this is always `Some` in real code and `None` in tests. + service: Option>, + // Note: this is always `Some` in real code and `None` in tests. + gossip_engine: Option>, } impl RegisteredMessageValidator { @@ -395,7 +398,11 @@ impl RegisteredMessageValidator { ) -> Self { let validator = Arc::new(MessageValidator::new_test(chain, report_handle)); - RegisteredMessageValidator { inner: validator as _ } + RegisteredMessageValidator { + inner: validator as _, + service: None, + gossip_engine: None, + } } pub fn register_availability_store(&mut self, availability_store: av_store::Store) { @@ -449,7 +456,7 @@ impl RegisteredMessageValidator { let message = GossipMessage::from(GossipParachainMessages { queue_root: *queue_root, messages, - }).to_consensus_message(); + }); actions.push(NewLeafAction::Multicast(*topic, message)); @@ -463,6 +470,49 @@ impl RegisteredMessageValidator { } } +impl NetworkService for RegisteredMessageValidator { + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { + let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() { + gossip_engine.messages_for(topic) + } else { + log::error!("Called gossip_messages_for on a test engine"); + futures03::channel::mpsc::unbounded().1 + }; + + GossipMessageStream::new(Box::new(Compat::new(topic_stream.map(Ok)))) + } + + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + if let Some(gossip_engine) = self.gossip_engine.as_ref() { + gossip_engine.gossip_message( + topic, + message.encode(), + false, + ); + } else { + log::error!("Called gossip_message on a test engine"); + } + } + + fn send_message(&self, who: PeerId, message: GossipMessage) { + if let Some(gossip_engine) = self.gossip_engine.as_ref() { + gossip_engine.send_message(vec![who], message.encode()); + } else { + log::error!("Called send_message on a test engine"); + } + } + + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context) + { + if let Some(service) = self.service.as_ref() { + service.with_spec(with) + } else { + log::error!("Called with_spec on a test engine"); + } + } +} + /// The data needed for validating gossip messages. #[derive(Default)] pub(crate) struct MessageValidationData { @@ -585,13 +635,13 @@ impl Inner { } } - fn multicast_neighbor_packet( + fn multicast_neighbor_packet( &self, mut send_neighbor_packet: F, ) { let neighbor_packet = GossipMessage::from(NeighborPacket { chain_heads: self.attestation_view.neighbor_info().collect(), - }).to_consensus_message(); + }); for peer in self.peers.keys() { send_neighbor_packet(peer, neighbor_packet.clone()) @@ -628,7 +678,7 @@ impl MessageValidator { } } -impl network_gossip::Validator for MessageValidator { +impl sc_network_gossip::Validator for MessageValidator { fn new_peer(&self, _context: &mut dyn ValidatorContext, who: &PeerId, _roles: Roles) { let mut inner = self.inner.write(); inner.peers.insert(who.clone(), PeerData::default()); @@ -746,7 +796,7 @@ impl network_gossip::Validator for MessageValid #[cfg(test)] mod tests { use super::*; - use sc_network::consensus_gossip::Validator as ValidatorT; + use sc_network_gossip::Validator as ValidatorT; use std::sync::mpsc; use parking_lot::Mutex; use polkadot_primitives::parachain::{CandidateReceipt, HeadData}; @@ -776,7 +826,7 @@ mod tests { } } - impl network_gossip::ValidatorContext for MockValidatorContext { + impl sc_network_gossip::ValidatorContext for MockValidatorContext { fn broadcast_topic(&mut self, topic: Hash, force: bool) { self.events.push(ContextEvent::BroadcastTopic(topic, force)); } @@ -792,12 +842,12 @@ mod tests { } impl NewLeafActions { - fn has_message(&self, who: PeerId, message: ConsensusMessage) -> bool { + fn has_message(&self, who: PeerId, message: GossipMessage) -> bool { let x = NewLeafAction::TargetedMessage(who, message); self.actions.iter().find(|&m| m == &x).is_some() } - fn has_multicast(&self, topic: Hash, message: ConsensusMessage) -> bool { + fn has_multicast(&self, topic: Hash, message: GossipMessage) -> bool { let x = NewLeafAction::Multicast(topic, message); self.actions.iter().find(|&m| m == &x).is_some() } @@ -1082,12 +1132,12 @@ mod tests { assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { chain_heads: vec![hash_a], - }).to_consensus_message())); + }))); assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages { queue_root: root_a, messages: root_a_messages.clone(), - }).to_consensus_message())); + }))); } // ensure that we are allowed to multicast to a peer with same chain head, @@ -1154,12 +1204,12 @@ mod tests { assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { chain_heads: vec![hash_a], - }).to_consensus_message())); + }))); assert!(actions.has_multicast(root_a_topic, GossipMessage::from(GossipParachainMessages { queue_root: root_a, messages: root_a_messages.clone(), - }).to_consensus_message())); + }))); } // ensure that we are not allowed to multicast to either peer, as they @@ -1168,12 +1218,12 @@ mod tests { let message = GossipMessage::from(GossipParachainMessages { queue_root: root_a, messages: root_a_messages.clone(), - }).encode(); + }); let mut allowed = validator.inner.message_allowed(); let intent = MessageIntent::Broadcast; - assert!(!allowed(&peer_a, intent, &root_a_topic, &message[..])); - assert!(!allowed(&peer_b, intent, &root_a_topic, &message[..])); + assert!(!allowed(&peer_a, intent, &root_a_topic, &message.encode())); + assert!(!allowed(&peer_b, intent, &root_a_topic, &message.encode())); } // peer A gets updated to the chain head. now we'll attempt to broadcast @@ -1259,17 +1309,17 @@ mod tests { let queue_messages = GossipMessage::from(GossipParachainMessages { queue_root: root_a, messages: root_a_messages.clone(), - }).to_consensus_message(); + }); let not_queue_messages = GossipMessage::from(GossipParachainMessages { queue_root: root_a, messages: not_root_a_messages.clone(), - }).encode(); + }); let queue_messages_wrong_root = GossipMessage::from(GossipParachainMessages { queue_root: not_root_a, messages: root_a_messages.clone(), - }).encode(); + }); // ensure that we attempt to multicast all relevant queues after noting a leaf. { @@ -1281,7 +1331,7 @@ mod tests { assert!(actions.has_message(peer_a.clone(), GossipMessage::from(NeighborPacket { chain_heads: vec![hash_a], - }).to_consensus_message())); + }))); // we don't know this queue! no broadcast :( assert!(!actions.has_multicast(root_a_topic, queue_messages.clone())); @@ -1292,7 +1342,7 @@ mod tests { let res = validator.inner.validate( &mut validator_context, &peer_a, - &queue_messages_wrong_root[..], + &queue_messages_wrong_root.encode(), ); match res { @@ -1308,7 +1358,7 @@ mod tests { let res = validator.inner.validate( &mut validator_context, &peer_a, - ¬_queue_messages[..], + ¬_queue_messages.encode(), ); match res { @@ -1324,7 +1374,7 @@ mod tests { let res = validator.inner.validate( &mut validator_context, &peer_a, - &queue_messages.data[..], + &queue_messages.encode(), ); match res { @@ -1333,7 +1383,7 @@ mod tests { } assert_eq!(validator_context.events, vec![ - ContextEvent::BroadcastMessage(root_a_topic, queue_messages.data.clone(), false), + ContextEvent::BroadcastMessage(root_a_topic, queue_messages.encode(), false), ]); } } diff --git a/network/src/gossip/attestation.rs b/network/src/gossip/attestation.rs index 677bfe1e0bd8..d8fc4c50a1fc 100644 --- a/network/src/gossip/attestation.rs +++ b/network/src/gossip/attestation.rs @@ -30,7 +30,7 @@ //! a `Candidate` we are aware of. Otherwise, it is possible we could be forced to //! consider an infinite amount of attestations produced by a misbehaving validator. -use sc_network::consensus_gossip::{ValidationResult as GossipValidationResult}; +use sc_network_gossip::{ValidationResult as GossipValidationResult}; use sc_network::ReputationChange; use polkadot_validation::GenericStatement; use polkadot_primitives::Hash; diff --git a/network/src/lib.rs b/network/src/lib.rs index 312e0ed69d2a..c35cbabb65f0 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -28,7 +28,7 @@ pub mod gossip; use codec::{Decode, Encode}; use futures::channel::{oneshot, mpsc}; use futures::prelude::*; -use futures::future::Either; +use futures::{compat::Stream01CompatExt, future::Either}; use polkadot_primitives::{Block, Hash, Header}; use polkadot_primitives::parachain::{ Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, @@ -38,9 +38,7 @@ use sc_network::{ PeerId, RequestId, Context, StatusMessage as GenericFullStatus, specialization::NetworkSpecialization as Specialization, }; -use sc_network::consensus_gossip::{ - self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage, -}; +use sc_network_gossip::TopicNotification; use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey}; use self::collator_pool::{CollatorPool, Role, Action}; use self::local_collations::LocalCollations; @@ -50,7 +48,7 @@ use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::task::{Context as PollContext, Poll}; -use crate::gossip::{POLKADOT_ENGINE_ID, GossipMessage, ErasureChunkMessage}; +use crate::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator}; #[cfg(test)] mod tests; @@ -91,13 +89,12 @@ pub trait NetworkService: Send + Sync + 'static { /// Gossip a message on given topic. fn gossip_message(&self, topic: Hash, message: GossipMessage); - /// Execute a closure with the gossip service. - fn with_gossip(&self, with: F) - where F: FnOnce(&mut dyn GossipService, &mut dyn Context); + /// Send a message to a specific peer we're connected to. + fn send_message(&self, who: PeerId, message: GossipMessage); /// Execute a closure with the polkadot protocol. fn with_spec(&self, with: F) - where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); + where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); } /// This is a newtype that implements a [`ProvideGossipMessages`] shim trait. @@ -107,11 +104,10 @@ pub trait NetworkService: Send + Sync + 'static { /// /// [`NetworkService`]: ./trait.NetworkService.html /// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html -pub struct AvailabilityNetworkShim(pub std::sync::Arc); +#[derive(Clone)] +pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator); -impl av_store::ProvideGossipMessages for AvailabilityNetworkShim - where T: NetworkService -{ +impl av_store::ProvideGossipMessages for AvailabilityNetworkShim { fn gossip_messages_for(&self, topic: Hash) -> Pin + Send>> { @@ -146,67 +142,6 @@ impl av_store::ProvideGossipMessages for AvailabilityNetworkShim } } -impl Clone for AvailabilityNetworkShim { - fn clone(&self) -> Self { - AvailabilityNetworkShim(self.0.clone()) - } -} - -impl NetworkService for PolkadotNetworkService { - fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { - let (tx, rx) = std::sync::mpsc::channel(); - - PolkadotNetworkService::with_gossip(self, move |gossip, _| { - let inner_rx = gossip.messages_for(POLKADOT_ENGINE_ID, topic); - let _ = tx.send(inner_rx); - }); - - let topic_stream = match rx.recv() { - Ok(rx) => rx, - Err(_) => mpsc::unbounded().1, // return empty channel. - }; - - GossipMessageStream::new(topic_stream.boxed()) - } - - fn gossip_message(&self, topic: Hash, message: GossipMessage) { - self.gossip_consensus_message( - topic, - POLKADOT_ENGINE_ID, - message.encode(), - GossipMessageRecipient::BroadcastToAll, - ); - } - - fn with_gossip(&self, with: F) - where F: FnOnce(&mut dyn GossipService, &mut dyn Context) - { - PolkadotNetworkService::with_gossip(self, move |gossip, ctx| with(gossip, ctx)) - } - - fn with_spec(&self, with: F) - where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context) - { - PolkadotNetworkService::with_spec(self, with) - } -} - -/// A gossip network subservice. -pub trait GossipService { - fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage); - fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage); -} - -impl GossipService for consensus_gossip::ConsensusGossip { - fn send_message(&mut self, ctx: &mut dyn Context, who: &PeerId, message: ConsensusMessage) { - consensus_gossip::ConsensusGossip::send_message(self, ctx, who, message) - } - - fn multicast(&mut self, ctx: &mut dyn Context, topic: &Hash, message: ConsensusMessage) { - consensus_gossip::ConsensusGossip::multicast(self, ctx, *topic, message, false) - } -} - /// A stream of gossip messages and an optional sender for a topic. pub struct GossipMessageStream { topic_stream: Pin + Send>>, diff --git a/network/src/router.rs b/network/src/router.rs index 78922a1307ec..00553d7f9906 100644 --- a/network/src/router.rs +++ b/network/src/router.rs @@ -71,18 +71,18 @@ pub(crate) fn checked_statements(network: &N, topic: Hash) -> } /// Table routing implementation. -pub struct Router { +pub struct Router { table: Arc, attestation_topic: Hash, - fetcher: LeafWorkDataFetcher, + fetcher: LeafWorkDataFetcher, deferred_statements: Arc>, message_validator: RegisteredMessageValidator, } -impl Router { +impl Router { pub(crate) fn new( table: Arc, - fetcher: LeafWorkDataFetcher, + fetcher: LeafWorkDataFetcher, message_validator: RegisteredMessageValidator, ) -> Self { let parent_hash = fetcher.parent_hash(); @@ -102,19 +102,19 @@ impl Router { /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. pub(crate) fn checked_statements(&self) -> impl Stream { - checked_statements(&**self.network(), self.attestation_topic) + checked_statements(&*self.network(), self.attestation_topic) } fn parent_hash(&self) -> Hash { self.fetcher.parent_hash() } - fn network(&self) -> &Arc { + fn network(&self) -> &RegisteredMessageValidator { self.fetcher.network() } } -impl Clone for Router { +impl Clone for Router { fn clone(&self) -> Self { Router { table: self.table.clone(), @@ -126,9 +126,8 @@ impl Clone for Router { } } -impl Router where +impl Router where P::Api: ParachainHost, - N: NetworkService, T: Clone + Executor + Send + 'static, E: Future + Clone + Send + Unpin + 'static, { @@ -225,9 +224,8 @@ impl Router w } } -impl TableRouter for Router where +impl TableRouter for Router where P::Api: ParachainHost, - N: NetworkService, T: Clone + Executor + Send + 'static, E: Future + Clone + Send + 'static, { @@ -283,7 +281,7 @@ impl TableRouter for Router wh } } -impl Drop for Router { +impl Drop for Router { fn drop(&mut self) { let parent_hash = self.parent_hash(); self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); }); diff --git a/network/src/tests/validation.rs b/network/src/tests/validation.rs index fc976f9bdea7..2d4cd4610631 100644 --- a/network/src/tests/validation.rs +++ b/network/src/tests/validation.rs @@ -19,11 +19,11 @@ #![allow(unused)] use crate::gossip::GossipMessage; -use sc_network::Context as NetContext; -use sc_network::consensus_gossip::TopicNotification; +use sc_network::{Context as NetContext, PeerId}; +use sc_network_gossip::TopicNotification; use sp_core::{NativeOrEncoded, ExecutionContext}; use sp_keyring::Sr25519Keyring; -use crate::{GossipService, PolkadotProtocol, NetworkService, GossipMessageStream}; +use crate::{PolkadotProtocol, NetworkService, GossipMessageStream}; use polkadot_validation::{SharedTable, Network}; use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId}; @@ -153,17 +153,15 @@ impl NetworkService for TestNetwork { GossipMessageStream::new(rx.boxed()) } + fn send_message(&self, _: PeerId, _: GossipMessage) { + unimplemented!() + } + fn gossip_message(&self, topic: Hash, message: GossipMessage) { let notification = TopicNotification { message: message.encode(), sender: None }; let _ = self.gossip.send_message.unbounded_send((topic, notification)); } - fn with_gossip(&self, with: F) - where F: FnOnce(&mut dyn GossipService, &mut dyn NetContext) - { - unimplemented!() - } - fn with_spec(&self, with: F) where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext) { @@ -339,7 +337,6 @@ impl ParachainHost for RuntimeApi { type TestValidationNetwork = crate::validation::ValidationNetwork< TestApi, NeverExit, - TestNetwork, TaskExecutor, >; @@ -366,9 +363,8 @@ fn build_network(n: usize, executor: TaskExecutor) -> Built { ); TestValidationNetwork::new( - net, - NeverExit, message_val, + NeverExit, runtime_api.clone(), executor.clone(), ) diff --git a/network/src/validation.rs b/network/src/validation.rs index f6e652fae280..53a3c59730af 100644 --- a/network/src/validation.rs +++ b/network/src/validation.rs @@ -63,44 +63,40 @@ pub struct LeafWorkParams { } /// Wrapper around the network service -pub struct ValidationNetwork { - network: Arc, +pub struct ValidationNetwork { api: Arc

, executor: T, - message_validator: RegisteredMessageValidator, + network: RegisteredMessageValidator, exit: E, } -impl ValidationNetwork { +impl ValidationNetwork { /// Create a new consensus networking object. pub fn new( - network: Arc, + network: RegisteredMessageValidator, exit: E, - message_validator: RegisteredMessageValidator, api: Arc

, executor: T, ) -> Self { - ValidationNetwork { network, exit, message_validator, api, executor } + ValidationNetwork { network, exit, api, executor } } } -impl Clone for ValidationNetwork { +impl Clone for ValidationNetwork { fn clone(&self) -> Self { ValidationNetwork { network: self.network.clone(), exit: self.exit.clone(), api: self.api.clone(), executor: self.executor.clone(), - message_validator: self.message_validator.clone(), } } } -impl ValidationNetwork where +impl ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, E: Clone + Future + Send + Sync + 'static, - N: NetworkService, T: Clone + Executor + Send + Sync + 'static, { /// Instantiate block-DAG leaf work @@ -118,27 +114,26 @@ impl ValidationNetwork where /// leaf-work instances safely, but they should all be coordinated on which session keys /// are being used. pub fn instantiate_leaf_work(&self, params: LeafWorkParams) - -> oneshot::Receiver> + -> oneshot::Receiver> { let parent_hash = params.parent_hash; let network = self.network.clone(); let api = self.api.clone(); let task_executor = self.executor.clone(); let exit = self.exit.clone(); - let message_validator = self.message_validator.clone(); let authorities = params.authorities.clone(); let (tx, rx) = oneshot::channel(); self.network.with_spec(move |spec, ctx| { - let actions = message_validator.new_local_leaf( + let actions = network.new_local_leaf( parent_hash, MessageValidationData { authorities }, |queue_root| spec.availability_store.as_ref() .and_then(|store| store.queue_by_root(queue_root)) ); - network.with_gossip(move |gossip, ctx| actions.perform(gossip, ctx)); + actions.perform(&network); let work = spec.new_validation_leaf_work(ctx, params); let _ = tx.send(LeafWorkDataFetcher { @@ -148,7 +143,6 @@ impl ValidationNetwork where parent_hash, knowledge: work.knowledge().clone(), exit, - message_validator, }); }); @@ -156,7 +150,7 @@ impl ValidationNetwork where } } -impl ValidationNetwork where N: NetworkService { +impl ValidationNetwork { /// Convert the given `CollatorId` to a `PeerId`. pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> impl Future> + Send @@ -178,20 +172,19 @@ impl ValidationNetwork where N: NetworkService { /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream { - crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent)) + crate::router::checked_statements(&self.network, crate::router::attestation_topic(relay_parent)) } } /// A long-lived network which can create parachain statement routing processes on demand. -impl ParachainNetwork for ValidationNetwork where +impl ParachainNetwork for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, E: Clone + Future + Send + Sync + Unpin + 'static, - N: NetworkService, T: Clone + Executor + Send + Sync + 'static, { type Error = String; - type TableRouter = Router; + type TableRouter = Router; type BuildTableRouter = Box> + Send + Unpin>; fn communication_for( @@ -208,16 +201,16 @@ impl ParachainNetwork for ValidationNetwork where parent_hash, authorities: authorities.to_vec(), }); - let message_validator = self.message_validator.clone(); let executor = self.executor.clone(); + let network = self.network.clone(); let work = build_fetcher .map_err(|e| format!("{:?}", e)) .map_ok(move |fetcher| { let table_router = Router::new( table, fetcher, - message_validator, + network, ); let table_router_clone = table_router.clone(); @@ -268,10 +261,9 @@ impl Future for AwaitingCollation { } } -impl Collators for ValidationNetwork where +impl Collators for ValidationNetwork where P: ProvideRuntimeApi + Send + Sync + 'static, P::Api: ParachainHost, - N: NetworkService, { type Error = NetworkDown; type Collation = AwaitingCollation; @@ -595,17 +587,16 @@ impl Future for PoVReceiver { } /// Can fetch data for a given validation leaf-work instance. -pub struct LeafWorkDataFetcher { - network: Arc, +pub struct LeafWorkDataFetcher { + network: RegisteredMessageValidator, api: Arc

, exit: E, task_executor: T, knowledge: Arc>, parent_hash: Hash, - message_validator: RegisteredMessageValidator, } -impl LeafWorkDataFetcher { +impl LeafWorkDataFetcher { /// Get the parent hash. pub(crate) fn parent_hash(&self) -> Hash { self.parent_hash @@ -622,7 +613,7 @@ impl LeafWorkDataFetcher { } /// Get the network service. - pub(crate) fn network(&self) -> &Arc { + pub(crate) fn network(&self) -> &RegisteredMessageValidator { &self.network } @@ -637,7 +628,7 @@ impl LeafWorkDataFetcher { } } -impl Clone for LeafWorkDataFetcher { +impl Clone for LeafWorkDataFetcher { fn clone(&self) -> Self { LeafWorkDataFetcher { network: self.network.clone(), @@ -646,14 +637,12 @@ impl Clone for LeafWorkDataFetcher LeafWorkDataFetcher where +impl LeafWorkDataFetcher where P::Api: ParachainHost, - N: NetworkService, T: Clone + Executor + Send + 'static, E: Future + Clone + Send + 'static, { diff --git a/service/src/lib.rs b/service/src/lib.rs index 13c4ad11595e..c391787d001d 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -238,6 +238,7 @@ pub fn new_full(config: Configuration) let mut gossip_validator = network_gossip::register_validator( service.network(), (is_known, client.clone()), + &service.spawn_task_handle(), ); if participates_in_consensus { @@ -247,7 +248,7 @@ pub fn new_full(config: Configuration) let mut path = PathBuf::from(db_path); path.push("availability"); - let gossip = polkadot_network::AvailabilityNetworkShim(service.network()); + let gossip = polkadot_network::AvailabilityNetworkShim(gossip_validator.clone()); #[cfg(not(target_os = "unknown"))] { @@ -275,9 +276,8 @@ pub fn new_full(config: Configuration) // collator connections and validation network both fulfilled by this let validation_network = ValidationNetwork::new( - service.network(), - service.on_exit(), gossip_validator, + service.on_exit(), service.client(), WrappedExecutor(service.spawn_task_handle()), ); @@ -374,6 +374,7 @@ pub fn new_full(config: Configuration) on_exit: service.on_exit(), telemetry_on_connect: Some(service.telemetry_on_connect_stream()), voting_rule: grandpa::VotingRulesBuilder::default().build(), + executor: service.spawn_task_handle(), }; service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?); } else {