diff --git a/Cargo.lock b/Cargo.lock index 9fdd1dbb60d7..0e6f3499f80c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3772,8 +3772,11 @@ name = "polkadot-network" version = "0.7.20" dependencies = [ "arrayvec 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)", + "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "exit-future 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/collator/src/lib.rs b/collator/src/lib.rs index bdf8e6ce57c5..11b6d614fe26 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -65,10 +65,10 @@ use polkadot_cli::{ ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama, service::{self, Roles, SelectChain} }; -use polkadot_network::validation::{LeafWorkParams, ValidationNetwork}; +use polkadot_network::legacy::validation::{LeafWorkParams, ValidationNetwork}; pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration}; -pub use polkadot_network::validation::Incoming; +pub use polkadot_network::legacy::validation::Incoming; pub use polkadot_validation::SignedStatement; pub use polkadot_primitives::parachain::CollatorId; pub use sc_network::PeerId; @@ -316,7 +316,7 @@ fn run_collator_node( let is_known = move |block_hash: &Hash| { use consensus_common::BlockStatus; - use polkadot_network::gossip::Known; + use polkadot_network::legacy::gossip::Known; match known_oracle.block_status(&BlockId::hash(*block_hash)) { Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, @@ -333,7 +333,7 @@ fn 
run_collator_node( } }; - let message_validator = polkadot_network::gossip::register_validator( + let message_validator = polkadot_network::legacy::gossip::register_validator( network.clone(), (is_known, client.clone()), &spawner, diff --git a/network/Cargo.toml b/network/Cargo.toml index 6f26823d0513..655f00973383 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -7,7 +7,9 @@ edition = "2018" [dependencies] arrayvec = "0.4.12" +bytes = "0.5" parking_lot = "0.9.0" +derive_more = "0.14.1" av_store = { package = "polkadot-availability-store", path = "../availability-store" } polkadot-validation = { path = "../validation" } polkadot-primitives = { path = "../primitives" } @@ -20,6 +22,7 @@ sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "polkad futures = "0.3.4" log = "0.4.8" exit-future = "0.2.0" +futures-timer = "2.0" sc-client = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } sp-api = { git = "https://github.com/paritytech/substrate", branch = "polkadot-master" } diff --git a/network/src/collator_pool.rs b/network/src/legacy/collator_pool.rs similarity index 99% rename from network/src/collator_pool.rs rename to network/src/legacy/collator_pool.rs index 892de0b5160e..89b7ba3a5209 100644 --- a/network/src/collator_pool.rs +++ b/network/src/legacy/collator_pool.rs @@ -118,6 +118,7 @@ struct ParachainCollators { } /// Manages connected collators and role assignments from the perspective of a validator. 
+#[derive(Default)] pub struct CollatorPool { collators: HashMap, parachain_collators: HashMap, diff --git a/network/src/gossip/attestation.rs b/network/src/legacy/gossip/attestation.rs similarity index 99% rename from network/src/gossip/attestation.rs rename to network/src/legacy/gossip/attestation.rs index 62e06e0a1261..ad7c4e770502 100644 --- a/network/src/gossip/attestation.rs +++ b/network/src/legacy/gossip/attestation.rs @@ -38,7 +38,7 @@ use polkadot_primitives::Hash; use std::collections::{HashMap, HashSet}; use log::warn; -use crate::router::attestation_topic; +use crate::legacy::router::attestation_topic; use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec, ChainContext, Known, MessageValidationData, GossipStatement diff --git a/network/src/gossip/message_routing.rs b/network/src/legacy/gossip/message_routing.rs similarity index 99% rename from network/src/gossip/message_routing.rs rename to network/src/legacy/gossip/message_routing.rs index 314539a67bcd..66debb945e33 100644 --- a/network/src/gossip/message_routing.rs +++ b/network/src/legacy/gossip/message_routing.rs @@ -210,8 +210,8 @@ impl View { #[cfg(test)] mod tests { use super::*; - use crate::tests::TestChainContext; - use crate::gossip::{Known, GossipParachainMessages}; + use crate::legacy::tests::TestChainContext; + use crate::legacy::gossip::{Known, GossipParachainMessages}; use polkadot_primitives::parachain::Message as ParachainMessage; fn hash(x: u8) -> Hash { diff --git a/network/src/gossip.rs b/network/src/legacy/gossip/mod.rs similarity index 96% rename from network/src/gossip.rs rename to network/src/legacy/gossip/mod.rs index 825f1633d2fa..078131847b3e 100644 --- a/network/src/gossip.rs +++ b/network/src/legacy/gossip/mod.rs @@ -52,6 +52,7 @@ use sp_runtime::{generic::BlockId, traits::{BlakeTwo256, Hash as HashT}}; use sp_blockchain::Error as ClientError; use sc_network::{config::Roles, Context, PeerId, ReputationChange}; +use sc_network::{NetworkService as SubstrateNetworkService, 
specialization::NetworkSpecialization}; use sc_network_gossip::{ ValidationResult as GossipValidationResult, ValidatorContext, MessageIntent, @@ -73,8 +74,7 @@ use futures::prelude::*; use parking_lot::RwLock; use log::warn; -use super::PolkadotNetworkService; -use crate::{GossipMessageStream, NetworkService, PolkadotProtocol, router::attestation_topic}; +use crate::legacy::{GossipMessageStream, NetworkService, GossipService, PolkadotProtocol, router::attestation_topic}; use attestation::{View as AttestationView, PeerData as AttestationPeerData}; use message_routing::{View as MessageRoutingView}; @@ -308,11 +308,11 @@ impl ChainContext for (F, P) where // NOTE: since RegisteredMessageValidator is meant to be a type-safe proof // that we've actually done the registration, this should be the only way // to construct it outside of tests. -pub fn register_validator( - service: Arc, +pub fn register_validator>( + service: Arc>, chain: C, executor: &impl futures::task::Spawn, -) -> RegisteredMessageValidator +) -> RegisteredMessageValidator { let s = service.clone(); let report_handle = Box::new(move |peer: &PeerId, cost_benefit: ReputationChange| { @@ -366,7 +366,7 @@ impl NewLeafActions { /// Perform the queued actions, feeding into gossip. pub fn perform( self, - gossip: &dyn crate::NetworkService, + gossip: &dyn crate::legacy::GossipService, ) { for action in self.actions { match action { @@ -382,16 +382,25 @@ impl NewLeafActions { /// A registered message validator. /// /// Create this using `register_validator`. -#[derive(Clone)] -pub struct RegisteredMessageValidator { +pub struct RegisteredMessageValidator> { inner: Arc>, // Note: this is always `Some` in real code and `None` in tests. - service: Option>, + service: Option>>, // Note: this is always `Some` in real code and `None` in tests. 
gossip_engine: Option>, } -impl RegisteredMessageValidator { +impl> Clone for RegisteredMessageValidator { + fn clone(&self) -> Self { + RegisteredMessageValidator { + inner: self.inner.clone(), + service: self.service.clone(), + gossip_engine: self.gossip_engine.clone(), + } + } +} + +impl RegisteredMessageValidator { #[cfg(test)] pub(crate) fn new_test( chain: C, @@ -405,7 +414,9 @@ impl RegisteredMessageValidator { gossip_engine: None, } } +} +impl> RegisteredMessageValidator { pub fn register_availability_store(&mut self, availability_store: av_store::Store) { self.inner.inner.write().availability_store = Some(availability_store); } @@ -469,10 +480,8 @@ impl RegisteredMessageValidator { NewLeafActions { actions } } -} -impl NetworkService for RegisteredMessageValidator { - fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { + pub(crate) fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let topic_stream = if let Some(gossip_engine) = self.gossip_engine.as_ref() { gossip_engine.messages_for(topic) } else { @@ -483,7 +492,7 @@ impl NetworkService for RegisteredMessageValidator { GossipMessageStream::new(topic_stream.boxed()) } - fn gossip_message(&self, topic: Hash, message: GossipMessage) { + pub(crate) fn gossip_message(&self, topic: Hash, message: GossipMessage) { if let Some(gossip_engine) = self.gossip_engine.as_ref() { gossip_engine.gossip_message( topic, @@ -495,14 +504,30 @@ impl NetworkService for RegisteredMessageValidator { } } - fn send_message(&self, who: PeerId, message: GossipMessage) { + pub(crate) fn send_message(&self, who: PeerId, message: GossipMessage) { if let Some(gossip_engine) = self.gossip_engine.as_ref() { gossip_engine.send_message(vec![who], message.encode()); } else { log::error!("Called send_message on a test engine"); } } +} + +impl> GossipService for RegisteredMessageValidator { + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { + 
RegisteredMessageValidator::gossip_messages_for(self, topic) + } + + fn gossip_message(&self, topic: Hash, message: GossipMessage) { + RegisteredMessageValidator::gossip_message(self, topic, message) + } + + fn send_message(&self, who: PeerId, message: GossipMessage) { + RegisteredMessageValidator::send_message(self, who, message) + } +} +impl NetworkService for RegisteredMessageValidator { fn with_spec(&self, with: F) where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context) { @@ -806,7 +831,7 @@ mod tests { use polkadot_validation::GenericStatement; use super::message_routing::queue_topic; - use crate::tests::TestChainContext; + use crate::legacy::tests::TestChainContext; #[derive(PartialEq, Clone, Debug)] enum ContextEvent { diff --git a/network/src/local_collations.rs b/network/src/legacy/local_collations.rs similarity index 97% rename from network/src/local_collations.rs rename to network/src/legacy/local_collations.rs index 85bb1d643b45..f1a6615e88b8 100644 --- a/network/src/local_collations.rs +++ b/network/src/legacy/local_collations.rs @@ -20,7 +20,7 @@ //! a validator changes his session key, or when they are generated. use polkadot_primitives::{Hash, parachain::{ValidatorId}}; -use crate::collator_pool::Role; +use crate::legacy::collator_pool::Role; use std::collections::{HashMap, HashSet}; use std::time::Duration; use wasm_timer::Instant; @@ -39,6 +39,12 @@ pub struct LocalCollations { local_collations: HashMap>, } +impl Default for LocalCollations { + fn default() -> Self { + Self::new() + } +} + impl LocalCollations { /// Create a new `LocalCollations` tracker. pub fn new() -> Self { diff --git a/network/src/legacy/mod.rs b/network/src/legacy/mod.rs new file mode 100644 index 000000000000..553cc8ff36cf --- /dev/null +++ b/network/src/legacy/mod.rs @@ -0,0 +1,790 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! Polkadot-specific network implementation. +//! +//! This manages routing for parachain statements, parachain block and outgoing message +//! data fetching, communication between collators and validators, and more. + +pub mod collator_pool; +pub mod local_collations; +pub mod router; +pub mod validation; +pub mod gossip; + +use codec::{Decode, Encode}; +use futures::channel::oneshot; +use futures::prelude::*; +use polkadot_primitives::{Block, Hash, Header}; +use polkadot_primitives::parachain::{ + Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, + StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk, +}; +use sc_network::{ + PeerId, RequestId, Context, StatusMessage as GenericFullStatus, + specialization::NetworkSpecialization as Specialization, +}; +use sc_network_gossip::TopicNotification; +use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey}; +use self::collator_pool::{CollatorPool, Role, Action}; +use self::local_collations::LocalCollations; +use log::{trace, debug, warn}; + +use std::collections::{HashMap, HashSet}; +use std::pin::Pin; +use std::task::{Context as PollContext, Poll}; + +use self::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator}; +use crate::{cost, benefit}; + +#[cfg(test)] +mod tests; + +type FullStatus = GenericFullStatus; + +/// Specialization of
the network service for the polkadot protocol. +pub type PolkadotNetworkService = sc_network::NetworkService; + +/// Basic gossip functionality that a network has to fulfill. +pub trait GossipService: Send + Sync + 'static { + /// Get a stream of gossip messages for a given hash. + fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream; + + /// Gossip a message on given topic. + fn gossip_message(&self, topic: Hash, message: GossipMessage); + + /// Send a message to a specific peer we're connected to. + fn send_message(&self, who: PeerId, message: GossipMessage); +} + +/// Basic functionality that a network has to fulfill. +pub trait NetworkService: GossipService + Send + Sync + 'static { + /// Execute a closure with the polkadot protocol. + fn with_spec(&self, with: F) + where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); +} + +/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait. +/// +/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`]. +/// For more details see documentation of [`ProvideGossipMessages`]. 
+/// +/// [`NetworkService`]: ./trait.NetworkService.html +/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html +#[derive(Clone)] +pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator); + +impl av_store::ProvideGossipMessages for AvailabilityNetworkShim { + fn gossip_messages_for(&self, topic: Hash) + -> Pin + Send>> + { + self.0.gossip_messages_for(topic) + .filter_map(|(msg, _)| async move { + match msg { + GossipMessage::ErasureChunk(chunk) => { + Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk)) + }, + _ => None, + } + }) + .boxed() + } + + fn gossip_erasure_chunk( + &self, + relay_parent: Hash, + candidate_hash: Hash, + erasure_root: Hash, + chunk: ErasureChunk + ) { + let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index); + self.0.gossip_message( + topic, + GossipMessage::ErasureChunk(ErasureChunkMessage { + chunk, + relay_parent, + candidate_hash, + }) + ) + } +} + +/// A stream of gossip messages and an optional sender for a topic. +pub struct GossipMessageStream { + topic_stream: Pin + Send>>, +} + +impl GossipMessageStream { + /// Create a new instance with the given topic stream. + pub fn new(topic_stream: Pin + Send>>) -> Self { + Self { + topic_stream, + } + } +} + +impl Stream for GossipMessageStream { + type Item = (GossipMessage, Option); + + fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll> { + let this = Pin::into_inner(self); + + loop { + let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) { + Poll::Ready(Some(msg)) => msg, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + }; + + debug!(target: "validation", "Processing statement for live validation leaf-work"); + if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { + return Poll::Ready(Some((gmsg, msg.sender))) + } + } + } +} + +/// Status of a Polkadot node. 
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] +pub struct Status { + collating_for: Option<(CollatorId, ParaId)>, +} + +struct PoVBlockRequest { + attempted_peers: HashSet, + validation_leaf: Hash, + candidate_hash: Hash, + block_data_hash: Hash, + sender: oneshot::Sender, + canon_roots: StructuredUnroutedIngress, +} + +impl PoVBlockRequest { + // Attempt to process a response. If the provided block is invalid, + // this returns an error result containing the unmodified request. + // + // If `Ok(())` is returned, that indicates that the request has been processed. + fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> { + if pov_block.block_data.hash() != self.block_data_hash { + return Err(self); + } + + match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) { + Ok(()) => { + let _ = self.sender.send(pov_block); + Ok(()) + } + Err(_) => Err(self) + } + } +} + +// ensures collator-protocol messages are sent in correct order. +// session key must be sent before collator role. 
+enum CollatorState { + Fresh, + RolePending(Role), + Primed(Option), +} + +impl CollatorState { + fn send_key(&mut self, key: ValidatorId, mut f: F) { + f(Message::ValidatorId(key)); + if let CollatorState::RolePending(role) = *self { + f(Message::CollatorRole(role)); + *self = CollatorState::Primed(Some(role)); + } + } + + fn set_role(&mut self, role: Role, mut f: F) { + if let CollatorState::Primed(ref mut r) = *self { + f(Message::CollatorRole(role)); + *r = Some(role); + } else { + *self = CollatorState::RolePending(role); + } + } + + fn role(&self) -> Option { + match *self { + CollatorState::Fresh => None, + CollatorState::RolePending(role) => Some(role), + CollatorState::Primed(role) => role, + } + } +} + +struct PeerInfo { + collating_for: Option<(CollatorId, ParaId)>, + validator_keys: RecentValidatorIds, + claimed_validator: bool, + collator_state: CollatorState, +} + +impl PeerInfo { + fn should_send_key(&self) -> bool { + self.claimed_validator || self.collating_for.is_some() + } +} + +/// Polkadot-specific messages. +#[derive(Debug, Encode, Decode)] +pub enum Message { + /// As a validator, tell the peer your current session key. + // TODO: do this with a cryptographic proof of some kind + // https://github.com/paritytech/polkadot/issues/47 + ValidatorId(ValidatorId), + /// Requesting parachain proof-of-validation block (relay_parent, candidate_hash). + RequestPovBlock(RequestId, Hash, Hash), + /// Provide requested proof-of-validation block data by candidate hash or nothing if unknown. + PovBlock(RequestId, Option), + /// Tell a collator their role. + CollatorRole(Role), + /// A collation provided by a peer. Relay parent and collation. 
+ Collation(Hash, Collation), +} + +fn send_polkadot_message(ctx: &mut dyn Context, to: PeerId, message: Message) { + trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); + let encoded = message.encode(); + ctx.send_chain_specific(to, encoded) +} + +/// Polkadot protocol attachment for substrate. +pub struct PolkadotProtocol { + peers: HashMap, + collating_for: Option<(CollatorId, ParaId)>, + collators: CollatorPool, + validators: HashMap, + local_collations: LocalCollations, + live_validation_leaves: LiveValidationLeaves, + in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, + pending: Vec, + availability_store: Option, + next_req_id: u64, +} + +impl PolkadotProtocol { + /// Instantiate a polkadot protocol handler. + pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self { + PolkadotProtocol { + peers: HashMap::new(), + collators: CollatorPool::new(), + collating_for, + validators: HashMap::new(), + local_collations: LocalCollations::new(), + live_validation_leaves: LiveValidationLeaves::new(), + in_flight: HashMap::new(), + pending: Vec::new(), + availability_store: None, + next_req_id: 1, + } + } + + /// Fetch block data by candidate receipt. 
+ fn fetch_pov_block( + &mut self, + ctx: &mut dyn Context, + candidate: &CandidateReceipt, + relay_parent: Hash, + canon_roots: StructuredUnroutedIngress, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + + self.pending.push(PoVBlockRequest { + attempted_peers: Default::default(), + validation_leaf: relay_parent, + candidate_hash: candidate.hash(), + block_data_hash: candidate.block_data_hash, + sender: tx, + canon_roots, + }); + + self.dispatch_pending_requests(ctx); + rx + } + + /// Note new leaf to do validation work at + fn new_validation_leaf_work( + &mut self, + ctx: &mut dyn Context, + params: validation::LeafWorkParams, + ) -> validation::LiveValidationLeaf { + + let (work, new_local) = self.live_validation_leaves + .new_validation_leaf(params); + + if let Some(new_local) = new_local { + for (id, peer_data) in self.peers.iter_mut() + .filter(|&(_, ref info)| info.should_send_key()) + { + peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message( + ctx, + id.clone(), + msg + )); + } + } + + work + } + + // true indicates that it was removed actually. + fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { + self.live_validation_leaves.remove(parent_hash) + } + + fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context) { + let mut new_pending = Vec::new(); + let validator_keys = &mut self.validators; + let next_req_id = &mut self.next_req_id; + let in_flight = &mut self.in_flight; + + for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { + let parent = pending.validation_leaf; + let c_hash = pending.candidate_hash; + + let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x { + Ok(data @ &_) => { + // answer locally. 
+ let _ = pending.sender.send(data.clone()); + None + } + Err(Some(known_keys)) => { + let next_peer = known_keys.iter() + .filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone()))) + .find(|&(ref key, _)| pending.attempted_peers.insert(key.clone())) + .map(|(_, id)| id); + + // dispatch to peer + if let Some(who) = next_peer { + let req_id = *next_req_id; + *next_req_id += 1; + + send_polkadot_message( + ctx, + who.clone(), + Message::RequestPovBlock(req_id, parent, c_hash), + ); + + in_flight.insert((req_id, who), pending); + + None + } else { + Some(pending) + } + } + Err(None) => None, // no such known validation leaf-work. prune out. + }); + + if let Some(pending) = still_pending { + new_pending.push(pending); + } + } + + self.pending = new_pending; + } + + fn on_polkadot_message(&mut self, ctx: &mut dyn Context, who: PeerId, msg: Message) { + trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); + match msg { + Message::ValidatorId(key) => self.on_session_key(ctx, who, key), + Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { + let pov_block = self.live_validation_leaves.with_pov_block( + &relay_parent, + &candidate_hash, + |res| res.ok().map(|b| b.clone()), + ); + + send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); + } + Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data), + Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), + Message::CollatorRole(role) => self.on_new_role(ctx, who, role), + } + } + + fn on_session_key(&mut self, ctx: &mut dyn Context, who: PeerId, key: ValidatorId) { + { + let info = match self.peers.get_mut(&who) { + Some(peer) => peer, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); + return + } + }; + + if !info.claimed_validator { + ctx.report_peer(who, cost::UNEXPECTED_MESSAGE); + return; + } + ctx.report_peer(who.clone(), 
benefit::EXPECTED_MESSAGE); + + let local_collations = &mut self.local_collations; + let new_collations = match info.validator_keys.insert(key.clone()) { + InsertedRecentKey::AlreadyKnown => Vec::new(), + InsertedRecentKey::New(Some(old_key)) => { + self.validators.remove(&old_key); + local_collations.fresh_key(&old_key, &key) + } + InsertedRecentKey::New(None) => info.collator_state.role() + .map(|r| local_collations.note_validator_role(key.clone(), r)) + .unwrap_or_else(Vec::new), + }; + + for (relay_parent, collation) in new_collations { + send_polkadot_message( + ctx, + who.clone(), + Message::Collation(relay_parent, collation), + ) + } + + self.validators.insert(key, who); + } + + self.dispatch_pending_requests(ctx); + } + + fn on_pov_block( + &mut self, + ctx: &mut dyn Context, + who: PeerId, + req_id: RequestId, + pov_block: Option, + ) { + match self.in_flight.remove(&(req_id, who.clone())) { + Some(mut req) => { + match pov_block { + Some(pov_block) => { + match req.process_response(pov_block) { + Ok(()) => { + ctx.report_peer(who, benefit::GOOD_POV_BLOCK); + return; + } + Err(r) => { + ctx.report_peer(who, cost::BAD_POV_BLOCK); + req = r; + } + } + }, + None => { + ctx.report_peer(who, benefit::EXPECTED_MESSAGE); + } + } + + self.pending.push(req); + self.dispatch_pending_requests(ctx); + } + None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE), + } + } + + // when a validator sends us (a collator) a new role. + fn on_new_role(&mut self, ctx: &mut dyn Context, who: PeerId, role: Role) { + let info = match self.peers.get_mut(&who) { + Some(peer) => peer, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); + return + } + }; + + debug!(target: "p_net", "New collator role {:?} from {}", role, who); + + if info.validator_keys.as_slice().is_empty() { + ctx.report_peer(who, cost::UNEXPECTED_ROLE) + } else { + // update role for all saved session keys for this validator. 
+ let local_collations = &mut self.local_collations; + for (relay_parent, collation) in info.validator_keys + .as_slice() + .iter() + .cloned() + .flat_map(|k| local_collations.note_validator_role(k, role)) + { + debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent); + send_polkadot_message( + ctx, + who.clone(), + Message::Collation(relay_parent, collation), + ) + } + } + } + + /// Convert the given `CollatorId` to a `PeerId`. + pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> { + self.collators.collator_id_to_peer_id(collator_id) + } +} + +impl Specialization for PolkadotProtocol { + fn status(&self) -> Vec { + Status { collating_for: self.collating_for.clone() }.encode() + } + + fn on_connect(&mut self, ctx: &mut dyn Context, who: PeerId, status: FullStatus) { + let local_status = Status::decode(&mut &status.chain_status[..]) + .unwrap_or_else(|_| Status { collating_for: None }); + + let validator = status.roles.contains(sc_network::config::Roles::AUTHORITY); + + let mut peer_info = PeerInfo { + collating_for: local_status.collating_for.clone(), + validator_keys: Default::default(), + claimed_validator: validator, + collator_state: CollatorState::Fresh, + }; + + if let Some((ref acc_id, ref para_id)) = local_status.collating_for { + if self.collator_peer(acc_id.clone()).is_some() { + ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN); + return + } + ctx.report_peer(who.clone(), benefit::NEW_COLLATOR); + + let collator_role = self.collators.on_new_collator( + acc_id.clone(), + para_id.clone(), + who.clone(), + ); + + peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message( + ctx, + who.clone(), + msg, + )); + } + + // send session keys. 
+ if peer_info.should_send_key() { + for local_session_key in self.live_validation_leaves.recent_keys() { + peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message( + ctx, + who.clone(), + msg, + )); + } + } + + self.peers.insert(who, peer_info); + self.dispatch_pending_requests(ctx); + } + + fn on_disconnect(&mut self, ctx: &mut dyn Context, who: PeerId) { + if let Some(info) = self.peers.remove(&who) { + if let Some((acc_id, _)) = info.collating_for { + let new_primary = self.collators.on_disconnect(acc_id) + .and_then(|new_primary| self.collator_peer(new_primary)); + + if let Some((new_primary, primary_info)) = new_primary { + primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message( + ctx, + new_primary.clone(), + msg, + )); + } + } + + for key in info.validator_keys.as_slice().iter() { + self.validators.remove(key); + self.local_collations.on_disconnect(key); + } + + { + let pending = &mut self.pending; + self.in_flight.retain(|&(_, ref peer), val| { + let retain = peer != &who; + if !retain { + // swap with a dummy value which will be dropped immediately. 
+ let (sender, _) = oneshot::channel(); + pending.push(::std::mem::replace(val, PoVBlockRequest { + attempted_peers: Default::default(), + validation_leaf: Default::default(), + candidate_hash: Default::default(), + block_data_hash: Default::default(), + canon_roots: StructuredUnroutedIngress(Vec::new()), + sender, + })); + } + + retain + }); + } + self.dispatch_pending_requests(ctx); + } + } + + fn on_message( + &mut self, + ctx: &mut dyn Context, + who: PeerId, + message: Vec, + ) { + match Message::decode(&mut &message[..]) { + Ok(msg) => { + ctx.report_peer(who.clone(), benefit::VALID_FORMAT); + self.on_polkadot_message(ctx, who, msg) + }, + Err(_) => { + trace!(target: "p_net", "Bad message from {}", who); + ctx.report_peer(who, cost::INVALID_FORMAT); + } + } + } + + fn maintain_peers(&mut self, ctx: &mut dyn Context) { + self.collators.collect_garbage(None); + self.local_collations.collect_garbage(None); + self.dispatch_pending_requests(ctx); + + for collator_action in self.collators.maintain_peers() { + match collator_action { + Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), + Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) { + info.collator_state.set_role(role, |msg| send_polkadot_message( + ctx, + collator.clone(), + msg, + )) + }, + } + } + } + + fn on_block_imported(&mut self, _ctx: &mut dyn Context, hash: Hash, header: &Header) { + self.collators.collect_garbage(Some(&hash)); + self.local_collations.collect_garbage(Some(&header.parent_hash)); + } +} + +impl PolkadotProtocol { + // we received a collation from a peer + fn on_collation( + &mut self, + ctx: &mut dyn Context, + from: PeerId, + relay_parent: Hash, + collation: Collation + ) { + let collation_para = collation.info.parachain_index; + let collated_acc = collation.info.collator.clone(); + + match self.peers.get(&from) { + None => ctx.report_peer(from, cost::UNKNOWN_PEER), + Some(peer_info) => { + 
ctx.report_peer(from.clone(), benefit::KNOWN_PEER); + match peer_info.collating_for { + None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE), + Some((ref acc_id, ref para_id)) => { + ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE); + let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; + if structurally_valid && collation.info.check_signature().is_ok() { + debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from); + ctx.report_peer(from, benefit::GOOD_COLLATION); + self.collators.on_collation(acc_id.clone(), relay_parent, collation) + } else { + ctx.report_peer(from, cost::INVALID_FORMAT) + }; + } + } + }, + } + } + + fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); + self.collators.await_collation(relay_parent, para_id, tx); + rx + } + + // get connected peer with given account ID for collation. + fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> { + let check_info = |info: &PeerInfo| info + .collating_for + .as_ref() + .map_or(false, |&(ref acc_id, _)| acc_id == &collator_id); + + self.peers + .iter_mut() + .filter(|&(_, ref info)| check_info(&**info)) + .map(|(who, info)| (who.clone(), info)) + .next() + } + + // disconnect a collator by account-id. + fn disconnect_bad_collator(&mut self, ctx: &mut dyn Context, collator_id: CollatorId) { + if let Some((who, _)) = self.collator_peer(collator_id) { + ctx.report_peer(who, cost::BAD_COLLATION) + } + } +} + +impl PolkadotProtocol { + /// Add a local collation and broadcast it to the necessary peers. + /// + /// This should be called by a collator intending to get the locally-collated + /// block into the hands of validators. 
+ /// It also places the outgoing message and block data in the local availability store. + pub fn add_local_collation( + &mut self, + ctx: &mut dyn Context, + relay_parent: Hash, + targets: HashSet, + collation: Collation, + outgoing_targeted: OutgoingMessages, + ) -> impl Future { + debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", + relay_parent, collation.info.parachain_index); + + for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { + match self.validators.get(&primary) { + Some(who) => { + debug!(target: "p_net", "Sending local collation to {:?}", primary); + send_polkadot_message( + ctx, + who.clone(), + Message::Collation(relay_parent, cloned_collation), + ) + }, + None => + warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), + } + } + + let availability_store = self.availability_store.clone(); + let collation_cloned = collation.clone(); + + async move { + if let Some(availability_store) = availability_store { + let _ = availability_store.make_available(av_store::Data { + relay_parent, + parachain_id: collation_cloned.info.parachain_index, + block_data: collation_cloned.pov.block_data.clone(), + outgoing_queues: Some(outgoing_targeted.clone().into()), + }).await; + } + } + } + + /// Give the network protocol a handle to an availability store, used for + /// circulation of parachain data required for validation. 
+ pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) { + self.availability_store = Some(availability_store); + } +} diff --git a/network/src/router.rs b/network/src/legacy/router.rs similarity index 87% rename from network/src/router.rs rename to network/src/legacy/router.rs index d0fdec95b5b3..6171ca672995 100644 --- a/network/src/router.rs +++ b/network/src/legacy/router.rs @@ -31,7 +31,6 @@ use polkadot_primitives::{Block, Hash}; use polkadot_primitives::parachain::{ OutgoingMessages, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock, ErasureChunk, }; -use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage}; use sp_api::ProvideRuntimeApi; use futures::prelude::*; @@ -44,8 +43,9 @@ use std::io; use std::sync::Arc; use std::pin::Pin; -use crate::validation::{LeafWorkDataFetcher, Executor}; -use crate::NetworkService; +use crate::legacy::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage}; +use crate::legacy::validation::{LeafWorkDataFetcher, Executor}; +use crate::legacy::{NetworkService, PolkadotProtocol}; /// Compute the gossip topic for attestations on the given parent hash. 
pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { @@ -78,14 +78,16 @@ pub struct Router { attestation_topic: Hash, fetcher: LeafWorkDataFetcher, deferred_statements: Arc>, - message_validator: RegisteredMessageValidator, + message_validator: RegisteredMessageValidator, + drop_signal: Arc, } impl Router { pub(crate) fn new( table: Arc, fetcher: LeafWorkDataFetcher, - message_validator: RegisteredMessageValidator, + message_validator: RegisteredMessageValidator, + drop_signal: exit_future::Signal, ) -> Self { let parent_hash = fetcher.parent_hash(); Router { @@ -94,6 +96,7 @@ impl Router { attestation_topic: attestation_topic(parent_hash), deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())), message_validator, + drop_signal: Arc::new(drop_signal), } } @@ -111,7 +114,7 @@ impl Router { self.fetcher.parent_hash() } - fn network(&self) -> &RegisteredMessageValidator { + fn network(&self) -> &RegisteredMessageValidator { self.fetcher.network() } } @@ -124,6 +127,7 @@ impl Clone for Router { attestation_topic: self.attestation_topic, deferred_statements: self.deferred_statements.clone(), message_validator: self.message_validator.clone(), + drop_signal: self.drop_signal.clone(), } } } @@ -155,7 +159,7 @@ impl + Send + Sync + 'static, T> Router where // import all statements pending on this candidate let (mut statements, _traces) = if let GenericStatement::Candidate(_) = statement.statement { - self.deferred_statements.lock().get_deferred(&c_hash) + self.deferred_statements.lock().take_deferred(&c_hash) } else { (Vec::new(), Vec::new()) }; @@ -229,6 +233,7 @@ impl + Send, T> TableRouter for Router where T: Clone + Executor + Send + 'static, { type Error = io::Error; + type SendLocalCollation = future::Ready>; type FetchValidationProof = Pin> + Send>>; // We have fetched from a collator and here the receipt should have been already formed. 
@@ -238,7 +243,7 @@ impl + Send, T> TableRouter for Router where receipt: CandidateReceipt, outgoing: OutgoingMessages, chunks: (ValidatorIndex, &[ErasureChunk]) - ) { + ) -> Self::SendLocalCollation { // produce a signed statement let hash = receipt.hash(); let erasure_root = receipt.erasure_root; @@ -251,7 +256,7 @@ impl + Send, T> TableRouter for Router where let statement = GossipStatement::new( self.parent_hash(), match self.table.import_validated(validated) { - None => return, + None => return future::ready(Ok(())), Some(s) => s, }, ); @@ -273,6 +278,8 @@ impl + Send, T> TableRouter for Router where message.into() ); } + + future::ready(Ok(())) } fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof { @@ -289,26 +296,29 @@ impl Drop for Router { // A unique trace for valid statements issued by a validator. #[derive(Hash, PartialEq, Eq, Clone, Debug)] -enum StatementTrace { +pub(crate) enum StatementTrace { Valid(ValidatorIndex, Hash), Invalid(ValidatorIndex, Hash), } -// helper for deferring statements whose associated candidate is unknown. -struct DeferredStatements { +/// Helper for deferring statements whose associated candidate is unknown. +pub(crate) struct DeferredStatements { deferred: HashMap>, known_traces: HashSet, } impl DeferredStatements { - fn new() -> Self { + /// Create a new `DeferredStatements`. + pub(crate) fn new() -> Self { DeferredStatements { deferred: HashMap::new(), known_traces: HashSet::new(), } } - fn push(&mut self, statement: SignedStatement) { + /// Push a new statement onto the deferred pile. `Candidate` statements + /// cannot be deferred and are ignored. 
+ pub(crate) fn push(&mut self, statement: SignedStatement) { let (hash, trace) = match statement.statement { GenericStatement::Candidate(_) => return, GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender.clone(), hash)), @@ -320,7 +330,8 @@ impl DeferredStatements { } } - fn get_deferred(&mut self, hash: &Hash) -> (Vec, Vec) { + /// Take all deferred statements referencing the given candidate hash out. + pub(crate) fn take_deferred(&mut self, hash: &Hash) -> (Vec, Vec) { match self.deferred.remove(hash) { None => (Vec::new(), Vec::new()), Some(deferred) => { @@ -361,7 +372,7 @@ mod tests { // pre-push. { - let (signed, traces) = deferred.get_deferred(&hash); + let (signed, traces) = deferred.take_deferred(&hash); assert!(signed.is_empty()); assert!(traces.is_empty()); } @@ -371,7 +382,7 @@ mod tests { // draining: second push should have been ignored. { - let (signed, traces) = deferred.get_deferred(&hash); + let (signed, traces) = deferred.take_deferred(&hash); assert_eq!(signed.len(), 1); assert_eq!(traces.len(), 1); @@ -381,7 +392,7 @@ mod tests { // after draining { - let (signed, traces) = deferred.get_deferred(&hash); + let (signed, traces) = deferred.take_deferred(&hash); assert!(signed.is_empty()); assert!(traces.is_empty()); } diff --git a/network/src/tests/mod.rs b/network/src/legacy/tests/mod.rs similarity index 97% rename from network/src/tests/mod.rs rename to network/src/legacy/tests/mod.rs index a1720344e0f2..7c10da723154 100644 --- a/network/src/tests/mod.rs +++ b/network/src/legacy/tests/mod.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use super::{PolkadotProtocol, Status, Message, FullStatus}; -use crate::validation::LeafWorkParams; +use crate::legacy::validation::LeafWorkParams; use polkadot_validation::GenericStatement; use polkadot_primitives::{Block, Hash}; @@ -74,12 +74,12 @@ impl TestContext { #[derive(Default)] pub struct TestChainContext { - pub known_map: HashMap, + pub known_map: HashMap, pub 
ingress_roots: HashMap>, } -impl crate::gossip::ChainContext for TestChainContext { - fn is_known(&self, block_hash: &Hash) -> Option { +impl crate::legacy::gossip::ChainContext for TestChainContext { + fn is_known(&self, block_hash: &Hash) -> Option { self.known_map.get(block_hash).map(|x| x.clone()) } diff --git a/network/src/tests/validation.rs b/network/src/legacy/tests/validation.rs similarity index 96% rename from network/src/tests/validation.rs rename to network/src/legacy/tests/validation.rs index e5dab770c9d2..0cfc86f22e86 100644 --- a/network/src/tests/validation.rs +++ b/network/src/legacy/tests/validation.rs @@ -18,12 +18,12 @@ #![allow(unused)] -use crate::gossip::GossipMessage; +use crate::legacy::gossip::GossipMessage; use sc_network::{Context as NetContext, PeerId}; use sc_network_gossip::TopicNotification; use sp_core::{NativeOrEncoded, ExecutionContext}; use sp_keyring::Sr25519Keyring; -use crate::{PolkadotProtocol, NetworkService, GossipMessageStream}; +use crate::legacy::{PolkadotProtocol, NetworkService, GossipService, GossipMessageStream}; use polkadot_validation::{SharedTable, Network}; use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId}; @@ -117,6 +117,18 @@ struct TestNetwork { } impl NetworkService for TestNetwork { + fn with_spec(&self, with: F) + where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext) + { + let mut context = TestContext::default(); + let res = with(&mut *self.proto.lock(), &mut context); + // TODO: send context to worker for message routing. 
+ // https://github.com/paritytech/polkadot/issues/215 + res + } +} + +impl GossipService for TestNetwork { fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { let (tx, rx) = mpsc::unbounded(); let _ = self.gossip.send_listener.unbounded_send((topic, tx)); @@ -131,16 +143,6 @@ impl NetworkService for TestNetwork { let notification = TopicNotification { message: message.encode(), sender: None }; let _ = self.gossip.send_message.unbounded_send((topic, notification)); } - - fn with_spec(&self, with: F) - where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext) - { - let mut context = TestContext::default(); - let res = with(&mut *self.proto.lock(), &mut context); - // TODO: send context to worker for message routing. - // https://github.com/paritytech/polkadot/issues/215 - res - } } #[derive(Default)] @@ -319,7 +321,7 @@ impl ParachainHost for RuntimeApi { } } -type TestValidationNetwork = crate::validation::ValidationNetwork; +type TestValidationNetwork = crate::legacy::validation::ValidationNetwork; struct Built { gossip: Pin>>, @@ -327,7 +329,8 @@ struct Built { networks: Vec>, } -fn build_network(n: usize, spawner: SP) -> Built { +fn build_network(n: usize, spawner: SP)-> Built { + use crate::legacy::gossip::RegisteredMessageValidator; let (gossip_router, gossip_handle) = make_gossip(); let api_handle = Arc::new(Mutex::new(Default::default())); let runtime_api = Arc::new(TestApi { data: api_handle.clone() }); @@ -338,7 +341,7 @@ fn build_network(n: usize, spawner: SP) -> Built { gossip: gossip_handle.clone(), }); - let message_val = crate::gossip::RegisteredMessageValidator::new_test( + let message_val = RegisteredMessageValidator::::new_test( TestChainContext::default(), Box::new(|_, _| {}), ); diff --git a/network/src/validation.rs b/network/src/legacy/validation.rs similarity index 96% rename from network/src/validation.rs rename to network/src/legacy/validation.rs index 9dcd8ead92e7..3f38282f6b02 100644 --- a/network/src/validation.rs +++ 
b/network/src/legacy/validation.rs @@ -44,10 +44,10 @@ use std::pin::Pin; use arrayvec::ArrayVec; use parking_lot::Mutex; -use crate::router::Router; -use crate::gossip::{RegisteredMessageValidator, MessageValidationData}; +use crate::legacy::router::Router; +use crate::legacy::gossip::{RegisteredMessageValidator, MessageValidationData}; -use super::NetworkService; +use super::{NetworkService, PolkadotProtocol}; pub use polkadot_validation::Incoming; @@ -65,13 +65,13 @@ pub struct LeafWorkParams { pub struct ValidationNetwork { api: Arc

, executor: T, - network: RegisteredMessageValidator, + network: RegisteredMessageValidator, } impl ValidationNetwork { /// Create a new consensus networking object. pub fn new( - network: RegisteredMessageValidator, + network: RegisteredMessageValidator, api: Arc

, executor: T, ) -> Self { @@ -165,7 +165,7 @@ impl ValidationNetwork { /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream { - crate::router::checked_statements(&self.network, crate::router::attestation_topic(relay_parent)) + crate::legacy::router::checked_statements(&self.network, crate::legacy::router::attestation_topic(relay_parent)) } } @@ -179,12 +179,12 @@ impl ParachainNetwork for ValidationNetwork where type TableRouter = Router; type BuildTableRouter = Box> + Send + Unpin>; - fn communication_for( + fn build_table_router( &self, table: Arc, authorities: &[ValidatorId], - exit: exit_future::Exit, ) -> Self::BuildTableRouter { + let (signal, exit) = exit_future::signal(); let parent_hash = *table.consensus_parent_hash(); let local_session_key = table.session_key(); @@ -203,6 +203,7 @@ impl ParachainNetwork for ValidationNetwork where table, fetcher, network, + signal, ); let table_router_clone = table_router.clone(); @@ -390,7 +391,7 @@ impl RecentValidatorIds { InsertedRecentKey::New(old) } - /// As a slice. + /// As a slice. Most recent is last. pub(crate) fn as_slice(&self) -> &[ValidatorId] { &*self.inner } @@ -512,7 +513,7 @@ impl LiveValidationLeaves { /// Can fetch data for a given validation leaf-work instance. pub struct LeafWorkDataFetcher { - network: RegisteredMessageValidator, + network: RegisteredMessageValidator, api: Arc

, task_executor: T, knowledge: Arc>, @@ -531,7 +532,7 @@ impl LeafWorkDataFetcher { } /// Get the network service. - pub(crate) fn network(&self) -> &RegisteredMessageValidator { + pub(crate) fn network(&self) -> &RegisteredMessageValidator { &self.network } diff --git a/network/src/lib.rs b/network/src/lib.rs index 9c906c07a69e..e2d8873248fe 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// Copyright 2020 Parity Technologies (UK) Ltd. // This file is part of Polkadot. // Polkadot is free software: you can redistribute it and/or modify @@ -14,43 +14,23 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -//! Polkadot-specific network implementation. +//! High-level network protocols for Polkadot. //! //! This manages routing for parachain statements, parachain block and outgoing message //! data fetching, communication between collators and validators, and more. 
-mod collator_pool; -mod local_collations; -mod router; -pub mod validation; -pub mod gossip; +use polkadot_primitives::{Block, Hash}; -use codec::{Decode, Encode}; -use futures::channel::oneshot; -use futures::prelude::*; -use polkadot_primitives::{Block, Hash, Header}; -use polkadot_primitives::parachain::{ - Id as ParaId, CollatorId, CandidateReceipt, Collation, PoVBlock, - StructuredUnroutedIngress, ValidatorId, OutgoingMessages, ErasureChunk, -}; -use sc_network::{ - PeerId, RequestId, Context, StatusMessage as GenericFullStatus, - specialization::NetworkSpecialization as Specialization, -}; -use sc_network_gossip::TopicNotification; -use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey}; -use self::collator_pool::{CollatorPool, Role, Action}; -use self::local_collations::LocalCollations; -use log::{trace, debug, warn}; +pub mod legacy; +pub mod protocol; -use std::collections::{HashMap, HashSet}; -use std::pin::Pin; -use std::task::{Context as PollContext, Poll}; - -use crate::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator}; +sc_network::construct_simple_protocol! { + /// Stub until https://github.com/paritytech/substrate/pull/4665 is merged + pub struct PolkadotProtocol where Block = Block { } +} -#[cfg(test)] -mod tests; +/// Specialization of the network service for the polkadot protocol. 
+pub type PolkadotNetworkService = sc_network::NetworkService; mod cost { use sc_network::ReputationChange as Rep; @@ -59,7 +39,7 @@ mod cost { pub(super) const INVALID_FORMAT: Rep = Rep::new(-200, "Polkadot: Bad message"); pub(super) const UNKNOWN_PEER: Rep = Rep::new(-50, "Polkadot: Unknown peer"); - pub(super) const COLLATOR_ALREADY_KNOWN: Rep = Rep::new( -100, "Polkadot: Known collator"); + pub(super) const COLLATOR_ALREADY_KNOWN: Rep = Rep::new(-100, "Polkadot: Known collator"); pub(super) const BAD_COLLATION: Rep = Rep::new(-1000, "Polkadot: Bad collation"); pub(super) const BAD_POV_BLOCK: Rep = Rep::new(-1000, "Polkadot: Bad POV block"); } @@ -75,735 +55,3 @@ mod benefit { pub(super) const GOOD_POV_BLOCK: Rep = Rep::new(100, "Polkadot: Good POV block"); } -type FullStatus = GenericFullStatus; - -/// Specialization of the network service for the polkadot protocol. -pub type PolkadotNetworkService = sc_network::NetworkService; - -/// Basic functionality that a network has to fulfill. -pub trait NetworkService: Send + Sync + 'static { - /// Get a stream of gossip messages for a given hash. - fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream; - - /// Gossip a message on given topic. - fn gossip_message(&self, topic: Hash, message: GossipMessage); - - /// Send a message to a specific peer we're connected to. - fn send_message(&self, who: PeerId, message: GossipMessage); - - /// Execute a closure with the polkadot protocol. - fn with_spec(&self, with: F) - where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); -} - -/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait. -/// -/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`]. -/// For more details see documentation of [`ProvideGossipMessages`]. 
-/// -/// [`NetworkService`]: ./trait.NetworkService.html -/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html -#[derive(Clone)] -pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator); - -impl av_store::ProvideGossipMessages for AvailabilityNetworkShim { - fn gossip_messages_for(&self, topic: Hash) - -> Pin + Send>> - { - self.0.gossip_messages_for(topic) - .filter_map(|(msg, _)| async move { - match msg { - GossipMessage::ErasureChunk(chunk) => { - Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk)) - }, - _ => None, - } - }) - .boxed() - } - - fn gossip_erasure_chunk( - &self, - relay_parent: Hash, - candidate_hash: Hash, - erasure_root: Hash, - chunk: ErasureChunk - ) { - let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index); - self.0.gossip_message( - topic, - GossipMessage::ErasureChunk(ErasureChunkMessage { - chunk, - relay_parent, - candidate_hash, - }) - ) - } -} - -/// A stream of gossip messages and an optional sender for a topic. -pub struct GossipMessageStream { - topic_stream: Pin + Send>>, -} - -impl GossipMessageStream { - /// Create a new instance with the given topic stream. - pub fn new(topic_stream: Pin + Send>>) -> Self { - Self { - topic_stream, - } - } -} - -impl Stream for GossipMessageStream { - type Item = (GossipMessage, Option); - - fn poll_next(self: Pin<&mut Self>, cx: &mut PollContext) -> Poll> { - let this = Pin::into_inner(self); - - loop { - let msg = match Pin::new(&mut this.topic_stream).poll_next(cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => return Poll::Pending, - }; - - debug!(target: "validation", "Processing statement for live validation leaf-work"); - if let Ok(gmsg) = GossipMessage::decode(&mut &msg.message[..]) { - return Poll::Ready(Some((gmsg, msg.sender))) - } - } - } -} - -/// Status of a Polkadot node. 
-#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] -pub struct Status { - collating_for: Option<(CollatorId, ParaId)>, -} - -struct PoVBlockRequest { - attempted_peers: HashSet, - validation_leaf: Hash, - candidate_hash: Hash, - block_data_hash: Hash, - sender: oneshot::Sender, - canon_roots: StructuredUnroutedIngress, -} - -impl PoVBlockRequest { - // Attempt to process a response. If the provided block is invalid, - // this returns an error result containing the unmodified request. - // - // If `Ok(())` is returned, that indicates that the request has been processed. - fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> { - if pov_block.block_data.hash() != self.block_data_hash { - return Err(self); - } - - match polkadot_validation::validate_incoming(&self.canon_roots, &pov_block.ingress) { - Ok(()) => { - let _ = self.sender.send(pov_block); - Ok(()) - } - Err(_) => Err(self) - } - } -} - -// ensures collator-protocol messages are sent in correct order. -// session key must be sent before collator role. 
-enum CollatorState { - Fresh, - RolePending(Role), - Primed(Option), -} - -impl CollatorState { - fn send_key(&mut self, key: ValidatorId, mut f: F) { - f(Message::ValidatorId(key)); - if let CollatorState::RolePending(role) = *self { - f(Message::CollatorRole(role)); - *self = CollatorState::Primed(Some(role)); - } - } - - fn set_role(&mut self, role: Role, mut f: F) { - if let CollatorState::Primed(ref mut r) = *self { - f(Message::CollatorRole(role)); - *r = Some(role); - } else { - *self = CollatorState::RolePending(role); - } - } - - fn role(&self) -> Option { - match *self { - CollatorState::Fresh => None, - CollatorState::RolePending(role) => Some(role), - CollatorState::Primed(role) => role, - } - } -} - -struct PeerInfo { - collating_for: Option<(CollatorId, ParaId)>, - validator_keys: RecentValidatorIds, - claimed_validator: bool, - collator_state: CollatorState, -} - -impl PeerInfo { - fn should_send_key(&self) -> bool { - self.claimed_validator || self.collating_for.is_some() - } -} - -/// Polkadot-specific messages. -#[derive(Debug, Encode, Decode)] -pub enum Message { - /// As a validator, tell the peer your current session key. - // TODO: do this with a cryptographic proof of some kind - // https://github.com/paritytech/polkadot/issues/47 - ValidatorId(ValidatorId), - /// Requesting parachain proof-of-validation block (relay_parent, candidate_hash). - RequestPovBlock(RequestId, Hash, Hash), - /// Provide requested proof-of-validation block data by candidate hash or nothing if unknown. - PovBlock(RequestId, Option), - /// Tell a collator their role. - CollatorRole(Role), - /// A collation provided by a peer. Relay parent and collation. 
- Collation(Hash, Collation), -} - -fn send_polkadot_message(ctx: &mut dyn Context, to: PeerId, message: Message) { - trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); - let encoded = message.encode(); - ctx.send_chain_specific(to, encoded) -} - -/// Polkadot protocol attachment for substrate. -pub struct PolkadotProtocol { - peers: HashMap, - collating_for: Option<(CollatorId, ParaId)>, - collators: CollatorPool, - validators: HashMap, - local_collations: LocalCollations, - live_validation_leaves: LiveValidationLeaves, - in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, - pending: Vec, - availability_store: Option, - next_req_id: u64, -} - -impl PolkadotProtocol { - /// Instantiate a polkadot protocol handler. - pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self { - PolkadotProtocol { - peers: HashMap::new(), - collators: CollatorPool::new(), - collating_for, - validators: HashMap::new(), - local_collations: LocalCollations::new(), - live_validation_leaves: LiveValidationLeaves::new(), - in_flight: HashMap::new(), - pending: Vec::new(), - availability_store: None, - next_req_id: 1, - } - } - - /// Fetch block data by candidate receipt. 
- fn fetch_pov_block( - &mut self, - ctx: &mut dyn Context, - candidate: &CandidateReceipt, - relay_parent: Hash, - canon_roots: StructuredUnroutedIngress, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - - self.pending.push(PoVBlockRequest { - attempted_peers: Default::default(), - validation_leaf: relay_parent, - candidate_hash: candidate.hash(), - block_data_hash: candidate.block_data_hash, - sender: tx, - canon_roots, - }); - - self.dispatch_pending_requests(ctx); - rx - } - - /// Note new leaf to do validation work at - fn new_validation_leaf_work( - &mut self, - ctx: &mut dyn Context, - params: validation::LeafWorkParams, - ) -> validation::LiveValidationLeaf { - - let (work, new_local) = self.live_validation_leaves - .new_validation_leaf(params); - - if let Some(new_local) = new_local { - for (id, peer_data) in self.peers.iter_mut() - .filter(|&(_, ref info)| info.should_send_key()) - { - peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message( - ctx, - id.clone(), - msg - )); - } - } - - work - } - - // true indicates that it was removed actually. - fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { - self.live_validation_leaves.remove(parent_hash) - } - - fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context) { - let mut new_pending = Vec::new(); - let validator_keys = &mut self.validators; - let next_req_id = &mut self.next_req_id; - let in_flight = &mut self.in_flight; - - for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { - let parent = pending.validation_leaf; - let c_hash = pending.candidate_hash; - - let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x { - Ok(data @ &_) => { - // answer locally. 
- let _ = pending.sender.send(data.clone()); - None - } - Err(Some(known_keys)) => { - let next_peer = known_keys.iter() - .filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone()))) - .find(|&(ref key, _)| pending.attempted_peers.insert(key.clone())) - .map(|(_, id)| id); - - // dispatch to peer - if let Some(who) = next_peer { - let req_id = *next_req_id; - *next_req_id += 1; - - send_polkadot_message( - ctx, - who.clone(), - Message::RequestPovBlock(req_id, parent, c_hash), - ); - - in_flight.insert((req_id, who), pending); - - None - } else { - Some(pending) - } - } - Err(None) => None, // no such known validation leaf-work. prune out. - }); - - if let Some(pending) = still_pending { - new_pending.push(pending); - } - } - - self.pending = new_pending; - } - - fn on_polkadot_message(&mut self, ctx: &mut dyn Context, who: PeerId, msg: Message) { - trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); - match msg { - Message::ValidatorId(key) => self.on_session_key(ctx, who, key), - Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { - let pov_block = self.live_validation_leaves.with_pov_block( - &relay_parent, - &candidate_hash, - |res| res.ok().map(|b| b.clone()), - ); - - send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); - } - Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data), - Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), - Message::CollatorRole(role) => self.on_new_role(ctx, who, role), - } - } - - fn on_session_key(&mut self, ctx: &mut dyn Context, who: PeerId, key: ValidatorId) { - { - let info = match self.peers.get_mut(&who) { - Some(peer) => peer, - None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); - return - } - }; - - if !info.claimed_validator { - ctx.report_peer(who, cost::UNEXPECTED_MESSAGE); - return; - } - ctx.report_peer(who.clone(), 
benefit::EXPECTED_MESSAGE); - - let local_collations = &mut self.local_collations; - let new_collations = match info.validator_keys.insert(key.clone()) { - InsertedRecentKey::AlreadyKnown => Vec::new(), - InsertedRecentKey::New(Some(old_key)) => { - self.validators.remove(&old_key); - local_collations.fresh_key(&old_key, &key) - } - InsertedRecentKey::New(None) => info.collator_state.role() - .map(|r| local_collations.note_validator_role(key.clone(), r)) - .unwrap_or_else(Vec::new), - }; - - for (relay_parent, collation) in new_collations { - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, collation), - ) - } - - self.validators.insert(key, who); - } - - self.dispatch_pending_requests(ctx); - } - - fn on_pov_block( - &mut self, - ctx: &mut dyn Context, - who: PeerId, - req_id: RequestId, - pov_block: Option, - ) { - match self.in_flight.remove(&(req_id, who.clone())) { - Some(mut req) => { - match pov_block { - Some(pov_block) => { - match req.process_response(pov_block) { - Ok(()) => { - ctx.report_peer(who, benefit::GOOD_POV_BLOCK); - return; - } - Err(r) => { - ctx.report_peer(who, cost::BAD_POV_BLOCK); - req = r; - } - } - }, - None => { - ctx.report_peer(who, benefit::EXPECTED_MESSAGE); - } - } - - self.pending.push(req); - self.dispatch_pending_requests(ctx); - } - None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE), - } - } - - // when a validator sends us (a collator) a new role. - fn on_new_role(&mut self, ctx: &mut dyn Context, who: PeerId, role: Role) { - let info = match self.peers.get_mut(&who) { - Some(peer) => peer, - None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); - return - } - }; - - debug!(target: "p_net", "New collator role {:?} from {}", role, who); - - if info.validator_keys.as_slice().is_empty() { - ctx.report_peer(who, cost::UNEXPECTED_ROLE) - } else { - // update role for all saved session keys for this validator. 
- let local_collations = &mut self.local_collations; - for (relay_parent, collation) in info.validator_keys - .as_slice() - .iter() - .cloned() - .flat_map(|k| local_collations.note_validator_role(k, role)) - { - debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent); - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, collation), - ) - } - } - } - - /// Convert the given `CollatorId` to a `PeerId`. - pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> { - self.collators.collator_id_to_peer_id(collator_id) - } -} - -impl Specialization for PolkadotProtocol { - fn status(&self) -> Vec { - Status { collating_for: self.collating_for.clone() }.encode() - } - - fn on_connect(&mut self, ctx: &mut dyn Context, who: PeerId, status: FullStatus) { - let local_status = Status::decode(&mut &status.chain_status[..]) - .unwrap_or_else(|_| Status { collating_for: None }); - - let validator = status.roles.contains(sc_network::config::Roles::AUTHORITY); - - let mut peer_info = PeerInfo { - collating_for: local_status.collating_for.clone(), - validator_keys: Default::default(), - claimed_validator: validator, - collator_state: CollatorState::Fresh, - }; - - if let Some((ref acc_id, ref para_id)) = local_status.collating_for { - if self.collator_peer(acc_id.clone()).is_some() { - ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN); - return - } - ctx.report_peer(who.clone(), benefit::NEW_COLLATOR); - - let collator_role = self.collators.on_new_collator( - acc_id.clone(), - para_id.clone(), - who.clone(), - ); - - peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message( - ctx, - who.clone(), - msg, - )); - } - - // send session keys. 
- if peer_info.should_send_key() { - for local_session_key in self.live_validation_leaves.recent_keys() { - peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message( - ctx, - who.clone(), - msg, - )); - } - } - - self.peers.insert(who, peer_info); - self.dispatch_pending_requests(ctx); - } - - fn on_disconnect(&mut self, ctx: &mut dyn Context, who: PeerId) { - if let Some(info) = self.peers.remove(&who) { - if let Some((acc_id, _)) = info.collating_for { - let new_primary = self.collators.on_disconnect(acc_id) - .and_then(|new_primary| self.collator_peer(new_primary)); - - if let Some((new_primary, primary_info)) = new_primary { - primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message( - ctx, - new_primary.clone(), - msg, - )); - } - } - - for key in info.validator_keys.as_slice().iter() { - self.validators.remove(key); - self.local_collations.on_disconnect(key); - } - - { - let pending = &mut self.pending; - self.in_flight.retain(|&(_, ref peer), val| { - let retain = peer != &who; - if !retain { - // swap with a dummy value which will be dropped immediately. 
- let (sender, _) = oneshot::channel(); - pending.push(::std::mem::replace(val, PoVBlockRequest { - attempted_peers: Default::default(), - validation_leaf: Default::default(), - candidate_hash: Default::default(), - block_data_hash: Default::default(), - canon_roots: StructuredUnroutedIngress(Vec::new()), - sender, - })); - } - - retain - }); - } - self.dispatch_pending_requests(ctx); - } - } - - fn on_message( - &mut self, - ctx: &mut dyn Context, - who: PeerId, - message: Vec, - ) { - match Message::decode(&mut &message[..]) { - Ok(msg) => { - ctx.report_peer(who.clone(), benefit::VALID_FORMAT); - self.on_polkadot_message(ctx, who, msg) - }, - Err(_) => { - trace!(target: "p_net", "Bad message from {}", who); - ctx.report_peer(who, cost::INVALID_FORMAT); - } - } - } - - fn maintain_peers(&mut self, ctx: &mut dyn Context) { - self.collators.collect_garbage(None); - self.local_collations.collect_garbage(None); - self.dispatch_pending_requests(ctx); - - for collator_action in self.collators.maintain_peers() { - match collator_action { - Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), - Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) { - info.collator_state.set_role(role, |msg| send_polkadot_message( - ctx, - collator.clone(), - msg, - )) - }, - } - } - } - - fn on_block_imported(&mut self, _ctx: &mut dyn Context, hash: Hash, header: &Header) { - self.collators.collect_garbage(Some(&hash)); - self.local_collations.collect_garbage(Some(&header.parent_hash)); - } -} - -impl PolkadotProtocol { - // we received a collation from a peer - fn on_collation( - &mut self, - ctx: &mut dyn Context, - from: PeerId, - relay_parent: Hash, - collation: Collation - ) { - let collation_para = collation.info.parachain_index; - let collated_acc = collation.info.collator.clone(); - - match self.peers.get(&from) { - None => ctx.report_peer(from, cost::UNKNOWN_PEER), - Some(peer_info) => { - 
ctx.report_peer(from.clone(), benefit::KNOWN_PEER); - match peer_info.collating_for { - None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE), - Some((ref acc_id, ref para_id)) => { - ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE); - let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; - if structurally_valid && collation.info.check_signature().is_ok() { - debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from); - ctx.report_peer(from, benefit::GOOD_COLLATION); - self.collators.on_collation(acc_id.clone(), relay_parent, collation) - } else { - ctx.report_peer(from, cost::INVALID_FORMAT) - }; - } - } - }, - } - } - - fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); - self.collators.await_collation(relay_parent, para_id, tx); - rx - } - - // get connected peer with given account ID for collation. - fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> { - let check_info = |info: &PeerInfo| info - .collating_for - .as_ref() - .map_or(false, |&(ref acc_id, _)| acc_id == &collator_id); - - self.peers - .iter_mut() - .filter(|&(_, ref info)| check_info(&**info)) - .map(|(who, info)| (who.clone(), info)) - .next() - } - - // disconnect a collator by account-id. - fn disconnect_bad_collator(&mut self, ctx: &mut dyn Context, collator_id: CollatorId) { - if let Some((who, _)) = self.collator_peer(collator_id) { - ctx.report_peer(who, cost::BAD_COLLATION) - } - } -} - -impl PolkadotProtocol { - /// Add a local collation and broadcast it to the necessary peers. - /// - /// This should be called by a collator intending to get the locally-collated - /// block into the hands of validators. 
- /// It also places the outgoing message and block data in the local availability store. - pub fn add_local_collation( - &mut self, - ctx: &mut dyn Context, - relay_parent: Hash, - targets: HashSet, - collation: Collation, - outgoing_targeted: OutgoingMessages, - ) -> impl Future { - debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", - relay_parent, collation.info.parachain_index); - - for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { - match self.validators.get(&primary) { - Some(who) => { - debug!(target: "p_net", "Sending local collation to {:?}", primary); - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, cloned_collation), - ) - }, - None => - warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), - } - } - - let availability_store = self.availability_store.clone(); - let collation_cloned = collation.clone(); - - async move { - if let Some(availability_store) = availability_store { - let _ = availability_store.make_available(av_store::Data { - relay_parent, - parachain_id: collation_cloned.info.parachain_index, - block_data: collation_cloned.pov.block_data.clone(), - outgoing_queues: Some(outgoing_targeted.clone().into()), - }).await; - } - } - } - - /// Give the network protocol a handle to an availability store, used for - /// circulation of parachain data required for validation. - pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) { - self.availability_store = Some(availability_store); - } -} diff --git a/network/src/protocol.rs b/network/src/protocol.rs new file mode 100644 index 000000000000..79114d2c78ef --- /dev/null +++ b/network/src/protocol.rs @@ -0,0 +1,967 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Polkadot-specific base networking protocol. +//! +//! This is implemented using the sc-network APIs for futures-based +//! notifications protocols. In some cases, we emulate request/response on top +//! of the notifications machinery, which is slightly less efficient but not +//! meaningfully so. +//! +//! We handle events from `sc-network` in a thin wrapper that forwards to a +//! background worker which also handles commands from other parts of the node. + +use codec::{Decode, Encode}; +use futures::channel::{mpsc, oneshot}; +use futures::future::Either; +use futures::prelude::*; +use futures::task::{Spawn, SpawnExt}; +use log::{debug, trace}; + +use av_store::Store as AvailabilityStore; +use polkadot_primitives::{ + Hash, Block, + parachain::{ + PoVBlock, ValidatorId, ValidatorIndex, Collation, CandidateReceipt, OutgoingMessages, + ErasureChunk, ParachainHost, Id as ParaId, CollatorId, + }, +}; +use polkadot_validation::{ + SharedTable, TableRouter, Network as ParachainNetwork, Validated, GenericStatement, Collators, +}; +use sc_network::{config::Roles, Event, PeerId}; +use sp_api::ProvideRuntimeApi; + +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::{Arc, Weak}; +use std::time::Duration; + +use super::{cost, benefit, PolkadotNetworkService}; +use crate::legacy::validation::{RecentValidatorIds, InsertedRecentKey}; +use crate::legacy::collator_pool::Role as CollatorRole; + +/// The current protocol version. 
+pub const VERSION: u32 = 1; +/// The minimum supported protocol version. +pub const MIN_SUPPORTED_VERSION: u32 = 1; + +/// The engine ID of the polkadot network protocol. +pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot2"; + +pub use crate::legacy::gossip::ChainContext; + +// Messages from the service API or network adapter. +enum ServiceToWorkerMsg { + // basic peer messages. + PeerConnected(PeerId, Roles), + PeerMessage(PeerId, Vec), + PeerDisconnected(PeerId), + + // service messages. + BuildConsensusNetworking(Arc, Vec, oneshot::Sender), + DropConsensusNetworking(Hash), + LocalCollation( + Hash, // relay-parent + Collation, + CandidateReceipt, + (ValidatorIndex, Vec), + ), + FetchPoVBlock( + Hash, // relay-parent + CandidateReceipt, + oneshot::Sender, + ), + AwaitCollation( + Hash, // relay-parent, + ParaId, + oneshot::Sender, + ), + NoteBadCollator( + CollatorId, + ), +} + +/// An async handle to the network service. +#[derive(Clone)] +pub struct Service { + sender: mpsc::Sender, + network_service: Arc, +} + +/// Registers the protocol. +/// +/// You are very strongly encouraged to call this method very early on. Any connection open +/// will retain the protocols that were registered then, and not any new one. 
+pub fn start( + service: Arc, + config: Config, + availability_store: AvailabilityStore, + chain_context: C, + api: Arc, + executor: SP, +) -> Result where + C: ChainContext + 'static, + Api: ProvideRuntimeApi + Send + Sync + 'static, + Api::Api: ParachainHost, + SP: Spawn + Clone + Send + 'static, +{ + const SERVICE_TO_WORKER_BUF: usize = 256; + + let mut event_stream = service.event_stream(); + let (mut worker_sender, worker_receiver) = mpsc::channel(SERVICE_TO_WORKER_BUF); + + let gossip_validator = crate::legacy::gossip::register_validator( + service.clone(), + chain_context, + &executor, + ); + executor.spawn(worker_loop( + config, + service.clone(), + availability_store, + gossip_validator, + worker_sender.clone(), + api, + worker_receiver, + executor.clone(), + ))?; + + let polkadot_service = Service { + sender: worker_sender.clone(), + network_service: service.clone(), + }; + + executor.spawn(async move { + while let Some(event) = event_stream.next().await { + let res = match event { + Event::Dht(_) => continue, + Event::NotificationStreamOpened { + remote, + engine_id, + roles, + } => { + if engine_id != POLKADOT_ENGINE_ID { continue } + + worker_sender.send(ServiceToWorkerMsg::PeerConnected(remote, roles)).await + }, + Event::NotificationStreamClosed { + remote, + engine_id, + } => { + if engine_id != POLKADOT_ENGINE_ID { continue } + + worker_sender.send(ServiceToWorkerMsg::PeerDisconnected(remote)).await + }, + Event::NotificationsReceived { + remote, + messages, + } => { + let our_notifications = messages.into_iter() + .filter_map(|(engine, message)| if engine == POLKADOT_ENGINE_ID { + Some(message) + } else { + None + }) + .collect(); + + worker_sender.send( + ServiceToWorkerMsg::PeerMessage(remote, our_notifications) + ).await + } + }; + + if let Err(e) = res { + // full is impossible here, as we've `await`ed the value being sent. 
+ if e.is_disconnected() { + break + } + } + } + })?; + + Ok(polkadot_service) +} + +/// The Polkadot protocol status message. +#[derive(Debug, Encode, Decode)] +pub struct Status { + version: u32, // protocol version. + collating_for: Option<(CollatorId, ParaId)>, +} + +/// Polkadot-specific messages from peer to peer. +#[derive(Debug, Encode, Decode)] +pub enum Message { + /// Exchange status with a peer. This should be the first message sent. + #[codec(index = "0")] + Status(Status), + /// Inform a peer of their role as a collator. May only be sent after + /// validator ID. + #[codec(index = "1")] + CollatorRole(CollatorRole), + /// Send a collation. + #[codec(index = "2")] + Collation(Hash, Collation), + /// Inform a peer of a new validator public key. + #[codec(index = "3")] + ValidatorId(ValidatorId), +} + +// ensures collator-protocol messages are sent in correct order. +// session key must be sent before collator role. +enum CollatorState { + Fresh, + RolePending(CollatorRole), + Primed(Option), +} + +impl CollatorState { + fn send_key(&mut self, key: ValidatorId, mut f: F) { + f(Message::ValidatorId(key)); + if let CollatorState::RolePending(role) = *self { + f(Message::CollatorRole(role)); + *self = CollatorState::Primed(Some(role)); + } + } + + fn set_role(&mut self, role: CollatorRole, mut f: F) { + if let CollatorState::Primed(ref mut r) = *self { + f(Message::CollatorRole(role)); + *r = Some(role); + } else { + *self = CollatorState::RolePending(role); + } + } +} + +enum ProtocolState { + Fresh, + Ready(Status, CollatorState), +} + +struct PeerData { + claimed_validator: bool, + protocol_state: ProtocolState, + session_keys: RecentValidatorIds, +} + +impl PeerData { + fn ready_and_collating_for(&self) -> Option<(CollatorId, ParaId)> { + match self.protocol_state { + ProtocolState::Ready(ref status, _) => status.collating_for.clone(), + _ => None, + } + } + + fn collator_state_mut(&mut self) -> Option<&mut CollatorState> { + match self.protocol_state { 
+ ProtocolState::Ready(_, ref mut c_state) => Some(c_state), + _ => None, + } + } + + fn should_send_key(&self) -> bool { + self.claimed_validator || self.ready_and_collating_for().is_some() + } +} + +struct ConsensusNetworkingInstance { + statement_table: Arc, + relay_parent: Hash, + attestation_topic: Hash, + _drop_signal: exit_future::Signal, +} + +type RegisteredMessageValidator = crate::legacy::gossip::RegisteredMessageValidator; + +/// Protocol configuration. +#[derive(Default)] +pub struct Config { + /// Which collator-id to use when collating, and on which parachain. + /// `None` if not collating. + pub collating_for: Option<(CollatorId, ParaId)>, +} + +struct ProtocolHandler { + service: Arc, + peers: HashMap, + collators: crate::legacy::collator_pool::CollatorPool, + local_collations: crate::legacy::local_collations::LocalCollations, + config: Config, +} + +impl ProtocolHandler { + fn new( + service: Arc, + config: Config, + ) -> Self { + ProtocolHandler { + service, + peers: HashMap::new(), + collators: Default::default(), + local_collations: Default::default(), + config, + } + } + + fn on_connect(&mut self, peer: PeerId, roles: Roles) { + let claimed_validator = roles.contains(sc_network::config::Roles::AUTHORITY); + + self.peers.insert(peer.clone(), PeerData { + claimed_validator, + protocol_state: ProtocolState::Fresh, + session_keys: Default::default(), + }); + + let status = Message::Status(Status { + version: VERSION, + collating_for: self.config.collating_for.clone(), + }).encode(); + + self.service.write_notification(peer, POLKADOT_ENGINE_ID, status); + } + + fn on_disconnect(&mut self, peer: PeerId) { + let mut new_primary = None; + if let Some(data) = self.peers.remove(&peer) { + if let Some((collator_id, _)) = data.ready_and_collating_for() { + if self.collators.collator_id_to_peer_id(&collator_id) == Some(&peer) { + new_primary = self.collators.on_disconnect(collator_id); + } + } + } + + let service = &self.service; + let peers = &mut 
self.peers; + if let Some(new_primary) = new_primary { + let new_primary_peer_id = match self.collators.collator_id_to_peer_id(&new_primary) { + None => return, + Some(p) => p.clone(), + }; + if let Some(c_state) = peers.get_mut(&new_primary_peer_id) + .and_then(|p| p.collator_state_mut()) + { + c_state.set_role( + CollatorRole::Primary, + |msg| service.write_notification( + new_primary_peer_id.clone(), + POLKADOT_ENGINE_ID, + msg.encode(), + ), + ); + } + } + } + + fn on_raw_messages(&mut self, remote: PeerId, messages: Vec) { + for raw_message in messages { + match Message::decode(&mut raw_message.as_ref()) { + Ok(message) => { + self.service.report_peer(remote.clone(), benefit::VALID_FORMAT); + match message { + Message::Status(status) => { + self.on_status(remote.clone(), status); + } + Message::CollatorRole(role) => { + self.on_collator_role(remote.clone(), role) + } + Message::Collation(relay_parent, collation) => { + self.on_remote_collation(remote.clone(), relay_parent, collation); + } + Message::ValidatorId(session_key) => { + self.on_validator_id(remote.clone(), session_key) + } + } + }, + Err(_) => self.service.report_peer(remote.clone(), cost::INVALID_FORMAT), + } + } + } + + fn on_status(&mut self, remote: PeerId, status: Status) { + let peer = match self.peers.get_mut(&remote) { + None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return } + Some(p) => p, + }; + + match peer.protocol_state { + ProtocolState::Fresh => { + peer.protocol_state = ProtocolState::Ready(status, CollatorState::Fresh); + if let Some((collator_id, para_id)) = peer.ready_and_collating_for() { + let collator_attached = self.collators + .collator_id_to_peer_id(&collator_id) + .map_or(false, |id| id != &remote); + + // we only care about the first connection from this collator. 
+ if !collator_attached { + let role = self.collators + .on_new_collator(collator_id, para_id, remote.clone()); + let service = &self.service; + if let Some(c_state) = peer.collator_state_mut() { + c_state.set_role(role, |msg| service.write_notification( + remote.clone(), + POLKADOT_ENGINE_ID, + msg.encode(), + )); + } + } + } + } + ProtocolState::Ready(_, _) => { + self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE); + } + } + } + + fn on_remote_collation(&mut self, remote: PeerId, relay_parent: Hash, collation: Collation) { + let peer = match self.peers.get_mut(&remote) { + None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return } + Some(p) => p, + }; + + let (collator_id, para_id) = match peer.ready_and_collating_for() { + None => { + self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE); + return + } + Some(x) => x, + }; + + let collation_para = collation.info.parachain_index; + let collated_acc = collation.info.collator.clone(); + + let structurally_valid = para_id == collation_para && collator_id == collated_acc; + if structurally_valid && collation.info.check_signature().is_ok() { + debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", + para_id, remote); + + if self.collators.collator_id_to_peer_id(&collator_id) == Some(&remote) { + self.collators.on_collation(collator_id, relay_parent, collation); + self.service.report_peer(remote, benefit::GOOD_COLLATION); + } + } else { + self.service.report_peer(remote, cost::INVALID_FORMAT); + } + } + + fn on_collator_role(&mut self, remote: PeerId, role: CollatorRole) { + let collations_to_send; + + { + let peer = match self.peers.get_mut(&remote) { + None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return } + Some(p) => p, + }; + + match peer.protocol_state { + ProtocolState::Fresh => { + self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE); + return; + } + ProtocolState::Ready(_, _) => { + let last_key = match peer.session_keys.as_slice().last() 
{ + None => { + self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE); + return; + } + Some(k) => k, + }; + + collations_to_send = self.local_collations + .note_validator_role(last_key.clone(), role); + } + } + } + + self.send_peer_collations(remote, collations_to_send); + } + + fn on_validator_id(&mut self, remote: PeerId, key: ValidatorId) { + let mut collations_to_send = Vec::new(); + + { + let peer = match self.peers.get_mut(&remote) { + None => { self.service.report_peer(remote, cost::UNKNOWN_PEER); return } + Some(p) => p, + }; + + match peer.protocol_state { + ProtocolState::Fresh => { + self.service.report_peer(remote, cost::UNEXPECTED_MESSAGE); + return + } + ProtocolState::Ready(_, _) => { + if let InsertedRecentKey::New(Some(last)) = peer.session_keys.insert(key.clone()) { + collations_to_send = self.local_collations.fresh_key(&last, &key); + } + } + } + } + + self.send_peer_collations(remote, collations_to_send); + } + + fn send_peer_collations(&self, remote: PeerId, collations: Vec<(Hash, Collation)>) { + for (relay_parent, collation) in collations { + self.service.write_notification( + remote.clone(), + POLKADOT_ENGINE_ID, + Message::Collation(relay_parent, collation).encode(), + ); + } + } + + fn await_collation( + &mut self, + relay_parent: Hash, + para_id: ParaId, + sender: oneshot::Sender, + ) { + self.collators.await_collation(relay_parent, para_id, sender); + } + + fn collect_garbage(&mut self) { + self.collators.collect_garbage(None); + self.local_collations.collect_garbage(None); + } + + fn note_bad_collator(&mut self, who: CollatorId) { + if let Some(peer) = self.collators.collator_id_to_peer_id(&who) { + self.service.report_peer(peer.clone(), cost::BAD_COLLATION); + } + } + + // distribute a new session key to any relevant peers. 
+ fn distribute_new_session_key(&mut self, key: ValidatorId) { + let service = &self.service; + + for (peer_id, peer) in self.peers.iter_mut() { + if !peer.should_send_key() { continue } + + if let Some(c_state) = peer.collator_state_mut() { + c_state.send_key(key.clone(), |msg| service.write_notification( + peer_id.clone(), + POLKADOT_ENGINE_ID, + msg.encode(), + )); + } + } + } +} + +async fn worker_loop( + config: Config, + service: Arc, + availability_store: AvailabilityStore, + gossip_handle: RegisteredMessageValidator, + sender: mpsc::Sender, + api: Arc, + mut receiver: mpsc::Receiver, + executor: Sp, +) where + Api: ProvideRuntimeApi + Send + Sync + 'static, + Api::Api: ParachainHost, + Sp: Spawn + Clone + Send + 'static, +{ + const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29); + + let mut protocol_handler = ProtocolHandler::new(service, config); + let mut consensus_instances = HashMap::new(); + let mut local_keys = RecentValidatorIds::default(); + + let mut collect_garbage = stream::unfold((), move |_| { + futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ()))) + }).map(drop); + + loop { + let message = match future::select(receiver.next(), collect_garbage.next()).await { + Either::Left((None, _)) | Either::Right((None, _)) => break, + Either::Left((Some(message), _)) => message, + Either::Right(_) => { + protocol_handler.collect_garbage(); + continue + } + }; + + match message { + ServiceToWorkerMsg::PeerConnected(remote, roles) => { + protocol_handler.on_connect(remote, roles); + } + ServiceToWorkerMsg::PeerDisconnected(remote) => { + protocol_handler.on_disconnect(remote); + } + ServiceToWorkerMsg::PeerMessage(remote, messages) => { + protocol_handler.on_raw_messages(remote, messages) + } + + ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, router_sender) => { + // glue: let gossip know about our new local leaf. 
+ let relay_parent = table.consensus_parent_hash().clone(); + let (signal, exit) = exit_future::signal(); + + let router = Router { + inner: Arc::new(RouterInner { relay_parent, sender: sender.clone() }), + }; + + let key = table.session_key(); + if let Some(key) = key { + if let InsertedRecentKey::New(_) = local_keys.insert(key.clone()) { + protocol_handler.distribute_new_session_key(key); + } + } + + let new_leaf_actions = gossip_handle.new_local_leaf( + relay_parent, + crate::legacy::gossip::MessageValidationData { authorities }, + |queue_root| availability_store.queue_by_root(queue_root), + ); + + new_leaf_actions.perform(&gossip_handle); + + consensus_instances.insert(relay_parent, ConsensusNetworkingInstance { + statement_table: table.clone(), + relay_parent, + attestation_topic: crate::legacy::router::attestation_topic(relay_parent), + _drop_signal: signal, + }); + + let weak_router = Arc::downgrade(&router.inner); + + // glue the incoming messages, shared table, and validation + // work together. + let _ = executor.spawn(statement_import_loop( + relay_parent, + table, + api.clone(), + weak_router, + gossip_handle.clone(), + exit, + executor.clone(), + )); + + let _ = router_sender.send(router); + } + ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { + consensus_instances.remove(&relay_parent); + } + ServiceToWorkerMsg::LocalCollation(relay_parent, collation, receipt, chunks) => { + let instance = match consensus_instances.get(&relay_parent) { + None => continue, + Some(instance) => instance, + }; + + distribute_local_collation( + instance, + collation, + receipt, + chunks, + &gossip_handle, + ); + } + ServiceToWorkerMsg::FetchPoVBlock(_relay_parent, _candidate, _sender) => { + // TODO https://github.com/paritytech/polkadot/issues/742: + // create a filter on gossip for it and send to sender. 
+ } + ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { + debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); + protocol_handler.await_collation(relay_parent, para_id, sender) + } + ServiceToWorkerMsg::NoteBadCollator(collator) => { + protocol_handler.note_bad_collator(collator); + } + } + } +} + +// the internal loop of waiting for messages and spawning validation work +// as a result of those messages. this future exits when `exit` is ready. +async fn statement_import_loop( + relay_parent: Hash, + table: Arc, + api: Arc, + weak_router: Weak, + validator: RegisteredMessageValidator, + mut exit: exit_future::Exit, + executor: impl Spawn, +) where + Api: ProvideRuntimeApi + Send + Sync + 'static, + Api::Api: ParachainHost, +{ + let topic = crate::legacy::router::attestation_topic(relay_parent); + let mut checked_messages = validator.gossip_messages_for(topic) + .filter_map(|msg| match msg.0 { + crate::legacy::gossip::GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), + _ => future::ready(None), + }); + + let mut deferred_statements = crate::legacy::router::DeferredStatements::new(); + + loop { + let statement = match future::select(exit, checked_messages.next()).await { + Either::Left(_) | Either::Right((None, _)) => return, + Either::Right((Some(statement), e)) => { + exit = e; + statement + } + }; + + // defer any statements for which we haven't imported the candidate yet + let c_hash = { + let candidate_data = match statement.statement { + GenericStatement::Candidate(ref c) => Some(c.hash()), + GenericStatement::Valid(ref hash) + | GenericStatement::Invalid(ref hash) + => table.with_candidate(hash, |c| c.map(|_| *hash)), + }; + match candidate_data { + Some(x) => x, + None => { + deferred_statements.push(statement); + continue; + } + } + }; + + // import all statements pending on this candidate + let (mut statements, _traces) = if let 
GenericStatement::Candidate(_) = statement.statement { + deferred_statements.take_deferred(&c_hash) + } else { + (Vec::new(), Vec::new()) + }; + + // prepend the candidate statement. + debug!(target: "validation", "Importing statements about candidate {:?}", c_hash); + statements.insert(0, statement); + + let producers: Vec<_> = { + // create a temporary router handle for importing all of these statements + let temp_router = match weak_router.upgrade() { + None => break, + Some(inner) => Router { inner }, + }; + + table.import_remote_statements( + &temp_router, + statements.iter().cloned(), + ) + }; + + // dispatch future work as necessary. + for (producer, statement) in producers.into_iter().zip(statements) { + if let Some(_sender) = table.index_to_id(statement.sender) { + if let Some(producer) = producer { + trace!(target: "validation", "driving statement work to completion"); + + let work = producer.prime(api.clone()).validate(); + + let work = future::select(work.boxed(), exit.clone()).map(drop); + let _ = executor.spawn(work); + } + } + } + } +} + +// distribute a "local collation": this is the collation gotten by a validator +// from a collator. it needs to be distributed to other validators in the same +// group. +fn distribute_local_collation( + instance: &ConsensusNetworkingInstance, + collation: Collation, + receipt: CandidateReceipt, + chunks: (ValidatorIndex, Vec), + gossip_handle: &RegisteredMessageValidator, +) { + // produce a signed statement. 
+ let hash = receipt.hash(); + let erasure_root = receipt.erasure_root; + let validated = Validated::collated_local( + receipt, + collation.pov.clone(), + OutgoingMessages { outgoing_messages: Vec::new() }, + ); + + let statement = crate::legacy::gossip::GossipStatement::new( + instance.relay_parent, + match instance.statement_table.import_validated(validated) { + None => return, + Some(s) => s, + } + ); + + gossip_handle.gossip_message(instance.attestation_topic, statement.into()); + + for chunk in chunks.1 { + let index = chunk.index; + let message = crate::legacy::gossip::ErasureChunkMessage { + chunk, + relay_parent: instance.relay_parent, + candidate_hash: hash, + }; + + gossip_handle.gossip_message( + av_store::erasure_coding_topic(instance.relay_parent, erasure_root, index), + message.into(), + ); + } +} + +/// Routing logic for a particular attestation session. +#[derive(Clone)] +pub struct Router { + inner: Arc, +} + +// note: do _not_ make this `Clone`: the drop implementation needs to _uniquely_ +// send the `DropConsensusNetworking` message. +struct RouterInner { + relay_parent: Hash, + sender: mpsc::Sender, +} + +impl Drop for RouterInner { + fn drop(&mut self) { + let res = self.sender.try_send( + ServiceToWorkerMsg::DropConsensusNetworking(self.relay_parent) + ); + + if let Err(e) = res { + assert!( + !e.is_full(), + "futures 0.3 guarantees at least one free slot in the capacity \ + per sender; this is the first message sent via this sender; \ + therefore we will not have to wait for capacity; qed" + ); + // other error variants (disconnection) are fine here. 
+ } + } +} + +impl ParachainNetwork for Service { + type Error = future::Either; + type TableRouter = Router; + type BuildTableRouter = Pin>>>; + + fn build_table_router( + &self, + table: Arc, + authorities: &[ValidatorId], + ) -> Self::BuildTableRouter { + let authorities = authorities.to_vec(); + let mut sender = self.sender.clone(); + + let (tx, rx) = oneshot::channel(); + Box::pin(async move { + sender.send( + ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, tx) + ).map_err(future::Either::Left).await?; + + rx.map_err(future::Either::Right).await + }) + } +} + +impl Collators for Service { + type Error = future::Either; + type Collation = Pin> + Send>>; + + fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { + let (tx, rx) = oneshot::channel(); + let mut sender = self.sender.clone(); + + Box::pin(async move { + sender.send( + ServiceToWorkerMsg::AwaitCollation(relay_parent, parachain, tx) + ).map_err(future::Either::Left).await?; + + rx.map_err(future::Either::Right).await + }) + } + + fn note_bad_collator(&self, collator: CollatorId) { + let _ = self.sender.clone().try_send(ServiceToWorkerMsg::NoteBadCollator(collator)); + } +} + +/// Errors when interacting with the statement router. 
+#[derive(Debug, derive_more::Display, derive_more::From)] +pub enum RouterError { + #[display(fmt = "Encountered unexpected I/O error: {}", _0)] + Io(std::io::Error), + #[display(fmt = "Worker hung up while answering request.")] + Canceled(oneshot::Canceled), + #[display(fmt = "Could not reach worker with request: {}", _0)] + SendError(mpsc::SendError), +} + +impl TableRouter for Router { + type Error = RouterError; + type SendLocalCollation = Pin> + Send>>; + type FetchValidationProof = Pin> + Send>>; + + fn local_collation( + &self, + collation: Collation, + receipt: CandidateReceipt, + _outgoing: OutgoingMessages, + chunks: (ValidatorIndex, &[ErasureChunk]), + ) -> Self::SendLocalCollation { + let message = ServiceToWorkerMsg::LocalCollation( + self.inner.relay_parent.clone(), + collation, + receipt, + (chunks.0, chunks.1.to_vec()), + ); + let mut sender = self.inner.sender.clone(); + Box::pin(async move { + sender.send(message).map_err(Into::into).await + }) + } + + fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof { + let (tx, rx) = oneshot::channel(); + let message = ServiceToWorkerMsg::FetchPoVBlock( + self.inner.relay_parent.clone(), + candidate.clone(), + tx, + ); + + let mut sender = self.inner.sender.clone(); + Box::pin(async move { + sender.send(message).await?; + rx.map_err(Into::into).await + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn router_inner_drop_sends_worker_message() { + let parent = [1; 32].into(); + + let (sender, mut receiver) = mpsc::channel(0); + drop(RouterInner { + relay_parent: parent, + sender, + }); + + match receiver.try_next() { + Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), + _ => panic!("message not sent"), + } + } +} diff --git a/service/src/lib.rs b/service/src/lib.rs index b32f85e4e667..fe140b9df6ec 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -22,7 +22,10 @@ use sc_client::LongestChain; use std::sync::Arc; 
use std::time::Duration; use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance}; -use polkadot_network::{gossip::{self as network_gossip, Known}, validation::ValidationNetwork}; +use polkadot_network::legacy::{ + gossip::{self as network_gossip, Known}, + validation::ValidationNetwork, +}; use service::{error::{Error as ServiceError}, ServiceBuilder}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use inherents::InherentDataProviders; @@ -39,7 +42,7 @@ pub use sc_client_api::backend::Backend; pub use sp_api::{Core as CoreApi, ConstructRuntimeApi, ProvideRuntimeApi, StateBackend}; pub use sp_runtime::traits::HasherFor; pub use consensus_common::SelectChain; -pub use polkadot_network::PolkadotProtocol; +pub use polkadot_network::legacy::PolkadotProtocol; pub use polkadot_primitives::parachain::{CollatorId, ParachainHost}; pub use polkadot_primitives::Block; pub use sp_core::Blake2Hasher; @@ -342,7 +345,8 @@ pub fn new_full( let mut path = PathBuf::from(db_path); path.push("availability"); - let gossip = polkadot_network::AvailabilityNetworkShim(gossip_validator.clone()); + let gossip = polkadot_network::legacy + ::AvailabilityNetworkShim(gossip_validator.clone()); #[cfg(not(target_os = "unknown"))] { diff --git a/validation/src/block_production.rs b/validation/src/block_production.rs index bedd83f78961..76001ca110a6 100644 --- a/validation/src/block_production.rs +++ b/validation/src/block_production.rs @@ -132,7 +132,7 @@ pub struct Proposer { parent_hash: Hash, parent_id: BlockId, parent_number: BlockNumber, - tracker: Arc, + tracker: crate::validation_service::ValidationInstanceHandle, transaction_pool: Arc, slot_duration: u64, backend: Arc, diff --git a/validation/src/collation.rs b/validation/src/collation.rs index 1cfebc101a9e..0be87a5a94a0 100644 --- a/validation/src/collation.rs +++ b/validation/src/collation.rs @@ -55,6 +55,9 @@ pub trait Collators: Clone { /// /// This does not have to guarantee local 
availability, as a valid collation /// will be passed to the `TableRouter` instance. + /// + /// The returned future may be prematurely concluded if the `relay_parent` goes + /// out of date. fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation; /// Note a bad collator. TODO: take proof (https://github.com/paritytech/polkadot/issues/217) diff --git a/validation/src/lib.rs b/validation/src/lib.rs index 41f2552aebda..73ad3e011768 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -76,9 +76,13 @@ pub type Incoming = Vec<(ParaId, Vec)>; /// A handle to a statement table router. /// /// This is expected to be a lightweight, shared type like an `Arc`. +/// Once all instances are dropped, consensus networking for this router +/// should be cleaned up. pub trait TableRouter: Clone { /// Errors when fetching data from the network. type Error: std::fmt::Debug; + /// Future that drives sending of the local collation to the network. + type SendLocalCollation: Future>; /// Future that resolves when candidate data is fetched. type FetchValidationProof: Future>; @@ -90,9 +94,12 @@ pub trait TableRouter: Clone { receipt: CandidateReceipt, outgoing: OutgoingMessages, chunks: (ValidatorIndex, &[ErasureChunk]), - ); + ) -> Self::SendLocalCollation; /// Fetch validation proof for a specific candidate. + /// + /// This future must conclude once all `Clone`s of this `TableRouter` have + /// been cleaned up. fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> Self::FetchValidationProof; } @@ -111,11 +118,10 @@ pub trait Network { /// Instantiate a table router using the given shared table. /// Also pass through any outgoing messages to be broadcast to peers. 
- fn communication_for( + fn build_table_router( &self, table: Arc, authorities: &[ValidatorId], - exit: exit_future::Exit, ) -> Self::BuildTableRouter; } diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index 69d6e7fd1ef5..5890cbc9c26e 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -616,6 +616,7 @@ mod tests { struct DummyRouter; impl TableRouter for DummyRouter { type Error = ::std::io::Error; + type SendLocalCollation = future::Ready>; type FetchValidationProof = future::Ready>; fn local_collation( @@ -624,7 +625,7 @@ mod tests { _candidate: CandidateReceipt, _outgoing: OutgoingMessages, _chunks: (ValidatorIndex, &[ErasureChunk]) - ) {} + ) -> Self::SendLocalCollation { future::ready(Ok(())) } fn fetch_pov_block(&self, _candidate: &CandidateReceipt) -> Self::FetchValidationProof { future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5])) diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index 8139d562d36f..bfb25401b404 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -34,7 +34,7 @@ use sp_blockchain::HeaderBackend; use block_builder::BlockBuilderApi; use consensus::SelectChain; use futures::prelude::*; -use futures::{future::select, task::{Spawn, SpawnExt}}; +use futures::task::{Spawn, SpawnExt}; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, @@ -57,14 +57,14 @@ pub type TaskExecutor = Arc; // They send a oneshot channel. type ValidationInstanceRequest = ( Hash, - futures::channel::oneshot::Sender, Error>>, + futures::channel::oneshot::Sender>, ); /// A handle to a single instance of parachain validation, which is pinned to /// a specific relay-chain block. 
This is the instance that should be used when /// constructing any +#[derive(Clone)] pub(crate) struct ValidationInstanceHandle { - _drop_signal: exit_future::Signal, table: Arc, started: Instant, } @@ -92,7 +92,7 @@ impl ServiceHandle { /// /// This can fail if the service task has shut down for some reason. pub(crate) async fn get_validation_instance(self, relay_parent: Hash) - -> Result, Error> + -> Result { let mut sender = self.sender; let instance_rx = loop { @@ -261,7 +261,7 @@ pub(crate) struct ParachainValidationInstances { availability_store: AvailabilityStore, /// Live agreements. Maps relay chain parent hashes to attestation /// instances. - live_instances: HashMap>, + live_instances: HashMap, } impl ParachainValidationInstances where @@ -289,7 +289,7 @@ impl ParachainValidationInstances where keystore: &KeyStorePtr, max_block_data_size: Option, ) - -> Result, Error> + -> Result { use primitives::Pair; @@ -344,23 +344,19 @@ impl ParachainValidationInstances where max_block_data_size, )); - let (_drop_signal, exit) = exit_future::signal(); - - let router = self.network.communication_for( + let router = self.network.build_table_router( table.clone(), &validators, - exit.clone(), ); if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) { - self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index, exit); + self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index); } - let tracker = Arc::new(ValidationInstanceHandle { + let tracker = ValidationInstanceHandle { table, started: Instant::now(), - _drop_signal, - }); + }; self.live_instances.insert(parent_hash, tracker.clone()); @@ -381,7 +377,6 @@ impl ParachainValidationInstances where max_block_data_size: Option, authorities_num: usize, local_id: ValidatorIndex, - exit: exit_future::Exit, ) { let (collators, client) = (self.collators.clone(), self.client.clone()); let availability_store = 
self.availability_store.clone(); @@ -423,13 +418,20 @@ impl ParachainValidationInstances where "Failed to add erasure chunks: {}", e ); } else { - router.local_collation( + let res = router.local_collation( collation, receipt, outgoing_targeted, (local_id, &chunks), - ); - } + ).await; + + if let Err(e) = res { + warn!( + target: "validation", + "Failed to notify network of local collation: {:?}", e + ); + } + }; } Err(e) => { warn!(target: "validation", "Failed to produce a receipt: {:?}", e); @@ -442,17 +444,17 @@ impl ParachainValidationInstances where } }; - let router = build_router + let router_work = build_router .map_ok(with_router) .map_err(|e| { warn!(target: "validation" , "Failed to build table router: {:?}", e); - }); + }) + .map(|_| ()); - let cancellable_work = select(exit, router).map(drop); // spawn onto thread pool. - if self.spawner.spawn(cancellable_work).is_err() { - error!("Failed to spawn cancellable work task"); + if self.spawner.spawn(router_work).is_err() { + error!("Failed to spawn router work task"); } } }