diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs index 5141f11c9969..a77dd3f54c87 100644 --- a/network/src/legacy/gossip/mod.rs +++ b/network/src/legacy/gossip/mod.rs @@ -327,6 +327,11 @@ pub struct NewLeafActions { } impl NewLeafActions { + #[cfg(test)] + pub fn new() -> Self { + NewLeafActions { actions: Vec::new() } + } + /// Perform the queued actions, feeding into gossip. pub fn perform( self, diff --git a/network/src/protocol.rs b/network/src/protocol/mod.rs similarity index 77% rename from network/src/protocol.rs rename to network/src/protocol/mod.rs index 380c62a7d5ec..f78d84c39289 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol/mod.rs @@ -42,10 +42,11 @@ use polkadot_validation::{ }; use sc_network::{config::Roles, Event, PeerId}; use sp_api::ProvideRuntimeApi; +use sp_runtime::ConsensusEngineId; use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::pin::Pin; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::time::Duration; use super::{cost, benefit, PolkadotNetworkService}; @@ -58,12 +59,15 @@ pub const VERSION: u32 = 1; pub const MIN_SUPPORTED_VERSION: u32 = 1; /// The engine ID of the polkadot network protocol. -pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot2"; +pub const POLKADOT_ENGINE_ID: ConsensusEngineId = *b"dot2"; /// The protocol name. pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"/polkadot/1"; pub use crate::legacy::gossip::ChainContext; +#[cfg(test)] +mod tests; + // Messages from the service API or network adapter. enum ServiceToWorkerMsg { // basic peer messages. @@ -72,16 +76,14 @@ enum ServiceToWorkerMsg { PeerDisconnected(PeerId), // service messages. - BuildConsensusNetworking(Arc, Vec, oneshot::Sender), + BuildConsensusNetworking(Arc, Vec), DropConsensusNetworking(Hash), SubmitValidatedCollation( - Hash, // relay-parent AbridgedCandidateReceipt, PoVBlock, (ValidatorIndex, Vec), ), FetchPoVBlock( - Hash, // relay-parent AbridgedCandidateReceipt, oneshot::Sender, ), @@ -113,13 +115,87 @@ enum ServiceToWorkerMsg { Hash, // relay-parent, oneshot::Sender + Send>>>, ), + + /// Used in tests to ensure that all other messages sent from the same + /// thread have been flushed. Also executes arbitrary logic with the protocl + /// handler. + #[cfg(test)] + Synchronize(Box), +} + +/// Messages from a background task to the main worker task. +enum BackgroundToWorkerMsg { + // Spawn a given future. + Spawn(future::BoxFuture<'static, ()>), +} + +/// Operations that a handle to an underlying network service should provide. +trait NetworkServiceOps: Send + Sync { + /// Report the peer as having a particular positive or negative value. + fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange); + + /// Write a notification to a given peer. + fn write_notification( + &self, + peer: PeerId, + engine_id: ConsensusEngineId, + notification: Vec, + ); +} + +impl NetworkServiceOps for PolkadotNetworkService { + fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) { + PolkadotNetworkService::report_peer(self, peer, value); + } + + fn write_notification( + &self, + peer: PeerId, + engine_id: ConsensusEngineId, + notification: Vec, + ) { + PolkadotNetworkService::write_notification(self, peer, engine_id, notification); + } +} + +/// Operations that a handle to a gossip network should provide. 
+trait GossipOps: Clone + Send + crate::legacy::GossipService + 'static { + fn new_local_leaf( + &self, + relay_parent: Hash, + validation_data: crate::legacy::gossip::MessageValidationData, + ) -> crate::legacy::gossip::NewLeafActions; + + /// Register an availability store in the gossip service to evaluate incoming + /// messages with. + fn register_availability_store( + &self, + store: av_store::Store, + ); +} + +impl GossipOps for RegisteredMessageValidator { + fn new_local_leaf( + &self, + relay_parent: Hash, + validation_data: crate::legacy::gossip::MessageValidationData, + ) -> crate::legacy::gossip::NewLeafActions { + RegisteredMessageValidator::new_local_leaf(self, relay_parent, validation_data) + } + + fn register_availability_store( + &self, + store: av_store::Store, + ) { + RegisteredMessageValidator::register_availability_store(self, store); + } } /// An async handle to the network service. #[derive(Clone)] pub struct Service { sender: mpsc::Sender, - network_service: Arc, + network_service: Arc, } /// Registers the protocol. @@ -153,7 +229,6 @@ pub fn start( config, service.clone(), gossip_validator, - worker_sender.clone(), api, worker_receiver, executor.clone(), @@ -305,6 +380,28 @@ struct ConsensusNetworkingInstance { _drop_signal: exit_future::Signal, } +/// A utility future that resolves when the receiving end of a channel has hung up. +/// +/// This is an `.await`-friendly interface around `poll_canceled`. +// TODO: remove in favor of https://github.com/rust-lang/futures-rs/pull/2092/ +// once published. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug)] +pub struct AwaitCanceled<'a, T> { + inner: &'a mut oneshot::Sender, +} + +impl Future for AwaitCanceled<'_, T> { + type Output = (); + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut futures::task::Context<'_>, + ) -> futures::task::Poll<()> { + self.inner.poll_canceled(cx) + } +} + /// Protocol configuration. #[derive(Default)] pub struct Config { @@ -356,33 +453,37 @@ impl RecentValidatorIds { } struct ProtocolHandler { - service: Arc, + service: Arc, peers: HashMap, // reverse mapping from validator-ID to PeerID. Multiple peers can represent // the same validator because of sentry nodes. connected_validators: HashMap>, + consensus_instances: HashMap, collators: crate::legacy::collator_pool::CollatorPool, local_collations: crate::legacy::local_collations::LocalCollations, config: Config, + local_keys: RecentValidatorIds, } impl ProtocolHandler { fn new( - service: Arc, + service: Arc, config: Config, ) -> Self { ProtocolHandler { service, peers: HashMap::new(), connected_validators: HashMap::new(), + consensus_instances: HashMap::new(), collators: Default::default(), local_collations: Default::default(), + local_keys: Default::default(), config, } } fn on_connect(&mut self, peer: PeerId, roles: Roles) { - let claimed_validator = roles.contains(sc_network::config::Roles::AUTHORITY); + let claimed_validator = roles.contains(Roles::AUTHORITY); self.peers.insert(peer.clone(), PeerData { claimed_validator, @@ -586,6 +687,7 @@ impl ProtocolHandler { if let Some(invalidated) = invalidated_key { self.validator_representative_removed(invalidated, &remote); } + self.connected_validators.entry(key).or_insert_with(HashSet::new).insert(remote.clone()); send_peer_collations(&*self.service, remote, collations_to_send); } @@ -658,10 +760,15 @@ impl ProtocolHandler { } } } + + fn drop_consensus_networking(&mut self, relay_parent: &Hash) { + // this triggers an abort of the background task. 
+ self.consensus_instances.remove(relay_parent); + } } fn send_peer_collations( - service: &PolkadotNetworkService, + service: &dyn NetworkServiceOps, remote: PeerId, collations: impl IntoIterator, ) { @@ -674,102 +781,90 @@ fn send_peer_collations( } } -async fn worker_loop( - config: Config, - service: Arc, - gossip_handle: RegisteredMessageValidator, - sender: mpsc::Sender, +struct Worker { + protocol_handler: ProtocolHandler, api: Arc, - mut receiver: mpsc::Receiver, executor: Sp, -) where + gossip_handle: Gossip, + background_to_main_sender: mpsc::Sender, + background_receiver: mpsc::Receiver, + service_receiver: mpsc::Receiver, +} + +impl Worker where Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, - Sp: Spawn + Clone + Send + 'static, + Sp: Spawn + Clone, + Gossip: GossipOps, { - const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29); + // spawns a background task to spawn consensus networking. + fn build_consensus_networking( + &mut self, + table: Arc, + authorities: Vec, + ) { + // glue: let gossip know about our new local leaf. + let relay_parent = table.consensus_parent_hash().clone(); + let (signal, exit) = exit_future::signal(); + + let key = table.session_key(); + if let Some(key) = key { + if let InsertedRecentKey::New(_) = self.protocol_handler.local_keys.insert(key.clone()) { + self.protocol_handler.distribute_new_session_key(key); + } + } - let mut protocol_handler = ProtocolHandler::new(service, config); - let mut consensus_instances = HashMap::new(); - let mut local_keys = RecentValidatorIds::default(); + let new_leaf_actions = self.gossip_handle.new_local_leaf( + relay_parent, + crate::legacy::gossip::MessageValidationData { authorities }, + ); - let mut collect_garbage = stream::unfold((), move |_| { - futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ()))) - }).map(drop); + new_leaf_actions.perform(&self.gossip_handle); - loop { - let message = match future::select(receiver.next(), collect_garbage.next()).await { - Either::Left((None, _)) | Either::Right((None, _)) => break, - Either::Left((Some(message), _)) => message, - Either::Right(_) => { - protocol_handler.collect_garbage(); - continue - } - }; + self.protocol_handler.consensus_instances.insert( + relay_parent, + ConsensusNetworkingInstance { + statement_table: table.clone(), + relay_parent, + attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent), + _drop_signal: signal, + }, + ); + + // glue the incoming messages, shared table, and validation + // work together. + let _ = self.executor.spawn(statement_import_loop( + relay_parent, + table, + self.api.clone(), + self.gossip_handle.clone(), + self.background_to_main_sender.clone(), + exit, + )); + } + fn handle_service_message(&mut self, message: ServiceToWorkerMsg) { match message { ServiceToWorkerMsg::PeerConnected(remote, roles) => { - protocol_handler.on_connect(remote, roles); + self.protocol_handler.on_connect(remote, roles); } ServiceToWorkerMsg::PeerDisconnected(remote) => { - protocol_handler.on_disconnect(remote); + self.protocol_handler.on_disconnect(remote); } ServiceToWorkerMsg::PeerMessage(remote, messages) => { - protocol_handler.on_raw_messages(remote, messages) + self.protocol_handler.on_raw_messages(remote, messages) } - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, router_sender) => { - // glue: let gossip know about our new local leaf. 
- let relay_parent = table.consensus_parent_hash().clone(); - let (signal, exit) = exit_future::signal(); - - let router = Router { - inner: Arc::new(RouterInner { relay_parent, sender: sender.clone() }), - }; - - let key = table.session_key(); - if let Some(key) = key { - if let InsertedRecentKey::New(_) = local_keys.insert(key.clone()) { - protocol_handler.distribute_new_session_key(key); - } - } - - let new_leaf_actions = gossip_handle.new_local_leaf( - relay_parent, - crate::legacy::gossip::MessageValidationData { authorities }, - ); - - new_leaf_actions.perform(&gossip_handle); - - consensus_instances.insert(relay_parent, ConsensusNetworkingInstance { - statement_table: table.clone(), - relay_parent, - attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent), - _drop_signal: signal, - }); - - let weak_router = Arc::downgrade(&router.inner); - - // glue the incoming messages, shared table, and validation - // work together. - let _ = executor.spawn(statement_import_loop( - relay_parent, - table, - api.clone(), - weak_router, - gossip_handle.clone(), - exit, - executor.clone(), - )); - - let _ = router_sender.send(router); + ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) => { + self.build_consensus_networking(table, authorities); } ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { - consensus_instances.remove(&relay_parent); + self.protocol_handler.drop_consensus_networking(&relay_parent); } - ServiceToWorkerMsg::SubmitValidatedCollation(relay_parent, receipt, pov_block, chunks) => { - let instance = match consensus_instances.get(&relay_parent) { - None => continue, + ServiceToWorkerMsg::SubmitValidatedCollation(receipt, pov_block, chunks) => { + let relay_parent = receipt.relay_parent; + let instance = match self.protocol_handler.consensus_instances.get(&relay_parent) { + None => return, Some(instance) => instance, }; @@ -778,21 +873,21 @@ async fn worker_loop( receipt, pov_block, chunks, - &gossip_handle, + &self.gossip_handle, ); } - ServiceToWorkerMsg::FetchPoVBlock(_relay_parent, _candidate, _sender) => { + ServiceToWorkerMsg::FetchPoVBlock(_candidate, _sender) => { // TODO https://github.com/paritytech/polkadot/issues/742: // create a filter on gossip for it and send to sender. } - ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, sender) => { + ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, mut sender) => { let topic = crate::erasure_coding_topic(&candidate_hash); // for every erasure-root, relay-parent pair, there should only be one // valid chunk with the given index. // // so we only care about the first item of the filtered stream. 
- let get_msg = gossip_handle.gossip_messages_for(topic) + let get_msg = self.gossip_handle.gossip_messages_for(topic) .filter_map(move |(msg, _)| { future::ready(match msg { GossipMessage::ErasureChunk(chunk) => @@ -809,14 +904,16 @@ async fn worker_loop( "gossip message streams do not conclude early; qed" )); - let _ = executor.spawn(async move { - let chunk = get_msg.await; - let _ = sender.send(chunk); + let _ = self.executor.spawn(async move { + let res = future::select(get_msg, AwaitCanceled { inner: &mut sender }).await; + if let Either::Left((chunk, _)) = res { + let _ = sender.send(chunk); + } }); } ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => { let topic = crate::erasure_coding_topic(&candidate_hash); - gossip_handle.gossip_message( + self.gossip_handle.gossip_message( topic, GossipMessage::ErasureChunk(ErasureChunkMessage { chunk: erasure_chunk, @@ -826,20 +923,20 @@ async fn worker_loop( } ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); - protocol_handler.await_collation(relay_parent, para_id, sender) + self.protocol_handler.await_collation(relay_parent, para_id, sender) } ServiceToWorkerMsg::NoteBadCollator(collator) => { - protocol_handler.note_bad_collator(collator); + self.protocol_handler.note_bad_collator(collator); } ServiceToWorkerMsg::RegisterAvailabilityStore(store) => { - gossip_handle.register_availability_store(store); + self.gossip_handle.register_availability_store(store); } ServiceToWorkerMsg::OurCollation(targets, collation) => { - protocol_handler.distribute_our_collation(targets, collation); + self.protocol_handler.distribute_our_collation(targets, collation); } ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => { let topic = crate::legacy::gossip::attestation_topic(relay_parent); - let checked_messages = gossip_handle.gossip_messages_for(topic) + let checked_messages = self.gossip_handle.gossip_messages_for(topic) .filter_map(|msg| match msg.0 { GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), _ => future::ready(None), @@ -848,10 +945,74 @@ async fn worker_loop( let _ = sender.send(checked_messages); } + #[cfg(test)] + ServiceToWorkerMsg::Synchronize(callback) => { + (callback)(&mut self.protocol_handler) + } + } + } + + fn handle_background_message(&mut self, message: BackgroundToWorkerMsg) { + match message { + BackgroundToWorkerMsg::Spawn(task) => { + let _ = self.executor.spawn(task); + } + } + } + + async fn main_loop(&mut self) { + const COLLECT_GARBAGE_INTERVAL: Duration = Duration::from_secs(29); + + let mut collect_garbage = stream::unfold((), move |_| { + futures_timer::Delay::new(COLLECT_GARBAGE_INTERVAL).map(|_| Some(((), ()))) + }).map(drop); + + loop { + futures::select! 
{ + _do_collect = collect_garbage.next() => { + self.protocol_handler.collect_garbage(); + } + service_msg = self.service_receiver.next() => match service_msg { + Some(msg) => self.handle_service_message(msg), + None => return, + }, + background_msg = self.background_receiver.next() => match background_msg { + Some(msg) => self.handle_background_message(msg), + None => return, + }, + } } } } +async fn worker_loop( + config: Config, + service: Arc, + gossip_handle: impl GossipOps, + api: Arc, + receiver: mpsc::Receiver, + executor: Sp, +) where + Api: ProvideRuntimeApi + Send + Sync + 'static, + Api::Api: ParachainHost, + Sp: Spawn + Clone, +{ + const BACKGROUND_TO_MAIN_BUF: usize = 16; + + let (background_tx, background_rx) = mpsc::channel(BACKGROUND_TO_MAIN_BUF); + let mut worker = Worker { + protocol_handler: ProtocolHandler::new(service, config), + api, + executor, + gossip_handle, + background_to_main_sender: background_tx, + background_receiver: background_rx, + service_receiver: receiver, + }; + + worker.main_loop().await +} + // A unique trace for valid statements issued by a validator. #[derive(Hash, PartialEq, Eq, Clone, Debug)] pub(crate) enum StatementTrace { @@ -917,10 +1078,9 @@ async fn statement_import_loop( relay_parent: Hash, table: Arc, api: Arc, - weak_router: Weak, - gossip_handle: RegisteredMessageValidator, + gossip_handle: impl GossipOps, + mut to_worker: mpsc::Sender, mut exit: exit_future::Exit, - executor: impl Spawn, ) where Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, @@ -972,14 +1132,16 @@ async fn statement_import_loop( statements.insert(0, statement); let producers: Vec<_> = { - // create a temporary router handle for importing all of these statements - let temp_router = match weak_router.upgrade() { - None => break, - Some(inner) => Router { inner }, - }; + // TODO: fetch these from gossip. + // https://github.com/paritytech/polkadot/issues/742 + fn ignore_pov_fetch_requests(_: &AbridgedCandidateReceipt) + -> future::Pending> + { + future::pending() + } table.import_remote_statements( - &temp_router, + &ignore_pov_fetch_requests, statements.iter().cloned(), ) }; @@ -1015,7 +1177,14 @@ async fn statement_import_loop( }); let work = future::select(work.boxed(), exit.clone()).map(drop); - let _ = executor.spawn(work); + if let Err(_) = to_worker.send( + BackgroundToWorkerMsg::Spawn(work.boxed()) + ).await { + // can fail only if remote has hung up - worker is dead, + // we should die too. this is defensive, since the exit future + // would fire shortly anyway. + return + } } } } @@ -1030,7 +1199,7 @@ fn distribute_validated_collation( receipt: AbridgedCandidateReceipt, pov_block: PoVBlock, chunks: (ValidatorIndex, Vec), - gossip_handle: &RegisteredMessageValidator, + gossip_handle: &impl GossipOps, ) { // produce a signed statement. 
let hash = receipt.hash(); @@ -1144,7 +1313,7 @@ impl Service { } impl ParachainNetwork for Service { - type Error = future::Either; + type Error = mpsc::SendError; type TableRouter = Router; type BuildTableRouter = Pin> + Send>>; @@ -1155,14 +1324,19 @@ impl ParachainNetwork for Service { ) -> Self::BuildTableRouter { let authorities = authorities.to_vec(); let mut sender = self.sender.clone(); + let relay_parent = table.consensus_parent_hash().clone(); - let (tx, rx) = oneshot::channel(); Box::pin(async move { sender.send( - ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities, tx) - ).map_err(future::Either::Left).await?; + ServiceToWorkerMsg::BuildConsensusNetworking(table, authorities) + ).await?; - rx.map_err(future::Either::Right).await + Ok(Router { + inner: Arc::new(RouterInner { + relay_parent, + sender, + }) + }) }) } } @@ -1228,6 +1402,8 @@ pub enum RouterError { Canceled(oneshot::Canceled), #[display(fmt = "Could not reach worker with request: {}", _0)] SendError(mpsc::SendError), + #[display(fmt = "Provided candidate receipt does not have expected relay parent {}", _0)] + IncorrectRelayParent(Hash), } impl TableRouter for Router { @@ -1241,8 +1417,13 @@ impl TableRouter for Router { pov_block: PoVBlock, chunks: (ValidatorIndex, &[ErasureChunk]), ) -> Self::SendLocalCollation { + if receipt.relay_parent != self.inner.relay_parent { + return Box::pin( + future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent))) + ); + } + let message = ServiceToWorkerMsg::SubmitValidatedCollation( - self.inner.relay_parent.clone(), receipt, pov_block, (chunks.0, chunks.1.to_vec()), @@ -1254,9 +1435,14 @@ impl TableRouter for Router { } fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { + if candidate.relay_parent != self.inner.relay_parent { + return Box::pin( + future::ready(Err(RouterError::IncorrectRelayParent(self.inner.relay_parent))) + ); + } + let (tx, rx) = oneshot::channel(); let message = ServiceToWorkerMsg::FetchPoVBlock( - self.inner.relay_parent.clone(), candidate.clone(), tx, ); @@ -1268,24 +1454,3 @@ impl TableRouter for Router { }) } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn router_inner_drop_sends_worker_message() { - let parent = [1; 32].into(); - - let (sender, mut receiver) = mpsc::channel(0); - drop(RouterInner { - relay_parent: parent, - sender, - }); - - match receiver.try_next() { - Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), - _ => panic!("message not sent"), - } - } -} diff --git a/network/src/protocol/tests.rs b/network/src/protocol/tests.rs new file mode 100644 index 000000000000..652f1cff01a6 --- /dev/null +++ b/network/src/protocol/tests.rs @@ -0,0 +1,573 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +//! Tests for the protocol. 
+ +use super::*; +use parking_lot::Mutex; + +use polkadot_primitives::{Block, Header, BlockId}; +use polkadot_primitives::parachain::{ + Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId, + Retriable, CollatorId, AbridgedCandidateReceipt, + GlobalValidationSchedule, LocalValidationData, ErasureChunk, +}; +use polkadot_validation::SharedTable; + +use av_store::{Store as AvailabilityStore, ErasureNetworking}; +use sc_network_gossip::TopicNotification; +use sp_blockchain::Result as ClientResult; +use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi}; +use sp_runtime::traits::{Block as BlockT, HashFor, NumberFor}; +use sp_state_machine::ChangesTrieState; +use sp_core::{crypto::Pair, NativeOrEncoded, ExecutionContext}; +use sp_keyring::Sr25519Keyring; + +use futures::executor::LocalPool; +use futures::task::LocalSpawnExt; + +#[derive(Default)] +struct MockNetworkOps { + recorded: Mutex, +} + +#[derive(Default)] +struct Recorded { + peer_reputations: HashMap, + notifications: Vec<(PeerId, Message)>, +} + +// Test setup registers receivers of gossip messages as well as signals that +// fire when they are taken. +type GossipStreamEntry = (mpsc::UnboundedReceiver, oneshot::Sender<()>); + +#[derive(Default, Clone)] +struct MockGossip { + inner: Arc>>, +} + +impl MockGossip { + fn add_gossip_stream(&self, topic: Hash) + -> (mpsc::UnboundedSender, oneshot::Receiver<()>) + { + let (tx, rx) = mpsc::unbounded(); + let (o_tx, o_rx) = oneshot::channel(); + self.inner.lock().insert(topic, (rx, o_tx)); + (tx, o_rx) + } + + fn contains_listener(&self, topic: &Hash) -> bool { + self.inner.lock().contains_key(topic) + } +} + +impl NetworkServiceOps for MockNetworkOps { + fn report_peer(&self, peer: PeerId, value: sc_network::ReputationChange) { + let mut recorded = self.recorded.lock(); + let total_rep = recorded.peer_reputations.entry(peer).or_insert(0); + + *total_rep = total_rep.saturating_add(value.value); + } + + fn write_notification( + &self, + peer: PeerId, + engine_id: ConsensusEngineId, + notification: Vec, + ) { + assert_eq!(engine_id, POLKADOT_ENGINE_ID); + let message = Message::decode(&mut ¬ification[..]).expect("invalid notification"); + self.recorded.lock().notifications.push((peer, message)); + } +} + +impl crate::legacy::GossipService for MockGossip { + fn gossip_messages_for(&self, topic: Hash) -> crate::legacy::GossipMessageStream { + crate::legacy::GossipMessageStream::new(match self.inner.lock().remove(&topic) { + None => Box::pin(stream::empty()), + Some((rx, o_rx)) => { + let _ = o_rx.send(()); + Box::pin(rx) + } + }) + } + + fn gossip_message(&self, _topic: Hash, _message: GossipMessage) { + + } + + fn send_message(&self, _who: PeerId, _message: GossipMessage) { + + } +} + +impl GossipOps for MockGossip { + fn new_local_leaf( + &self, + _relay_parent: Hash, + _validation_data: crate::legacy::gossip::MessageValidationData, + ) -> crate::legacy::gossip::NewLeafActions { + crate::legacy::gossip::NewLeafActions::new() + } + + fn register_availability_store( + &self, + _store: av_store::Store, + ) { + + } +} + +#[derive(Default)] +struct ApiData { + validators: Vec, + duties: Vec, + active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>, +} + +#[derive(Default, Clone)] +struct TestApi { + data: Arc>, +} + +#[derive(Default)] +struct RuntimeApi { + data: Arc>, +} + +impl ProvideRuntimeApi for TestApi { + type Api = RuntimeApi; + + fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { + RuntimeApi { data: self.data.clone() 
}.into() + } +} + +impl Core for RuntimeApi { + fn Core_version_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } + + fn Core_execute_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } + + fn Core_initialize_block_runtime_api_impl( + &self, + _: &BlockId, + _: ExecutionContext, + _: Option<&Header>, + _: Vec, + ) -> ClientResult> { + unimplemented!("Not required for testing!") + } +} + +impl ApiErrorExt for RuntimeApi { + type Error = sp_blockchain::Error; +} + +impl ApiExt for RuntimeApi { + type StateBackend = sp_state_machine::InMemoryBackend>; + + fn map_api_result Result, R, E>( + &self, + _: F + ) -> Result { + unimplemented!("Not required for testing!") + } + + fn runtime_version_at(&self, _: &BlockId) -> ClientResult { + unimplemented!("Not required for testing!") + } + + fn record_proof(&mut self) { } + + fn extract_proof(&mut self) -> Option { + None + } + + fn into_storage_changes( + &self, + _: &Self::StateBackend, + _: Option<&ChangesTrieState, NumberFor>>, + _: ::Hash, + ) -> std::result::Result, String> + where Self: Sized + { + unimplemented!("Not required for testing!") + } +} + +impl ParachainHost for RuntimeApi { + fn ParachainHost_validators_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult>> { + Ok(NativeOrEncoded::Native(self.data.lock().validators.clone())) + } + + fn ParachainHost_duty_roster_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult> { + + Ok(NativeOrEncoded::Native(DutyRoster { + validator_duty: self.data.lock().duties.clone(), + })) + } + + fn ParachainHost_active_parachains_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult)>>> { + Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone())) + } + + fn ParachainHost_parachain_code_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option, + _: Vec, + ) -> ClientResult>>> { + Ok(NativeOrEncoded::Native(Some(Vec::new()))) + } + + fn ParachainHost_global_validation_schedule_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option<()>, + _: Vec, + ) -> ClientResult> { + Ok(NativeOrEncoded::Native(Default::default())) + } + + fn ParachainHost_local_validation_data_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _: Option, + _: Vec, + ) -> ClientResult>> { + Ok(NativeOrEncoded::Native(Some(Default::default()))) + } + + fn ParachainHost_get_heads_runtime_api_impl( + &self, + _at: &BlockId, + _: ExecutionContext, + _extrinsics: Option::Extrinsic>>, + _: Vec, + ) -> ClientResult>>> { + Ok(NativeOrEncoded::Native(Some(Vec::new()))) + } +} + +impl super::Service { + async fn connect_peer(&mut self, peer: PeerId, roles: Roles) { + self.sender.send(ServiceToWorkerMsg::PeerConnected(peer, roles)).await.unwrap(); + } + + async fn peer_message(&mut self, peer: PeerId, message: Message) { + let bytes = message.encode().into(); + + self.sender.send(ServiceToWorkerMsg::PeerMessage(peer, vec![bytes])).await.unwrap(); + } + + async fn disconnect_peer(&mut self, peer: PeerId) { + self.sender.send(ServiceToWorkerMsg::PeerDisconnected(peer)).await.unwrap(); + } + + async fn synchronize( + &mut self, + callback: impl FnOnce(&mut 
ProtocolHandler) -> T + Send + 'static, + ) -> T { + let (tx, rx) = oneshot::channel(); + + let msg = ServiceToWorkerMsg::Synchronize(Box::new(move |proto| { + let res = callback(proto); + if let Err(_) = tx.send(res) { + log::warn!(target: "p_net", "Failed to send synchronization result"); + } + })); + + self.sender.send(msg).await.expect("Worker thread unexpectedly hung up"); + rx.await.expect("Worker thread failed to send back result") + } +} + +fn test_setup(config: Config) -> ( + Service, + MockGossip, + LocalPool, + impl Future + 'static, +) { + let pool = LocalPool::new(); + + let network_ops = Arc::new(MockNetworkOps::default()); + let mock_gossip = MockGossip::default(); + let (worker_tx, worker_rx) = mpsc::channel(0); + let api = Arc::new(TestApi::default()); + + let worker_task = worker_loop( + config, + network_ops.clone(), + mock_gossip.clone(), + api.clone(), + worker_rx, + pool.spawner(), + ); + + let service = Service { + sender: worker_tx, + network_service: network_ops, + }; + + (service, mock_gossip, pool, worker_task) +} + +#[test] +fn router_inner_drop_sends_worker_message() { + let parent = [1; 32].into(); + + let (sender, mut receiver) = mpsc::channel(0); + drop(RouterInner { + relay_parent: parent, + sender, + }); + + match receiver.try_next() { + Ok(Some(ServiceToWorkerMsg::DropConsensusNetworking(x))) => assert_eq!(parent, x), + _ => panic!("message not sent"), + } +} + +#[test] +fn worker_task_shuts_down_when_sender_dropped() { + let (service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + + drop(service); + let _ = pool.run_until(worker_task); +} + +#[test] +fn consensus_instances_cleaned_up() { + let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + let relay_parent = [0; 32].into(); + let authorities = Vec::new(); + + let table = Arc::new(SharedTable::new( + Vec::new(), + HashMap::new(), + None, + relay_parent, + AvailabilityStore::new_in_memory(service.clone()), + None, + )); + + pool.spawner().spawn_local(worker_task).unwrap(); + + let router = pool.run_until( + service.build_table_router(table, &authorities) + ).unwrap(); + + drop(router); + + assert!(pool.run_until(service.synchronize(move |proto| { + !proto.consensus_instances.contains_key(&relay_parent) + }))); +} + +#[test] +fn validator_peer_cleaned_up() { + let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + + let peer = PeerId::random(); + let validator_key = Sr25519Keyring::Alice.pair(); + let validator_id = ValidatorId::from(validator_key.public()); + + pool.spawner().spawn_local(worker_task).unwrap(); + pool.run_until(async move { + service.connect_peer(peer.clone(), Roles::AUTHORITY).await; + service.peer_message(peer.clone(), Message::Status(Status { + version: VERSION, + collating_for: None, + })).await; + service.peer_message(peer.clone(), Message::ValidatorId(validator_id.clone())).await; + + let p = peer.clone(); + let v = validator_id.clone(); + let (peer_has_key, reverse_lookup) = service.synchronize(move |proto| { + let peer_has_key = proto.peers.get(&p).map_or( + false, + |p_data| p_data.session_keys.as_slice().contains(&v), + ); + + let reverse_lookup = proto.connected_validators.get(&v).map_or( + false, + |reps| reps.contains(&p), + ); + + (peer_has_key, reverse_lookup) + }).await; + + assert!(peer_has_key); + assert!(reverse_lookup); + + service.disconnect_peer(peer.clone()).await; + + let p = peer.clone(); + let v = validator_id.clone(); + let (peer_removed, 
rev_removed) = service.synchronize(move |proto| { + let peer_removed = !proto.peers.contains_key(&p); + let reverse_mapping_removed = !proto.connected_validators.contains_key(&v); + + (peer_removed, reverse_mapping_removed) + }).await; + + assert!(peer_removed); + assert!(rev_removed); + }); +} + +#[test] +fn validator_key_spillover_cleaned() { + let (mut service, _gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + + let peer = PeerId::random(); + let make_validator_id = |ring: Sr25519Keyring| ValidatorId::from(ring.public()); + + // We will push 1 extra beyond what is normally kept. + assert_eq!(RECENT_SESSIONS, 3); + let key_a = make_validator_id(Sr25519Keyring::Alice); + let key_b = make_validator_id(Sr25519Keyring::Bob); + let key_c = make_validator_id(Sr25519Keyring::Charlie); + let key_d = make_validator_id(Sr25519Keyring::Dave); + + let keys = vec![key_a, key_b, key_c, key_d]; + + pool.spawner().spawn_local(worker_task).unwrap(); + pool.run_until(async move { + service.connect_peer(peer.clone(), Roles::AUTHORITY).await; + service.peer_message(peer.clone(), Message::Status(Status { + version: VERSION, + collating_for: None, + })).await; + + for key in &keys { + service.peer_message(peer.clone(), Message::ValidatorId(key.clone())).await; + } + + let p = peer.clone(); + let active_keys = keys[1..].to_vec(); + let discarded_key = keys[0].clone(); + assert!(service.synchronize(move |proto| { + let active_correct = proto.peers.get(&p).map_or(false, |p_data| { + p_data.session_keys.as_slice() == &active_keys[..] + }); + + let active_lookup = active_keys.iter().all(|k| { + proto.connected_validators.get(&k).map_or(false, |m| m.contains(&p)) + }); + + let discarded = !proto.connected_validators.contains_key(&discarded_key); + + active_correct && active_lookup && discarded + }).await); + }); +} + +#[test] +fn erasure_fetch_drop_also_drops_gossip_sender() { + let (service, gossip, mut pool, worker_task) = test_setup(Config { collating_for: None }); + let candidate_hash = [1; 32].into(); + + let expected_index = 1; + + let spawner = pool.spawner(); + + spawner.spawn_local(worker_task).unwrap(); + let topic = crate::erasure_coding_topic(&candidate_hash); + let (mut gossip_tx, gossip_taken_rx) = gossip.add_gossip_stream(topic); + + let test_work = async move { + let chunk_listener = service.fetch_erasure_chunk( + &candidate_hash, + expected_index, + ); + + // spawn an abortable handle to the chunk listener future. + // we will wait until this future has proceeded enough to start grabbing + // messages from gossip, and then we will abort the future. + let (chunk_listener, abort_handle) = future::abortable(chunk_listener); + let handle = spawner.spawn_with_handle(chunk_listener).unwrap(); + gossip_taken_rx.await.unwrap(); + + // gossip listener was taken. and is active. + assert!(!gossip.contains_listener(&topic)); + assert!(!gossip_tx.is_closed()); + + abort_handle.abort(); + + // we must `await` this, otherwise context may never transfer over + // to the spawned `Abortable` future. + assert!(handle.await.is_err()); + loop { + // if dropping the sender leads to the gossip listener + // being cleaned up, we will eventually be unable to send a message + // on the sender. 
+ if gossip_tx.is_closed() { break } + + let fake_chunk = GossipMessage::ErasureChunk( + crate::legacy::gossip::ErasureChunkMessage { + chunk: ErasureChunk { + chunk: vec![], + index: expected_index + 1, + proof: vec![], + }, + candidate_hash, + } + ).encode(); + + match gossip_tx.send(TopicNotification { message: fake_chunk, sender: None }).await { + Err(e) => { assert!(e.is_disconnected()); break }, + Ok(_) => continue, + } + } + }; + + pool.run_until(test_work); +} diff --git a/validation/src/block_production.rs b/validation/src/block_production.rs index ad8ffb86cc6a..5fb3dd8a62ec 100644 --- a/validation/src/block_production.rs +++ b/validation/src/block_production.rs @@ -197,7 +197,6 @@ impl consensus::Proposer for Proposer consensus::Proposer for Proposer { parent_id: BlockId, client: Arc, transaction_pool: Arc, - table: Arc, inherent_data: Option, inherent_digests: DigestFor, deadline: Instant, diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index eb38dae2b871..2b6a8e62271b 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -34,7 +34,7 @@ use futures::channel::oneshot; use log::{warn, debug}; use bitvec::bitvec; -use super::{GroupInfo, TableRouter}; +use super::GroupInfo; use self::includable::IncludabilitySender; use primitives::Pair; use sp_api::ProvideRuntimeApi; @@ -135,14 +135,14 @@ impl SharedTableInner { // // the statement producer, if any, will produce only statements concerning the same candidate // as the one just imported - fn import_remote_statement( + fn import_remote_statement( &mut self, context: &TableContext, - router: &R, + fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch, statement: table::SignedStatement, max_block_data_size: Option, ) -> Option> { let summary = self.table.import_statement(context, statement)?; self.update_trackers(&summary.candidate, context); @@ -175,7 +175,7 @@ impl SharedTableInner { None } Some(candidate) => { - let fetch = router.fetch_pov_block(candidate); + let fetch = fetch_pov_block(candidate); Some(Work { candidate_receipt: candidate.clone(), @@ -446,14 +446,19 @@ impl SharedTable { /// /// The ParachainWork, if any, will produce only statements concerning the same candidate /// as the one just imported - pub fn import_remote_statement( + pub fn import_remote_statement( &self, - router: &R, + fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch, statement: table::SignedStatement, ) -> Option> { - self.inner.lock().import_remote_statement(&*self.context, router, statement, self.max_block_data_size) + self.inner.lock().import_remote_statement( + &*self.context, + fetch_pov_block, + statement, + self.max_block_data_size, + ) } /// Import many statements at once. 
@@ -464,18 +469,26 @@ impl SharedTable { /// /// The ParachainWork, if any, will produce only statements concerning the same candidate /// as the one just imported - pub fn import_remote_statements(&self, router: &R, iterable: I) -> U + pub fn import_remote_statements( + &self, + fetch_pov_block: impl Fn(&AbridgedCandidateReceipt) -> Fetch, + iterable: I, + ) -> U where - R: TableRouter, I: IntoIterator, U: ::std::iter::FromIterator>>, { let mut inner = self.inner.lock(); iterable.into_iter().map(move |statement| { - inner.import_remote_statement(&*self.context, router, statement, self.max_block_data_size) + inner.import_remote_statement( + &*self.context, + &fetch_pov_block, + statement, + self.max_block_data_size, + ) }).collect() } @@ -562,7 +575,7 @@ impl SharedTable { self.inner.lock().table.get_misbehavior().clone() } - /// Track includability of a given set of candidate hashes. + /// Track includability of a given set of candidate hashes. pub fn track_includability(&self, iterable: I) -> oneshot::Receiver<()> where I: IntoIterator { @@ -626,23 +639,14 @@ mod tests { ) {} } - #[derive(Clone)] - struct DummyRouter; - impl TableRouter for DummyRouter { - type Error = ::std::io::Error; - type SendLocalCollation = future::Ready>; - type FetchValidationProof = future::Ready>; - - fn local_collation( - &self, - _candidate: AbridgedCandidateReceipt, - _pov_block: PoVBlock, - _chunks: (ValidatorIndex, &[ErasureChunk]) - ) -> Self::SendLocalCollation { future::ready(Ok(())) } - - fn fetch_pov_block(&self, _candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { - future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5])) - } + fn lazy_fetch_pov() + -> Box< + dyn Fn(&AbridgedCandidateReceipt) -> future::Ready< + Result + > + > + { + Box::new(|_| future::ok(pov_block_with_data(vec![1, 2, 3, 4, 5]))) } #[test] @@ -688,7 +692,7 @@ mod tests { }; shared_table.import_remote_statement( - &DummyRouter, + lazy_fetch_pov(), signed_statement, ).expect("candidate and local validity group are same"); } @@ -736,7 +740,7 @@ mod tests { }; shared_table.import_remote_statement( - &DummyRouter, + lazy_fetch_pov(), signed_statement, ).expect("should produce work"); } @@ -909,7 +913,7 @@ mod tests { }; let _a = shared_table.import_remote_statement( - &DummyRouter, + lazy_fetch_pov(), signed_statement.clone(), ).expect("should produce work"); @@ -917,7 +921,7 @@ mod tests { .expect("validation has started").is_in_progress()); let b = shared_table.import_remote_statement( - &DummyRouter, + lazy_fetch_pov(), signed_statement.clone(), ); @@ -967,7 +971,7 @@ mod tests { .expect("validation has started").is_done()); let a = shared_table.import_remote_statement( - &DummyRouter, + lazy_fetch_pov(), signed_statement, );
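
For readers less familiar with the single-owner worker pattern this patch moves to, below is a minimal, self-contained sketch (not part of the patch) of how `Worker::main_loop` multiplexes its inputs: a channel of service messages, a channel of messages from background tasks, and a periodic garbage-collection tick, all handled on one task that owns the protocol state. The message types, channel sizes, and the finite tick stream are placeholders chosen so the example terminates; the real worker uses `ServiceToWorkerMsg`, `BackgroundToWorkerMsg`, and a `futures_timer::Delay`-based stream.

    use futures::{channel::mpsc, executor::block_on, stream, StreamExt};

    // Placeholder message types standing in for `ServiceToWorkerMsg` and
    // `BackgroundToWorkerMsg`.
    enum ServiceMsg { Ping }
    enum BackgroundMsg { Spawned }

    async fn main_loop(
        mut service_rx: mpsc::Receiver<ServiceMsg>,
        mut background_rx: mpsc::Receiver<BackgroundMsg>,
    ) {
        // Stand-in for the periodic `collect_garbage` timer; a finite, fused
        // stream keeps this sketch from spinning forever.
        let mut ticks = stream::repeat(()).take(3).fuse();

        loop {
            futures::select! {
                _ = ticks.next() => {
                    // `protocol_handler.collect_garbage()` in the real worker.
                }
                msg = service_rx.next() => match msg {
                    Some(ServiceMsg::Ping) => { /* handle_service_message */ }
                    None => return, // all service handles dropped: shut down
                },
                msg = background_rx.next() => match msg {
                    Some(BackgroundMsg::Spawned) => { /* handle_background_message */ }
                    None => return,
                },
            }
        }
    }

    fn main() {
        let (mut service_tx, service_rx) = mpsc::channel(4);
        let (mut background_tx, background_rx) = mpsc::channel(4);

        service_tx.try_send(ServiceMsg::Ping).unwrap();
        background_tx.try_send(BackgroundMsg::Spawned).unwrap();

        // Dropping the last service-side sender is what ends the worker loop,
        // mirroring `worker_task_shuts_down_when_sender_dropped` above.
        drop(service_tx);

        block_on(main_loop(service_rx, background_rx));
    }

Because every mutation of `ProtocolHandler` happens on this single task, the test-only `Synchronize` message can hand the handler to a closure for inspection without any locking, which is what the new tests in protocol/tests.rs rely on.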