diff --git a/availability-store/src/lib.rs b/availability-store/src/lib.rs index bd4f77ae36ac..52df0749864e 100644 --- a/availability-store/src/lib.rs +++ b/availability-store/src/lib.rs @@ -32,7 +32,7 @@ use polkadot_primitives::{ ParachainHost, AvailableData, OmittedValidationData, }, }; -use sp_runtime::traits::{BlakeTwo256, Hash as HashT, HashFor}; +use sp_runtime::traits::HashFor; use sp_blockchain::{Result as ClientResult}; use client::{ BlockchainEvents, BlockBody, @@ -55,7 +55,7 @@ pub use worker::AvailabilityBlockImport; pub use store::AwaitedFrontierEntry; use worker::{ - Worker, WorkerHandle, Chunks, IncludedParachainBlocks, WorkerMsg, MakeAvailable, + Worker, WorkerHandle, IncludedParachainBlocks, WorkerMsg, MakeAvailable, Chunks }; use store::{Store as InnerStore}; @@ -70,23 +70,7 @@ pub struct Config { pub path: PathBuf, } -/// Compute gossip topic for the erasure chunk messages given the relay parent, -/// root and the chunk index. -/// -/// Since at this point we are not able to use [`network`] directly, but both -/// of them need to compute these topics, this lives here and not there. -/// -/// [`network`]: ../polkadot_network/index.html -pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32) -> Hash { - let mut v = relay_parent.as_ref().to_vec(); - v.extend(erasure_root.as_ref()); - v.extend(&index.to_le_bytes()[..]); - v.extend(b"erasure_chunks"); - - BlakeTwo256::hash(&v[..]) -} - -/// A trait that provides a shim for the [`NetworkService`] trait. +/// An abstraction around networking for the availablity-store. /// /// Currently it is not possible to use the networking code in the availability store /// core directly due to a number of loop dependencies it require: @@ -95,26 +79,25 @@ pub fn erasure_coding_topic(relay_parent: Hash, erasure_root: Hash, index: u32) /// /// `availability-store` -> `network` -> `validation` -> `availability-store` /// -/// So we provide this shim trait that gets implemented for a wrapper newtype in -/// the [`network`] module. +/// So we provide this trait that gets implemented for a type in +/// the [`network`] module or a mock in tests. /// -/// [`NetworkService`]: ../polkadot_network/trait.NetworkService.html /// [`network`]: ../polkadot_network/index.html -pub trait ProvideGossipMessages { - /// Get a stream of gossip erasure chunk messages for a given topic. - /// - /// Each item is a tuple (relay_parent, candidate_hash, erasure_chunk) - fn gossip_messages_for( +pub trait ErasureNetworking { + /// Errors that can occur when fetching erasure chunks. + type Error: std::fmt::Debug + 'static; + + /// Fetch an erasure chunk from the networking service. + fn fetch_erasure_chunk( &self, - topic: Hash, - ) -> Pin + Send>>; + candidate_hash: &Hash, + index: u32, + ) -> Pin> + Send>>; - /// Gossip an erasure chunk message. - fn gossip_erasure_chunk( + /// Distributes an erasure chunk to the correct validator node. + fn distribute_erasure_chunk( &self, - relay_parent: Hash, candidate_hash: Hash, - erasure_root: Hash, chunk: ErasureChunk, ); } @@ -148,11 +131,11 @@ impl Store { /// Creating a store among other things starts a background worker thread which /// handles most of the write operations to the storage. 
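Note that the erasure-chunk gossip topic helper is removed from this crate here; later hunks in this diff instead call a `crate::erasure_coding_topic(&candidate_hash)` helper living in the `network` crate, keyed only by candidate hash. That definition is outside this excerpt; a plausible sketch, following the same hashing pattern as the removed function (and as `attestation_topic` added further down), would be:

```rust
use polkadot_primitives::Hash;
use sp_runtime::traits::{BlakeTwo256, Hash as HashT};

/// Sketch only: compute a gossip topic for the erasure chunks of one candidate.
/// The exact body lives in the `network` crate and is an assumption here.
fn erasure_coding_topic(candidate_hash: &Hash) -> Hash {
    let mut v = candidate_hash.as_ref().to_vec();
    v.extend(b"erasure_chunks");
    BlakeTwo256::hash(&v[..])
}
```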
#[cfg(not(target_os = "unknown"))] - pub fn new(config: Config, gossip: PGM) -> io::Result - where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static + pub fn new(config: Config, network: EN) -> io::Result + where EN: ErasureNetworking + Send + Sync + Clone + 'static { let inner = InnerStore::new(config)?; - let worker = Arc::new(Worker::start(inner.clone(), gossip)); + let worker = Arc::new(Worker::start(inner.clone(), network)); let to_worker = worker.to_worker().clone(); Ok(Self { @@ -166,11 +149,11 @@ impl Store { /// /// Creating a store among other things starts a background worker thread /// which handles most of the write operations to the storage. - pub fn new_in_memory(gossip: PGM) -> Self - where PGM: ProvideGossipMessages + Send + Sync + Clone + 'static + pub fn new_in_memory(network: EN) -> Self + where EN: ErasureNetworking + Send + Sync + Clone + 'static { let inner = InnerStore::new_in_memory(); - let worker = Arc::new(Worker::start(inner.clone(), gossip)); + let worker = Arc::new(Worker::start(inner.clone(), network)); let to_worker = worker.to_worker().clone(); Self { @@ -204,7 +187,6 @@ impl Store { let to_worker = self.to_worker.clone(); let import = AvailabilityBlockImport::new( - self.inner.clone(), client, wrapped_block_import, spawner, @@ -261,35 +243,38 @@ impl Store { pub async fn add_erasure_chunk( &self, candidate: AbridgedCandidateReceipt, + n_validators: u32, chunk: ErasureChunk, ) -> io::Result<()> { - self.add_erasure_chunks(candidate, vec![chunk]).await + self.add_erasure_chunks(candidate, n_validators, std::iter::once(chunk)).await } /// Adds a set of erasure chunks to storage. /// /// The chunks should be checked for validity against the root of encoding - /// and it's proof prior to calling this. + /// and its proof prior to calling this. /// /// This method will send the chunks to the background worker, allowing caller to /// asynchrounously waiting for the result. pub async fn add_erasure_chunks( &self, candidate: AbridgedCandidateReceipt, + n_validators: u32, chunks: I, ) -> io::Result<()> where I: IntoIterator { let candidate_hash = candidate.hash(); - let relay_parent = candidate.relay_parent; self.add_candidate(candidate).await?; + let (s, r) = oneshot::channel(); let chunks = chunks.into_iter().collect(); + let msg = WorkerMsg::Chunks(Chunks { - relay_parent, candidate_hash, chunks, + n_validators, result: s, }); diff --git a/availability-store/src/store.rs b/availability-store/src/store.rs index 03a7855151da..e3b1e3592979 100644 --- a/availability-store/src/store.rs +++ b/availability-store/src/store.rs @@ -60,33 +60,31 @@ fn candidate_key(candidate_hash: &Hash) -> Vec { (candidate_hash, 2i8).encode() } -fn available_chunks_key(relay_parent: &Hash, erasure_root: &Hash) -> Vec { - (relay_parent, erasure_root, 3i8).encode() -} - fn candidates_with_relay_parent_key(relay_block: &Hash) -> Vec { (relay_block, 4i8).encode() } // meta keys -fn awaited_chunks_key() -> [u8; 14] { - *b"awaited_chunks" -} +const AWAITED_CHUNKS_KEY: [u8; 14] = *b"awaited_chunks"; fn validator_index_and_n_validators_key(relay_parent: &Hash) -> Vec { (relay_parent, 1i8).encode() } +fn available_chunks_key(candidate_hash: &Hash) -> Vec { + (candidate_hash, 2i8).encode() +} + /// An entry in the awaited frontier of chunks we are interested in. #[derive(Encode, Decode, Debug, Hash, PartialEq, Eq, Clone)] pub struct AwaitedFrontierEntry { - /// The relay-chain parent block hash. + /// The hash of the candidate for which we want to fetch a chunk for. 
+ /// There will be duplicate entries in the case of multiple candidates with + /// the same erasure-root, but this is unlikely. + pub candidate_hash: Hash, + /// Although the relay-parent is implicitly referenced by the candidate hash, + /// we include it here as well for convenience in pruning the set. pub relay_parent: Hash, - /// The erasure-chunk trie root we are comparing against. - /// - /// We index by erasure-root because there may be multiple candidates - /// with the same erasure root. - pub erasure_root: Hash, /// The index of the validator we represent. pub validator_index: u32, } @@ -153,7 +151,7 @@ impl Store { /// Get a set of all chunks we are waiting for. pub fn awaited_chunks(&self) -> Option> { - self.query_inner(columns::META, &awaited_chunks_key()).map(|vec: Vec| { + self.query_inner(columns::META, &AWAITED_CHUNKS_KEY).map(|vec: Vec| { HashSet::from_iter(vec.into_iter()) }) } @@ -183,21 +181,21 @@ impl Store { if let Some((validator_index, _)) = self.get_validator_index_and_n_validators(relay_parent) { let candidates = candidates.clone(); let awaited_frontier: Vec = self - .query_inner(columns::META, &awaited_chunks_key()) + .query_inner(columns::META, &AWAITED_CHUNKS_KEY) .unwrap_or_else(|| Vec::new()); let mut awaited_frontier: HashSet = HashSet::from_iter(awaited_frontier.into_iter()); - awaited_frontier.extend(candidates.iter().filter_map(|candidate| { - self.get_candidate(&candidate).map(|receipt| AwaitedFrontierEntry { + awaited_frontier.extend(candidates.iter().cloned().map(|candidate_hash| { + AwaitedFrontierEntry { relay_parent: relay_parent.clone(), - erasure_root: receipt.commitments.erasure_root, + candidate_hash, validator_index, - }) + } })); let awaited_frontier = Vec::from_iter(awaited_frontier.into_iter()); - tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode()); + tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode()); } let mut descendent_candidates = self.get_candidates_with_relay_parent(relay_parent); @@ -246,15 +244,12 @@ impl Store { let mut v = self.query_inner(columns::DATA, &dbkey).unwrap_or(Vec::new()); - let av_chunks_key = available_chunks_key( - &receipt.relay_parent, - &receipt.commitments.erasure_root, - ); + let av_chunks_key = available_chunks_key(candidate_hash); let mut have_chunks = self.query_inner(columns::META, &av_chunks_key).unwrap_or(Vec::new()); let awaited_frontier: Option> = self.query_inner( columns::META, - &awaited_chunks_key(), + &AWAITED_CHUNKS_KEY, ); for chunk in chunks.into_iter() { @@ -268,19 +263,21 @@ impl Store { awaited_frontier.retain(|entry| { !( entry.relay_parent == receipt.relay_parent && - entry.erasure_root == receipt.commitments.erasure_root && + &entry.candidate_hash == candidate_hash && have_chunks.contains(&entry.validator_index) ) }); - tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode()); + tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode()); } - // If therea are no block data in the store at this point, + // If there are no block data in the store at this point, // check that they can be reconstructed now and add them to store if they can. 
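The comment above refers to the reconstruction attempt that follows: every time chunks arrive, the store retries `erasure::reconstruct`, which succeeds once a sufficient subset of the `n_validators` chunks (roughly a third, by construction of the coding) is present. A stand-alone sketch of that round-trip; the crate import path and the encoding counterpart `obtain_chunks` are assumptions, only the `reconstruct` call shape is taken from this file:

```rust
// Sketch, not part of the diff.
use polkadot_erasure_coding as erasure; // import path is an assumption
use polkadot_primitives::parachain::AvailableData;

fn erasure_roundtrip(n_validators: usize, data: &AvailableData) {
    // Encode the data into one chunk per validator (assumed encoding counterpart).
    let chunks: Vec<Vec<u8>> = erasure::obtain_chunks(n_validators, data)
        .expect("encoding well-formed data succeeds");

    // Reconstruction takes (chunk, index) pairs; feeding all of them mirrors
    // the call in `add_erasure_chunks` below, though a subset would also do.
    let recovered: AvailableData = erasure::reconstruct(
        n_validators,
        chunks.iter().enumerate().map(|(index, chunk)| (&chunk[..], index)),
    ).expect("reconstruction from all chunks succeeds");

    let _ = recovered;
}
```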
if self.execution_data(&candidate_hash).is_none() { if let Ok(available_data) = erasure::reconstruct( n_validators as usize, - v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize))) { + v.iter().map(|chunk| (chunk.chunk.as_ref(), chunk.index as usize)), + ) + { self.make_available(*candidate_hash, available_data)?; } } @@ -339,11 +336,11 @@ impl Store { let mut tx = DBTransaction::new(); let awaited_frontier: Option> = self - .query_inner(columns::META, &awaited_chunks_key()); + .query_inner(columns::META, &AWAITED_CHUNKS_KEY); if let Some(mut awaited_frontier) = awaited_frontier { awaited_frontier.retain(|entry| entry.relay_parent != relay_parent); - tx.put_vec(columns::META, &awaited_chunks_key(), awaited_frontier.encode()); + tx.put_vec(columns::META, &AWAITED_CHUNKS_KEY, awaited_frontier.encode()); } let candidates = self.get_candidates_with_relay_parent(&relay_parent); @@ -354,6 +351,8 @@ impl Store { tx.delete(columns::DATA, execution_data_key(&candidate).as_slice()); tx.delete(columns::DATA, &erasure_chunks_key(&candidate)); tx.delete(columns::DATA, &candidate_key(&candidate)); + + tx.delete(columns::META, &available_chunks_key(&candidate)); } self.inner.write(tx) @@ -576,7 +575,6 @@ mod tests { proof: Vec::new(), }; let candidates = vec![receipt_1_hash, receipt_2_hash]; - let erasure_roots = vec![erasure_root_1, erasure_root_2]; let store = Store::new_in_memory(); @@ -596,10 +594,9 @@ mod tests { let expected: HashSet<_> = candidates .clone() .into_iter() - .zip(erasure_roots.iter()) - .map(|(_c, &e)| AwaitedFrontierEntry { + .map(|c| AwaitedFrontierEntry { relay_parent, - erasure_root: e, + candidate_hash: c, validator_index, }) .collect(); @@ -612,7 +609,7 @@ mod tests { // Now we wait for the other chunk that we haven't received yet. let expected: HashSet<_> = vec![AwaitedFrontierEntry { relay_parent, - erasure_root: erasure_roots[1], + candidate_hash: receipt_2_hash, validator_index, }].into_iter().collect(); diff --git a/availability-store/src/worker.rs b/availability-store/src/worker.rs index d94f07fdb7ed..5cd8f1e9dfcf 100644 --- a/availability-store/src/worker.rs +++ b/availability-store/src/worker.rs @@ -38,11 +38,12 @@ use polkadot_primitives::parachain::{ ValidatorPair, ErasureChunk, }; use futures::{prelude::*, future::select, channel::{mpsc, oneshot}, task::{Spawn, SpawnExt}}; +use futures::future::AbortHandle; use keystore::KeyStorePtr; use tokio::runtime::{Handle, Runtime as LocalRuntime}; -use crate::{LOG_TARGET, ProvideGossipMessages, erasure_coding_topic}; +use crate::{LOG_TARGET, ErasureNetworking}; use crate::store::Store; /// Errors that may occur. @@ -52,8 +53,17 @@ pub(crate) enum Error { StoreError(io::Error), #[display(fmt = "Validator's id and number of validators at block with parent {} not found", relay_parent)] IdAndNValidatorsNotFound { relay_parent: Hash }, - #[display(fmt = "Candidate receipt with hash {} not found", candidate_hash)] - CandidateNotFound { candidate_hash: Hash }, +} + +/// Used in testing to interact with the worker thread. +#[cfg(test)] +pub(crate) struct WithWorker(Box); + +#[cfg(test)] +impl std::fmt::Debug for WithWorker { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "") + } } /// Messages sent to the `Worker`. 
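The test-only `WithWorker` message added above carries a boxed closure that the worker executes on its own thread; the rewritten tests further down use it to ask whether a given `ListeningKey` is still registered. A minimal, self-contained sketch of that pattern, with illustrative stand-in types rather than the crate's own:

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative stand-ins for the worker and its message type.
struct Worker { listening: usize }
enum Msg {
    Work,
    // A closure run by the worker thread, used for test introspection.
    WithWorker(Box<dyn FnOnce(&mut Worker) + Send>),
}

fn main() {
    let (tx, rx) = mpsc::channel::<Msg>();
    let handle = thread::spawn(move || {
        let mut worker = Worker { listening: 0 };
        for msg in rx {
            match msg {
                Msg::Work => worker.listening += 1,
                Msg::WithWorker(f) => f(&mut worker),
            }
        }
    });

    tx.send(Msg::Work).unwrap();

    // Ask the worker thread for a snapshot of its private state.
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Msg::WithWorker(Box::new(move |w| {
        let _ = reply_tx.send(w.listening);
    }))).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), 1);

    drop(tx);
    handle.join().unwrap();
}
```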
@@ -66,10 +76,11 @@ pub(crate) enum Error { #[derive(Debug)] pub(crate) enum WorkerMsg { IncludedParachainBlocks(IncludedParachainBlocks), - ListenForChunks(ListenForChunks), Chunks(Chunks), CandidatesFinalized(CandidatesFinalized), MakeAvailable(MakeAvailable), + #[cfg(test)] + WithWorker(WithWorker), } /// A notification of a parachain block included in the relay chain. @@ -90,26 +101,15 @@ pub(crate) struct IncludedParachainBlocks { pub result: oneshot::Sender>, } -/// Listen gossip for these chunks. -#[derive(Debug)] -pub(crate) struct ListenForChunks { - /// The hash of the candidate chunk belongs to. - pub candidate_hash: Hash, - /// The index of the chunk we need. - pub index: u32, - /// A sender to signal the result asynchronously. - pub result: Option>>, -} - -/// We have received some chunks. +/// We have received chunks we requested. #[derive(Debug)] pub(crate) struct Chunks { - /// The relay parent of the block these chunks belong to. - pub relay_parent: Hash, /// The hash of the parachain candidate these chunks belong to. pub candidate_hash: Hash, - /// The chunks. + /// The chunks pub chunks: Vec, + /// The number of validators present at the candidate's relay-parent. + pub n_validators: u32, /// A sender to signal the result asynchronously. pub result: oneshot::Sender>, } @@ -134,20 +134,26 @@ pub(crate) struct MakeAvailable { pub result: oneshot::Sender>, } +/// Description of a chunk we are listening for. +#[derive(Hash, Debug, PartialEq, Eq)] +struct ListeningKey { + candidate_hash: Hash, + index: u32, +} + /// An availability worker with it's inner state. -pub(super) struct Worker { +pub(super) struct Worker { availability_store: Store, - provide_gossip_messages: PGM, - registered_gossip_streams: HashMap, + listening_for: HashMap, sender: mpsc::UnboundedSender, } /// The handle to the `Worker`. pub(super) struct WorkerHandle { - exit_signal: Option, thread: Option>>, sender: mpsc::UnboundedSender, + exit_signal: Option, } impl WorkerHandle { @@ -170,34 +176,6 @@ impl Drop for WorkerHandle { } } -async fn listen_for_chunks( - p: PGM, - topic: Hash, - mut sender: S -) -where - PGM: ProvideGossipMessages, - S: Sink + Unpin, -{ - trace!(target: LOG_TARGET, "Registering gossip listener for topic {}", topic); - let mut chunks_stream = p.gossip_messages_for(topic); - - while let Some(item) = chunks_stream.next().await { - let (s, _) = oneshot::channel(); - trace!(target: LOG_TARGET, "Received for {:?}", item); - let chunks = Chunks { - relay_parent: item.0, - candidate_hash: item.1, - chunks: vec![item.2], - result: s, - }; - - if let Err(_) = sender.send(WorkerMsg::Chunks(chunks)).await { - break; - } - } -} - fn fetch_candidates
<P>
(client: &P, extrinsics: Vec<::Extrinsic>, parent: &BlockId) -> ClientResult>> @@ -293,76 +271,90 @@ where } } -impl Drop for Worker { - fn drop(&mut self) { - for (_, signal) in self.registered_gossip_streams.drain() { - let _ = signal.fire(); - } - } -} +impl Worker { -impl Worker -where - PGM: ProvideGossipMessages + Clone + Send + 'static, -{ - - // Called on startup of the worker to register listeners for all awaited chunks. - fn register_listeners( + // Called on startup of the worker to initiate fetch from network for all awaited chunks. + fn initiate_all_fetches( &mut self, runtime_handle: &Handle, + erasure_network: &EN, sender: &mut mpsc::UnboundedSender, ) { if let Some(awaited_chunks) = self.availability_store.awaited_chunks() { for awaited_chunk in awaited_chunks { - if let Err(e) = self.register_chunks_listener( + if let Err(e) = self.initiate_fetch( runtime_handle, + erasure_network, sender, awaited_chunk.relay_parent, - awaited_chunk.erasure_root, + awaited_chunk.candidate_hash, ) { - warn!(target: LOG_TARGET, "Failed to register gossip listener: {}", e); + warn!(target: LOG_TARGET, "Failed to register network listener: {}", e); } } } } - fn register_chunks_listener( + // initiates a fetch from network for the described chunk, with our local index. + fn initiate_fetch( &mut self, runtime_handle: &Handle, + erasure_network: &EN, sender: &mut mpsc::UnboundedSender, relay_parent: Hash, - erasure_root: Hash, + candidate_hash: Hash, ) -> Result<(), Error> { - let (local_id, _) = self.availability_store + let (local_id, n_validators) = self.availability_store .get_validator_index_and_n_validators(&relay_parent) .ok_or(Error::IdAndNValidatorsNotFound { relay_parent })?; - let topic = erasure_coding_topic(relay_parent, erasure_root, local_id); + + // fast exit for if we already have the chunk. + if self.availability_store.get_erasure_chunk(&candidate_hash, local_id as _).is_some() { + return Ok(()) + } + trace!( target: LOG_TARGET, - "Registering listener for erasure chunks topic {} for ({}, {})", - topic, + "Initiating fetch for erasure-chunk at parent {} with candidate-hash {}", relay_parent, - erasure_root, + candidate_hash, ); - let (signal, exit) = exit_future::signal(); + let fut = erasure_network.fetch_erasure_chunk(&candidate_hash, local_id); + let mut sender = sender.clone(); + let (fut, signal) = future::abortable(async move { + let chunk = match fut.await { + Ok(chunk) => chunk, + Err(e) => { + warn!(target: LOG_TARGET, "Unable to fetch erasure-chunk from network: {:?}", e); + return + } + }; + let (s, _) = oneshot::channel(); + let _ = sender.send(WorkerMsg::Chunks(Chunks { + candidate_hash, + chunks: vec![chunk], + n_validators, + result: s, + })).await; + }.map(drop).boxed()); - let fut = listen_for_chunks( - self.provide_gossip_messages.clone(), - topic, - sender.clone(), - ); - self.registered_gossip_streams.insert(topic, signal); + let key = ListeningKey { + candidate_hash, + index: local_id, + }; - let _ = runtime_handle.spawn(select(fut.boxed(), exit).map(drop)); + self.listening_for.insert(key, signal); + let _ = runtime_handle.spawn(fut); Ok(()) } - fn on_parachain_blocks_received( + fn on_parachain_blocks_received( &mut self, runtime_handle: &Handle, + erasure_network: &EN, sender: &mut mpsc::UnboundedSender, blocks: Vec, ) -> Result<(), Error> { @@ -376,54 +368,47 @@ where // Should we be breaking block into chunks here and gossiping it and so on? 
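`initiate_fetch` above wraps the network request in `futures::future::abortable` and keeps the returned `AbortHandle` in `listening_for`, keyed by candidate hash and chunk index, so the request can be cancelled once the chunk is obtained some other way. A self-contained sketch of that cancellation pattern, with an illustrative key type:

```rust
use std::collections::HashMap;
use futures::future::{self, AbortHandle};

// Illustrative stand-in for `ListeningKey`.
#[derive(Hash, PartialEq, Eq)]
struct Key { candidate: u64, index: u32 }

fn main() {
    let mut listening_for: HashMap<Key, AbortHandle> = HashMap::new();

    // Wrap an in-flight fetch (here: a future that would never finish on its own)
    // so it can be cancelled later.
    let (fut, handle) = future::abortable(future::pending::<()>());
    listening_for.insert(Key { candidate: 1, index: 0 }, handle);

    // Later, once the chunk arrives by another path, abort the fetch.
    if let Some(handle) = listening_for.remove(&Key { candidate: 1, index: 0 }) {
        handle.abort();
    }

    // The aborted future resolves with `Err(Aborted)` instead of hanging.
    assert!(futures::executor::block_on(fut).is_err());
}
```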
} - if let Err(e) = self.register_chunks_listener( - runtime_handle, - sender, - candidate.relay_parent, - candidate.commitments.erasure_root, - ) { - warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e); - } - // This leans on the codebase-wide assumption that the `relay_parent` // of all candidates in a block matches the parent hash of that block. // // In the future this will not always be true. + let candidate_hash = candidate.hash(); let _ = self.availability_store.note_candidates_with_relay_parent( &candidate.relay_parent, - &[candidate.hash()], + &[candidate_hash], ); + + if let Err(e) = self.initiate_fetch( + runtime_handle, + erasure_network, + sender, + candidate.relay_parent, + candidate_hash, + ) { + warn!(target: LOG_TARGET, "Failed to register chunk listener: {}", e); + } } Ok(()) } - // Processes chunks messages that contain awaited items. - // - // When an awaited item is received, it is placed into the availability store - // and removed from the frontier. Listener de-registered. - fn on_chunks_received( + // Handles chunks that were required. + fn on_chunks( &mut self, - relay_parent: Hash, candidate_hash: Hash, chunks: Vec, + n_validators: u32, ) -> Result<(), Error> { - let (_, n_validators) = self.availability_store - .get_validator_index_and_n_validators(&relay_parent) - .ok_or(Error::IdAndNValidatorsNotFound { relay_parent })?; - - let receipt = self.availability_store.get_candidate(&candidate_hash) - .ok_or(Error::CandidateNotFound { candidate_hash })?; + for c in &chunks { + let key = ListeningKey { + candidate_hash, + index: c.index, + }; - for chunk in &chunks { - let topic = erasure_coding_topic( - relay_parent, - receipt.commitments.erasure_root, - chunk.index, - ); - // need to remove gossip listener and stop it. - if let Some(signal) = self.registered_gossip_streams.remove(&topic) { - let _ = signal.fire(); + // remove bookkeeping so network does not attempt to fetch + // any longer. + if let Some(exit_signal) = self.listening_for.remove(&key) { + exit_signal.abort(); } } @@ -436,47 +421,16 @@ where Ok(()) } - // Processes the `ListenForChunks` message. - // - // When the worker receives a `ListenForChunk` message, it double-checks that - // we don't have that piece, and then it registers a listener. - fn on_listen_for_chunks_received( - &mut self, - runtime_handle: &Handle, - sender: &mut mpsc::UnboundedSender, - candidate_hash: Hash, - id: usize - ) -> Result<(), Error> { - let candidate = self.availability_store.get_candidate(&candidate_hash) - .ok_or(Error::CandidateNotFound { candidate_hash })?; - - if self.availability_store - .get_erasure_chunk(&candidate_hash, id) - .is_none() { - if let Err(e) = self.register_chunks_listener( - runtime_handle, - sender, - candidate.relay_parent, - candidate.commitments.erasure_root - ) { - warn!(target: LOG_TARGET, "Failed to register a gossip listener: {}", e); - } - } - - Ok(()) - } - /// Starts a worker with a given availability store and a gossip messages provider. 
- pub fn start( + pub fn start( availability_store: Store, - provide_gossip_messages: PGM, + erasure_network: EN, ) -> WorkerHandle { let (sender, mut receiver) = mpsc::unbounded(); - let mut worker = Self { + let mut worker = Worker { availability_store, - provide_gossip_messages, - registered_gossip_streams: HashMap::new(), + listening_for: HashMap::new(), sender: sender.clone(), }; @@ -489,34 +443,15 @@ where let runtime_handle = runtime.handle().clone(); - // On startup, registers listeners (gossip streams) for all + // On startup, initiates fetch from network for all // entries in the awaited frontier. - worker.register_listeners(runtime.handle(), &mut sender); + worker.initiate_all_fetches(runtime.handle(), &erasure_network, &mut sender); let process_notification = async move { while let Some(msg) = receiver.next().await { trace!(target: LOG_TARGET, "Received message {:?}", msg); let res = match msg { - WorkerMsg::ListenForChunks(msg) => { - let ListenForChunks { - candidate_hash, - index, - result, - } = msg; - - let res = worker.on_listen_for_chunks_received( - &runtime_handle, - &mut sender, - candidate_hash, - index as usize, - ); - - if let Some(result) = result { - let _ = result.send(res); - } - Ok(()) - } WorkerMsg::IncludedParachainBlocks(msg) => { let IncludedParachainBlocks { blocks, @@ -525,6 +460,7 @@ where let res = worker.on_parachain_blocks_received( &runtime_handle, + &erasure_network, &mut sender, blocks, ); @@ -533,11 +469,17 @@ where Ok(()) } WorkerMsg::Chunks(msg) => { - let Chunks { relay_parent, candidate_hash, chunks, result } = msg; - let res = worker.on_chunks_received( - relay_parent, + let Chunks { + candidate_hash, + chunks, + n_validators, + result, + } = msg; + + let res = worker.on_chunks( candidate_hash, chunks, + n_validators, ); let _ = result.send(res); @@ -559,6 +501,11 @@ where let _ = result.send(res); Ok(()) } + #[cfg(test)] + WorkerMsg::WithWorker(with_worker) => { + (with_worker.0)(&mut worker); + Ok(()) + } }; if let Err(_) = res { @@ -569,7 +516,6 @@ where }; runtime.spawn(select(process_notification.boxed(), exit.clone()).map(drop)); - runtime.block_on(exit); info!(target: LOG_TARGET, "Availability worker exiting"); @@ -591,19 +537,16 @@ where /// /// [`BlockImport`]: https://substrate.dev/rustdocs/v1.0/substrate_consensus_common/trait.BlockImport.html pub struct AvailabilityBlockImport { - availability_store: Store, inner: I, client: Arc
<P>
, keystore: KeyStorePtr, to_worker: mpsc::UnboundedSender, - exit_signal: Option, + exit_signal: AbortHandle, } impl Drop for AvailabilityBlockImport { fn drop(&mut self) { - if let Some(signal) = self.exit_signal.take() { - let _ = signal.fire(); - } + self.exit_signal.abort(); } } @@ -653,24 +596,6 @@ impl BlockImport for AvailabilityBlockImport where candidates, ); - for candidate in &candidates { - let candidate_hash = candidate.hash(); - // If we don't yet have our chunk of this candidate, - // tell the worker to listen for one. - if self.availability_store.get_erasure_chunk( - &candidate_hash, - our_id as usize, - ).is_none() { - let msg = WorkerMsg::ListenForChunks(ListenForChunks { - candidate_hash, - index: our_id as u32, - result: None, - }); - - let _ = self.to_worker.unbounded_send(msg); - } - } - let (s, _) = oneshot::channel(); // Inform the worker about the included parachain blocks. @@ -714,7 +639,6 @@ impl BlockImport for AvailabilityBlockImport where impl AvailabilityBlockImport { pub(crate) fn new( - availability_store: Store, client: Arc
<P>
, block_import: I, spawner: impl Spawn, @@ -728,26 +652,21 @@ impl AvailabilityBlockImport { // Rust bug: https://github.com/rust-lang/rust/issues/24159 sp_api::StateBackendFor: sp_api::StateBackend>, { - let (signal, exit) = exit_future::signal(); - // This is not the right place to spawn the finality future, // it would be more appropriate to spawn it in the `start` method of the `Worker`. // However, this would make the type of the `Worker` and the `Store` itself // dependent on the types of client and executor, which would prove // not not so handy in the testing code. - let mut exit_signal = Some(signal); - let prune_available = select( - prune_unneeded_availability(client.clone(), to_worker.clone()).boxed(), - exit.clone() - ).map(drop); + let (prune_available, exit_signal) = future::abortable(prune_unneeded_availability( + client.clone(), + to_worker.clone(), + )); - if let Err(_) = spawner.spawn(prune_available) { + if let Err(_) = spawner.spawn(prune_available.map(drop)) { error!(target: LOG_TARGET, "Failed to spawn availability pruning task"); - exit_signal = None; } AvailabilityBlockImport { - availability_store, client, inner: block_import, to_worker, @@ -771,57 +690,56 @@ impl AvailabilityBlockImport { #[cfg(test)] mod tests { use super::*; - use std::time::Duration; - use futures::{stream, channel::mpsc, Stream}; - use std::sync::{Arc, Mutex, Condvar}; + use futures::channel::oneshot; + use std::sync::Arc; use std::pin::Pin; use tokio::runtime::Runtime; + use parking_lot::Mutex; use crate::store::AwaitedFrontierEntry; - // Just contains topic->channel mapping to give to outer code on `gossip_messages_for` calls. - struct TestGossipMessages { - messages: Arc, Condvar)>, - mpsc::UnboundedReceiver<(Hash, Hash, ErasureChunk)>, - ), + #[derive(Default, Clone)] + struct TestErasureNetwork { + chunk_receivers: Arc >>>, } - impl ProvideGossipMessages for TestGossipMessages { - fn gossip_messages_for(&self, topic: Hash) - -> Pin + Send>> + impl TestErasureNetwork { + // adds a receiver. this returns a sender for the erasure-chunk + // along with an exit future that fires when the erasure chunk has + // been fully-processed + fn add_receiver(&self, candidate_hash: Hash, index: u32) + -> oneshot::Sender { - match self.messages.lock().unwrap().remove(&topic) { - Some((pair, receiver)) => { - let (lock, cvar) = &*pair; - let mut consumed = lock.lock().unwrap(); - *consumed = true; - cvar.notify_one(); - receiver.boxed() - }, - None => stream::iter(vec![]).boxed(), + let (sender, receiver) = oneshot::channel(); + self.chunk_receivers.lock().insert((candidate_hash, index), receiver); + sender + } + } + + impl ErasureNetworking for TestErasureNetwork { + type Error = String; + + fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32) + -> Pin> + Send>> + { + match self.chunk_receivers.lock().remove(&(*candidate_hash, index)) { + Some(receiver) => receiver.then(|x| match x { + Ok(x) => future::ready(Ok(x)).left_future(), + Err(_) => future::pending().right_future(), + }).boxed(), + None => future::pending().boxed(), } } - fn gossip_erasure_chunk( + fn distribute_erasure_chunk( &self, - _relay_parent: Hash, _candidate_hash: Hash, - _erasure_root: Hash, _chunk: ErasureChunk ) {} } - impl Clone for TestGossipMessages { - fn clone(&self) -> Self { - TestGossipMessages { - messages: self.messages.clone(), - } - } - } - // This test tests that as soon as the worker receives info about new parachain blocks // included it registers gossip listeners for it's own chunks. 
Upon receiving the awaited // chunk messages the corresponding listeners are deregistered and these chunks are removed @@ -830,32 +748,22 @@ mod tests { fn receiving_gossip_chunk_removes_from_frontier() { let mut runtime = Runtime::new().unwrap(); let relay_parent = [1; 32].into(); - let erasure_root = [2; 32].into(); let local_id = 2; let n_validators = 4; let store = Store::new_in_memory(); - // Tell the store our validator's position and the number of validators at given point. - store.note_validator_index_and_n_validators(&relay_parent, local_id, n_validators).unwrap(); - - let (gossip_sender, gossip_receiver) = mpsc::unbounded(); - - let topic = erasure_coding_topic(relay_parent, erasure_root, local_id); - - let pair = Arc::new((Mutex::new(false), Condvar::new())); - let messages = TestGossipMessages { - messages: Arc::new(Mutex::new(vec![ - (topic, (pair.clone(), gossip_receiver)) - ].into_iter().collect())) - }; - let mut candidate = AbridgedCandidateReceipt::default(); - candidate.commitments.erasure_root = erasure_root; candidate.relay_parent = relay_parent; let candidate_hash = candidate.hash(); + // Tell the store our validator's position and the number of validators at given point. + store.note_validator_index_and_n_validators(&relay_parent, local_id, n_validators).unwrap(); + + let network = TestErasureNetwork::default(); + let chunk_sender = network.add_receiver(candidate_hash, local_id); + // At this point we shouldn't be waiting for any chunks. assert!(store.awaited_chunks().is_none()); @@ -869,7 +777,7 @@ mod tests { result: s, }); - let handle = Worker::start(store.clone(), messages); + let handle = Worker::start(store.clone(), network); // Tell the worker that the new blocks have been included into the relay chain. // This should trigger the registration of gossip message listeners for the @@ -883,30 +791,36 @@ mod tests { store.awaited_chunks().unwrap(), vec![AwaitedFrontierEntry { relay_parent, - erasure_root, + candidate_hash, validator_index: local_id, }].into_iter().collect() ); - let msg = ( - relay_parent, - candidate_hash, - ErasureChunk { - chunk: vec![1, 2, 3], - index: local_id as u32, - proof: vec![], + // Complete the chunk request. + chunk_sender.send(ErasureChunk { + chunk: vec![1, 2, 3], + index: local_id as u32, + proof: vec![], + }).unwrap(); + + // wait until worker thread has de-registered the listener for a + // particular chunk. + loop { + let (s, r) = oneshot::channel(); + handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| { + let key = ListeningKey { + candidate_hash, + index: local_id, + }; + + let is_waiting = worker.listening_for.contains_key(&key); + + s.send(!is_waiting).unwrap(); // tell the test thread `true` if we are not waiting. + })))).unwrap(); + + if runtime.block_on(r).unwrap() { + break } - ); - - // Send a gossip message with an awaited chunk - gossip_sender.unbounded_send(msg).unwrap(); - - // At the point the needed piece is received, the gossip listener for - // this topic is deregistered and it's receiver side is dropped. - // Wait for the sender side to become closed. - while !gossip_sender.is_closed() { - // Probably we can just .wait this somehow? - thread::sleep(Duration::from_millis(100)); } // The awaited chunk has been received so at this point we no longer wait for any chunks. 
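`TestErasureNetwork::fetch_erasure_chunk` above maps a dropped oneshot sender to a future that never resolves, rather than surfacing an error, using `then` with `left_future`/`right_future`. A stand-alone sketch of that adapter:

```rust
use futures::{future, Future, FutureExt};
use futures::channel::oneshot;

// Turn a oneshot receiver into a future that yields the value if one is sent
// and simply stays pending forever if the sender is dropped.
fn pending_on_cancel<T>(rx: oneshot::Receiver<T>) -> impl Future<Output = T> {
    rx.then(|res| match res {
        Ok(value) => future::ready(value).left_future(),
        Err(_cancelled) => future::pending().right_future(),
    })
}

fn main() {
    let (tx, rx) = oneshot::channel::<u32>();
    let fut = pending_on_cancel(rx);
    tx.send(7).unwrap();
    assert_eq!(futures::executor::block_on(fut), 7);
}
```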
@@ -914,7 +828,7 @@ mod tests { } #[test] - fn listen_for_chunk_registers_listener() { + fn included_parachain_blocks_registers_listener() { let mut runtime = Runtime::new().unwrap(); let relay_parent = [1; 32].into(); let erasure_root_1 = [2; 32].into(); @@ -956,63 +870,73 @@ mod tests { }], ).unwrap(); - let (_, gossip_receiver_1) = mpsc::unbounded(); - let (_, gossip_receiver_2) = mpsc::unbounded(); - - let topic_1 = erasure_coding_topic(relay_parent, erasure_root_1, local_id); - let topic_2 = erasure_coding_topic(relay_parent, erasure_root_2, local_id); - - let cvar_pair1 = Arc::new((Mutex::new(false), Condvar::new())); - let cvar_pair2 = Arc::new((Mutex::new(false), Condvar::new())); + let network = TestErasureNetwork::default(); + let _ = network.add_receiver(candidate_1_hash, local_id); + let _ = network.add_receiver(candidate_2_hash, local_id); - let messages = TestGossipMessages { - messages: Arc::new(Mutex::new( - vec![ - (topic_1, (cvar_pair1.clone(), gossip_receiver_1)), - (topic_2, (cvar_pair2, gossip_receiver_2)), - ].into_iter().collect())) - }; - - let handle = Worker::start(store.clone(), messages.clone()); - - let (s2, r2) = oneshot::channel(); - // Tell the worker to listen for chunks from candidate 2 (we alredy have a chunk from it). - let listen_msg_2 = WorkerMsg::ListenForChunks(ListenForChunks { - candidate_hash: candidate_2_hash, - index: local_id as u32, - result: Some(s2), - }); + let handle = Worker::start(store.clone(), network.clone()); - handle.sender.unbounded_send(listen_msg_2).unwrap(); - - runtime.block_on(r2).unwrap().unwrap(); - // The gossip sender for this topic left intact => listener not registered. - assert!(messages.messages.lock().unwrap().contains_key(&topic_2)); - - let (s1, r1) = oneshot::channel(); - - // Tell the worker to listen for chunks from candidate 1. - // (we don't have a chunk from it yet). - let listen_msg_1 = WorkerMsg::ListenForChunks(ListenForChunks { - candidate_hash: candidate_1_hash, - index: local_id as u32, - result: Some(s1), - }); - - handle.sender.unbounded_send(listen_msg_1).unwrap(); - runtime.block_on(r1).unwrap().unwrap(); - - // Here, we are racing against the worker thread that might have not yet - // reached the point when it requests the gossip messages for `topic_2` - // which will get them removed from `TestGossipMessages`. Therefore, the - // `Condvar` is used to wait for that event. - let (lock, cvar1) = &*cvar_pair1; - let mut gossip_stream_consumed = lock.lock().unwrap(); - while !*gossip_stream_consumed { - gossip_stream_consumed = cvar1.wait(gossip_stream_consumed).unwrap(); + { + let (s, r) = oneshot::channel(); + // Tell the worker to listen for chunks from candidate 2 (we alredy have a chunk from it). + let listen_msg_2 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks { + blocks: vec![IncludedParachainBlock { + candidate: candidate_2, + available_data: None, + }], + result: s, + }); + + handle.sender.unbounded_send(listen_msg_2).unwrap(); + + runtime.block_on(r).unwrap().unwrap(); + // The receiver for this chunk left intact => listener not registered. 
+ assert!(network.chunk_receivers.lock().contains_key(&(candidate_2_hash, local_id))); + + // more directly: + let (s, r) = oneshot::channel(); + handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| { + let key = ListeningKey { + candidate_hash: candidate_2_hash, + index: local_id, + }; + let _ = s.send(worker.listening_for.contains_key(&key)); + })))).unwrap(); + + assert!(!runtime.block_on(r).unwrap()); } - // The gossip sender taken => listener registered. - assert!(!messages.messages.lock().unwrap().contains_key(&topic_1)); + { + let (s, r) = oneshot::channel(); + + // Tell the worker to listen for chunks from candidate 1. + // (we don't have a chunk from it yet). + let listen_msg_1 = WorkerMsg::IncludedParachainBlocks(IncludedParachainBlocks { + blocks: vec![IncludedParachainBlock { + candidate: candidate_1, + available_data: None, + }], + result: s, + }); + + handle.sender.unbounded_send(listen_msg_1).unwrap(); + runtime.block_on(r).unwrap().unwrap(); + + // The receiver taken => listener registered. + assert!(!network.chunk_receivers.lock().contains_key(&(candidate_1_hash, local_id))); + + + // more directly: + let (s, r) = oneshot::channel(); + handle.sender.unbounded_send(WorkerMsg::WithWorker(WithWorker(Box::new(move |worker| { + let key = ListeningKey { + candidate_hash: candidate_1_hash, + index: local_id, + }; + let _ = s.send(worker.listening_for.contains_key(&key)); + })))).unwrap(); + + assert!(runtime.block_on(r).unwrap()); + } } } diff --git a/cli/src/command.rs b/cli/src/command.rs index c731124f8bb6..489e655fe004 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -127,12 +127,13 @@ where service::Roles::LIGHT => sc_cli::run_service_until_exit( config, - |config| service::new_light::(config, None), + |config| service::new_light::(config), ), _ => sc_cli::run_service_until_exit( config, - |config| service::new_full::(config, None, None, authority_discovery_enabled, 6000), + |config| service::new_full::(config, None, None, authority_discovery_enabled, 6000) + .map(|(s, _)| s), ), } } diff --git a/collator/src/lib.rs b/collator/src/lib.rs index 0886925adaf7..be61d9b2d159 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -48,6 +48,7 @@ use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Duration; +use std::pin::Pin; use futures::{future, Future, Stream, FutureExt, TryFutureExt, StreamExt, task::Spawn}; use log::warn; @@ -63,10 +64,9 @@ use polkadot_primitives::{ }; use polkadot_cli::{ ProvideRuntimeApi, AbstractService, ParachainHost, IsKusama, - service::{self, Roles, SelectChain} + service::{self, Roles} }; -use polkadot_network::legacy::validation::ValidationNetwork; - +use polkadot_network::PolkadotProtocol; pub use polkadot_cli::{VersionInfo, load_spec, service::Configuration}; pub use polkadot_validation::SignedStatement; pub use polkadot_primitives::parachain::CollatorId; @@ -77,30 +77,17 @@ const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); /// An abstraction over the `Network` with useful functions for a `Collator`. pub trait Network: Send + Sync { - /// Convert the given `CollatorId` to a `PeerId`. - fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - Box> + Send>; - /// Create a `Stream` of checked statements for the given `relay_parent`. /// /// The returned stream will not terminate, so it is required to make sure that the stream is /// dropped when it is not required anymore. Otherwise, it will stick around in memory /// infinitely. 
- fn checked_statements(&self, relay_parent: Hash) -> Box>; + fn checked_statements(&self, relay_parent: Hash) -> Pin>>; } -impl Network for ValidationNetwork where - P: 'static + Send + Sync, - SP: 'static + Spawn + Clone + Send + Sync, -{ - fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - Box> + Send> - { - Box::new(Self::collator_id_to_peer_id(self, collator_id)) - } - - fn checked_statements(&self, relay_parent: Hash) -> Box> { - Box::new(Self::checked_statements(self, relay_parent)) +impl Network for polkadot_network::protocol::Service { + fn checked_statements(&self, relay_parent: Hash) -> Pin>> { + polkadot_network::protocol::Service::checked_statements(self, relay_parent) } } @@ -139,7 +126,7 @@ pub trait BuildParachainContext { self, client: Arc>, spawner: SP, - network: Arc, + network: impl Network + Clone + 'static, ) -> Result where PolkadotClient: ProvideRuntimeApi, @@ -219,13 +206,13 @@ pub async fn collate
<P>
( } fn build_collator_service( - service: S, + service: (S, polkadot_service::FullNodeHandles), para_id: ParaId, key: Arc, build_parachain_context: P, ) -> Result where - S: AbstractService, + S: AbstractService, sc_client::Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: RuntimeApiCollection< @@ -247,52 +234,22 @@ fn build_collator_service( ::ProduceCandidate: Send, Extrinsic: service::Codec + Send + Sync + 'static, { + let (service, handles) = service; let spawner = service.spawn_task_handle(); - let client = service.client(); - let network = service.network(); - let known_oracle = client.clone(); - let select_chain = if let Some(select_chain) = service.select_chain() { - select_chain - } else { - return Err("The node cannot work because it can't select chain.".into()) - }; - - let is_known = move |block_hash: &Hash| { - use consensus_common::BlockStatus; - use polkadot_network::legacy::gossip::Known; - - match known_oracle.block_status(&BlockId::hash(*block_hash)) { - Err(_) | Ok(BlockStatus::Unknown) | Ok(BlockStatus::Queued) => None, - Ok(BlockStatus::KnownBad) => Some(Known::Bad), - Ok(BlockStatus::InChainWithState) | Ok(BlockStatus::InChainPruned) => - match select_chain.leaves() { - Err(_) => None, - Ok(leaves) => if leaves.contains(block_hash) { - Some(Known::Leaf) - } else { - Some(Known::Old) - }, - } - } + let polkadot_network = match handles.polkadot_network { + None => return Err( + "Collator cannot run when Polkadot-specific networking has not been started".into() + ), + Some(n) => n, }; - let message_validator = polkadot_network::legacy::gossip::register_validator( - network.clone(), - (is_known, client.clone()), - &spawner, - ); - - let validation_network = Arc::new(ValidationNetwork::new( - message_validator, - client.clone(), - spawner.clone(), - )); + let client = service.client(); let parachain_context = match build_parachain_context.build( client.clone(), spawner, - validation_network.clone(), + polkadot_network.clone(), ) { Ok(ctx) => ctx, Err(()) => { @@ -318,7 +275,7 @@ fn build_collator_service( let relay_parent = notification.hash; let id = BlockId::hash(relay_parent); - let network = network.clone(); + let network = polkadot_network.clone(); let client = client.clone(); let key = key.clone(); let parachain_context = parachain_context.clone(); @@ -345,14 +302,7 @@ fn build_collator_service( parachain_context, key, ).map_ok(move |collation| { - network.with_spec(move |spec, ctx| { - spec.add_local_collation( - ctx, - relay_parent, - targets, - collation, - ); - }) + network.distribute_collation(targets, collation) }); future::Either::Right(collation_work) @@ -395,13 +345,9 @@ where ::ProduceCandidate: Send, { match (config.expect_chain_spec().is_kusama(), config.roles) { - (true, Roles::LIGHT) => - build_collator_service( - service::kusama_new_light(config, Some((key.public(), para_id)))?, - para_id, - key, - build_parachain_context, - )?.await, + (_, Roles::LIGHT) => return Err( + polkadot_service::Error::Other("light nodes are unsupported as collator".into()) + ).into(), (true, _) => build_collator_service( service::kusama_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, @@ -409,13 +355,6 @@ where key, build_parachain_context, )?.await, - (false, Roles::LIGHT) => - build_collator_service( - service::polkadot_new_light(config, Some((key.public(), para_id)))?, - para_id, - key, - build_parachain_context, - )?.await, (false, _) => build_collator_service( service::polkadot_new_full(config, Some((key.public(), para_id)), None, false, 6000)?, 
@@ -451,15 +390,9 @@ pub fn run_collator
<P>
( ::ProduceCandidate: Send, { match (config.expect_chain_spec().is_kusama(), config.roles) { - (true, Roles::LIGHT) => - sc_cli::run_service_until_exit(config, |config| { - build_collator_service( - service::kusama_new_light(config, Some((key.public(), para_id)))?, - para_id, - key, - build_parachain_context, - ) - }), + (_, Roles::LIGHT) => return Err( + polkadot_cli::Error::Input("light nodes are unsupported as collator".into()) + ).into(), (true, _) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( @@ -469,15 +402,6 @@ pub fn run_collator
<P>
( build_parachain_context, ) }), - (false, Roles::LIGHT) => - sc_cli::run_service_until_exit(config, |config| { - build_collator_service( - service::polkadot_new_light(config, Some((key.public(), para_id)))?, - para_id, - key, - build_parachain_context, - ) - }), (false, _) => sc_cli::run_service_until_exit(config, |config| { build_collator_service( diff --git a/network/src/legacy/gossip/attestation.rs b/network/src/legacy/gossip/attestation.rs index e019859695dd..b6023437cae0 100644 --- a/network/src/legacy/gossip/attestation.rs +++ b/network/src/legacy/gossip/attestation.rs @@ -38,10 +38,10 @@ use polkadot_primitives::Hash; use std::collections::{HashMap, HashSet}; use log::warn; -use crate::legacy::router::attestation_topic; -use super::{cost, benefit, MAX_CHAIN_HEADS, LeavesVec, - ChainContext, Known, MessageValidationData, GossipStatement +use super::{ + cost, benefit, attestation_topic, MAX_CHAIN_HEADS, LeavesVec, + ChainContext, Known, MessageValidationData, GossipStatement, }; // knowledge about attestations on a single parent-hash. diff --git a/network/src/legacy/gossip/mod.rs b/network/src/legacy/gossip/mod.rs index b6ee29c84a69..d086f7f33ce4 100644 --- a/network/src/legacy/gossip/mod.rs +++ b/network/src/legacy/gossip/mod.rs @@ -51,7 +51,7 @@ use sp_runtime::traits::{BlakeTwo256, Hash as HashT}; use sp_blockchain::Error as ClientError; -use sc_network::{config::Roles, Context, PeerId, ReputationChange}; +use sc_network::{config::Roles, PeerId, ReputationChange}; use sc_network::{NetworkService as SubstrateNetworkService, specialization::NetworkSpecialization}; use sc_network_gossip::{ ValidationResult as GossipValidationResult, @@ -73,7 +73,7 @@ use arrayvec::ArrayVec; use futures::prelude::*; use parking_lot::RwLock; -use crate::legacy::{GossipMessageStream, NetworkService, GossipService, PolkadotProtocol, router::attestation_topic}; +use crate::legacy::{GossipMessageStream, GossipService}; use attestation::{View as AttestationView, PeerData as AttestationPeerData}; @@ -175,8 +175,6 @@ impl GossipStatement { pub struct ErasureChunkMessage { /// The chunk itself. pub chunk: PrimitiveChunk, - /// The relay parent of the block this chunk belongs to. - pub relay_parent: Hash, /// The hash of the candidate receipt of the block this chunk belongs to. pub candidate_hash: Hash, } @@ -255,6 +253,15 @@ impl ChainContext for (F, P) where } } + +/// Compute the gossip topic for attestations on the given parent hash. +pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { + let mut v = parent_hash.as_ref().to_vec(); + v.extend(b"attestations"); + + BlakeTwo256::hash(&v[..]) +} + /// Register a gossip validator on the network service. // NOTE: since RegisteredMessageValidator is meant to be a type-safe proof // that we've actually done the registration, this should be the only way @@ -355,24 +362,9 @@ impl> Clone for RegisteredMessageValidator { } } -impl RegisteredMessageValidator { - #[cfg(test)] - pub(crate) fn new_test( - chain: C, - report_handle: Box, - ) -> Self { - let validator = Arc::new(MessageValidator::new_test(chain, report_handle)); - - RegisteredMessageValidator { - inner: validator as _, - service: None, - gossip_engine: None, - } - } -} - impl> RegisteredMessageValidator { - pub fn register_availability_store(&mut self, availability_store: av_store::Store) { + /// Register an availabilty store the gossip service can query. 
+ pub(crate) fn register_availability_store(&self, availability_store: av_store::Store) { self.inner.inner.write().availability_store = Some(availability_store); } @@ -459,18 +451,6 @@ impl> GossipService for RegisteredMessageValidat } } -impl NetworkService for RegisteredMessageValidator { - fn with_spec(&self, with: F) - where F: FnOnce(&mut PolkadotProtocol, &mut dyn Context) - { - if let Some(service) = self.service.as_ref() { - service.with_spec(with) - } else { - log::error!("Called with_spec on a test engine"); - } - } -} - /// The data needed for validating gossip messages. #[derive(Default)] pub(crate) struct MessageValidationData { @@ -555,15 +535,13 @@ impl Inner { } else { if let Some(awaited_chunks) = store.awaited_chunks() { let frontier_entry = av_store::AwaitedFrontierEntry { - relay_parent: msg.relay_parent, - erasure_root: receipt.commitments.erasure_root, + candidate_hash: msg.candidate_hash, + relay_parent: receipt.relay_parent, validator_index: msg.chunk.index, }; if awaited_chunks.contains(&frontier_entry) { - let topic = av_store::erasure_coding_topic( - msg.relay_parent, - receipt.commitments.erasure_root, - msg.chunk.index, + let topic = crate::erasure_coding_topic( + &msg.candidate_hash ); return ( @@ -728,8 +706,6 @@ mod tests { use sp_core::sr25519::Signature as Sr25519Signature; use polkadot_validation::GenericStatement; - use crate::legacy::tests::TestChainContext; - #[derive(PartialEq, Clone, Debug)] enum ContextEvent { BroadcastTopic(Hash, bool), @@ -764,6 +740,28 @@ mod tests { } } + #[derive(Default)] + struct TestChainContext { + known_map: HashMap, + ingress_roots: HashMap>, + } + + impl ChainContext for TestChainContext { + fn is_known(&self, block_hash: &Hash) -> Option { + self.known_map.get(block_hash).map(|x| x.clone()) + } + + fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash)) + -> Result<(), sp_blockchain::Error> + { + for root in self.ingress_roots.get(leaf).into_iter().flat_map(|roots| roots) { + with_queue_root(root) + } + + Ok(()) + } + } + #[test] fn message_allowed() { let (tx, _rx) = mpsc::channel(); @@ -955,55 +953,4 @@ mod tests { assert!(message_allowed(&peer_a, MessageIntent::Broadcast, &topic_a, &encoded[..])); } } - - #[test] - fn multicasts_icmp_queues_when_building_on_new_leaf() { - let (tx, _rx) = mpsc::channel(); - let tx = Mutex::new(tx); - let report_handle = Box::new(move |peer: &PeerId, cb: ReputationChange| tx.lock().send((peer.clone(), cb)).unwrap()); - - let hash_a = [1u8; 32].into(); - let root_a = [11u8; 32].into(); - - let chain = { - let mut chain = TestChainContext::default(); - chain.known_map.insert(hash_a, Known::Leaf); - chain.ingress_roots.insert(hash_a, vec![root_a]); - chain - }; - - let validator = RegisteredMessageValidator::new_test(chain, report_handle); - - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - - let mut validator_context = MockValidatorContext::default(); - validator.inner.new_peer(&mut validator_context, &peer_a, Roles::FULL); - validator.inner.new_peer(&mut validator_context, &peer_b, Roles::FULL); - assert!(validator_context.events.is_empty()); - validator_context.clear(); - - - { - let message = GossipMessage::from(NeighborPacket { - chain_heads: vec![hash_a], - }).encode(); - let res = validator.inner.validate( - &mut validator_context, - &peer_a, - &message[..], - ); - - match res { - GossipValidationResult::Discard => {}, - _ => panic!("wrong result"), - } - assert_eq!( - validator_context.events, - vec![ - 
ContextEvent::SendTopic(peer_a.clone(), attestation_topic(hash_a), false), - ], - ); - } - } } diff --git a/network/src/legacy/mod.rs b/network/src/legacy/mod.rs index 3e65263bc9d0..6167749f538c 100644 --- a/network/src/legacy/mod.rs +++ b/network/src/legacy/mod.rs @@ -21,43 +21,19 @@ pub mod collator_pool; pub mod local_collations; -pub mod router; -pub mod validation; pub mod gossip; -use codec::{Decode, Encode}; -use futures::channel::oneshot; +use codec::Decode; use futures::prelude::*; -use polkadot_primitives::{Block, Hash, Header}; -use polkadot_primitives::parachain::{ - Id as ParaId, CollatorId, AbridgedCandidateReceipt, Collation, PoVBlock, - ValidatorId, ErasureChunk, -}; -use sc_network::{ - PeerId, Context, StatusMessage as GenericFullStatus, - specialization::NetworkSpecialization as Specialization, -}; +use polkadot_primitives::Hash; +use sc_network::PeerId; use sc_network_gossip::TopicNotification; -use self::validation::{LiveValidationLeaves, RecentValidatorIds, InsertedRecentKey}; -use self::collator_pool::{CollatorPool, Role, Action}; -use self::local_collations::LocalCollations; -use log::{trace, debug, warn}; +use log::debug; -use std::collections::{HashMap, HashSet}; use std::pin::Pin; use std::task::{Context as PollContext, Poll}; -use self::gossip::{GossipMessage, ErasureChunkMessage, RegisteredMessageValidator}; -use crate::{cost, benefit}; - -#[cfg(test)] -mod tests; - -type FullStatus = GenericFullStatus; -type RequestId = u64; - -/// Specialization of the network service for the polkadot protocol. -pub type PolkadotNetworkService = sc_network::NetworkService; +use self::gossip::GossipMessage; /// Basic gossip functionality that a network has to fulfill. pub trait GossipService: Send + Sync + 'static { @@ -71,58 +47,6 @@ pub trait GossipService: Send + Sync + 'static { fn send_message(&self, who: PeerId, message: GossipMessage); } -/// Basic functionality that a network has to fulfill. -pub trait NetworkService: GossipService + Send + Sync + 'static { - /// Execute a closure with the polkadot protocol. - fn with_spec(&self, with: F) - where Self: Sized, F: FnOnce(&mut PolkadotProtocol, &mut dyn Context); -} - -/// This is a newtype that implements a [`ProvideGossipMessages`] shim trait. -/// -/// For any wrapped [`NetworkService`] type it implements a [`ProvideGossipMessages`]. -/// For more details see documentation of [`ProvideGossipMessages`]. -/// -/// [`NetworkService`]: ./trait.NetworkService.html -/// [`ProvideGossipMessages`]: ../polkadot_availability_store/trait.ProvideGossipMessages.html -#[derive(Clone)] -pub struct AvailabilityNetworkShim(pub RegisteredMessageValidator); - -impl av_store::ProvideGossipMessages for AvailabilityNetworkShim { - fn gossip_messages_for(&self, topic: Hash) - -> Pin + Send>> - { - self.0.gossip_messages_for(topic) - .filter_map(|(msg, _)| async move { - match msg { - GossipMessage::ErasureChunk(chunk) => { - Some((chunk.relay_parent, chunk.candidate_hash, chunk.chunk)) - }, - _ => None, - } - }) - .boxed() - } - - fn gossip_erasure_chunk( - &self, - relay_parent: Hash, - candidate_hash: Hash, - erasure_root: Hash, - chunk: ErasureChunk - ) { - let topic = av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index); - self.0.gossip_message( - topic, - GossipMessage::ErasureChunk(ErasureChunkMessage { - chunk, - relay_parent, - candidate_hash, - }) - ) - } -} - /// A stream of gossip messages and an optional sender for a topic. 
pub struct GossipMessageStream { topic_stream: Pin + Send>>, @@ -157,610 +81,3 @@ impl Stream for GossipMessageStream { } } } - -/// Status of a Polkadot node. -#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] -pub struct Status { - collating_for: Option<(CollatorId, ParaId)>, -} - -struct PoVBlockRequest { - attempted_peers: HashSet, - validation_leaf: Hash, - candidate_hash: Hash, - pov_block_hash: Hash, - sender: oneshot::Sender, -} - -impl PoVBlockRequest { - // Attempt to process a response. If the provided block is invalid, - // this returns an error result containing the unmodified request. - // - // If `Ok(())` is returned, that indicates that the request has been processed. - fn process_response(self, pov_block: PoVBlock) -> Result<(), Self> { - if pov_block.hash() != self.pov_block_hash { - return Err(self); - } - - Ok(()) - } -} - -// ensures collator-protocol messages are sent in correct order. -// session key must be sent before collator role. -enum CollatorState { - Fresh, - RolePending(Role), - Primed(Option), -} - -impl CollatorState { - fn send_key(&mut self, key: ValidatorId, mut f: F) { - f(Message::ValidatorId(key)); - if let CollatorState::RolePending(role) = *self { - f(Message::CollatorRole(role)); - *self = CollatorState::Primed(Some(role)); - } - } - - fn set_role(&mut self, role: Role, mut f: F) { - if let CollatorState::Primed(ref mut r) = *self { - f(Message::CollatorRole(role)); - *r = Some(role); - } else { - *self = CollatorState::RolePending(role); - } - } - - fn role(&self) -> Option { - match *self { - CollatorState::Fresh => None, - CollatorState::RolePending(role) => Some(role), - CollatorState::Primed(role) => role, - } - } -} - -struct PeerInfo { - collating_for: Option<(CollatorId, ParaId)>, - validator_keys: RecentValidatorIds, - claimed_validator: bool, - collator_state: CollatorState, -} - -impl PeerInfo { - fn should_send_key(&self) -> bool { - self.claimed_validator || self.collating_for.is_some() - } -} - -/// Polkadot-specific messages. -#[derive(Debug, Encode, Decode)] -pub enum Message { - /// As a validator, tell the peer your current session key. - // TODO: do this with a cryptographic proof of some kind - // https://github.com/paritytech/polkadot/issues/47 - ValidatorId(ValidatorId), - /// Requesting parachain proof-of-validation block (relay_parent, candidate_hash). - RequestPovBlock(RequestId, Hash, Hash), - /// Provide requested proof-of-validation block data by candidate hash or nothing if unknown. - PovBlock(RequestId, Option), - /// Tell a collator their role. - CollatorRole(Role), - /// A collation provided by a peer. Relay parent and collation. - Collation(Hash, Collation), -} - -fn send_polkadot_message(ctx: &mut dyn Context, to: PeerId, message: Message) { - trace!(target: "p_net", "Sending polkadot message to {}: {:?}", to, message); - let encoded = message.encode(); - ctx.send_chain_specific(to, encoded) -} - -/// Polkadot protocol attachment for substrate. -pub struct PolkadotProtocol { - peers: HashMap, - collating_for: Option<(CollatorId, ParaId)>, - collators: CollatorPool, - validators: HashMap, - local_collations: LocalCollations, - live_validation_leaves: LiveValidationLeaves, - in_flight: HashMap<(RequestId, PeerId), PoVBlockRequest>, - pending: Vec, - availability_store: Option, - next_req_id: u64, -} - -impl PolkadotProtocol { - /// Instantiate a polkadot protocol handler. 
- pub fn new(collating_for: Option<(CollatorId, ParaId)>) -> Self { - PolkadotProtocol { - peers: HashMap::new(), - collators: CollatorPool::new(), - collating_for, - validators: HashMap::new(), - local_collations: LocalCollations::new(), - live_validation_leaves: LiveValidationLeaves::new(), - in_flight: HashMap::new(), - pending: Vec::new(), - availability_store: None, - next_req_id: 1, - } - } - - /// Fetch block data by candidate receipt. - fn fetch_pov_block( - &mut self, - ctx: &mut dyn Context, - candidate: &AbridgedCandidateReceipt, - relay_parent: Hash, - ) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - - self.pending.push(PoVBlockRequest { - attempted_peers: Default::default(), - validation_leaf: relay_parent, - candidate_hash: candidate.hash(), - pov_block_hash: candidate.pov_block_hash, - sender: tx, - }); - - self.dispatch_pending_requests(ctx); - rx - } - - /// Note new leaf to do validation work at - fn new_validation_leaf_work( - &mut self, - ctx: &mut dyn Context, - params: validation::LeafWorkParams, - ) -> validation::LiveValidationLeaf { - - let (work, new_local) = self.live_validation_leaves - .new_validation_leaf(params); - - if let Some(new_local) = new_local { - for (id, peer_data) in self.peers.iter_mut() - .filter(|&(_, ref info)| info.should_send_key()) - { - peer_data.collator_state.send_key(new_local.clone(), |msg| send_polkadot_message( - ctx, - id.clone(), - msg - )); - } - } - - work - } - - // true indicates that it was removed actually. - fn remove_validation_session(&mut self, parent_hash: Hash) -> bool { - self.live_validation_leaves.remove(parent_hash) - } - - fn dispatch_pending_requests(&mut self, ctx: &mut dyn Context) { - let mut new_pending = Vec::new(); - let validator_keys = &mut self.validators; - let next_req_id = &mut self.next_req_id; - let in_flight = &mut self.in_flight; - - for mut pending in ::std::mem::replace(&mut self.pending, Vec::new()) { - let parent = pending.validation_leaf; - let c_hash = pending.candidate_hash; - - let still_pending = self.live_validation_leaves.with_pov_block(&parent, &c_hash, |x| match x { - Ok(data @ &_) => { - // answer locally. - let _ = pending.sender.send(data.clone()); - None - } - Err(Some(known_keys)) => { - let next_peer = known_keys.iter() - .filter_map(|x| validator_keys.get(x).map(|id| (x.clone(), id.clone()))) - .find(|&(ref key, _)| pending.attempted_peers.insert(key.clone())) - .map(|(_, id)| id); - - // dispatch to peer - if let Some(who) = next_peer { - let req_id = *next_req_id; - *next_req_id += 1; - - send_polkadot_message( - ctx, - who.clone(), - Message::RequestPovBlock(req_id, parent, c_hash), - ); - - in_flight.insert((req_id, who), pending); - - None - } else { - Some(pending) - } - } - Err(None) => None, // no such known validation leaf-work. prune out. 
- }); - - if let Some(pending) = still_pending { - new_pending.push(pending); - } - } - - self.pending = new_pending; - } - - fn on_polkadot_message(&mut self, ctx: &mut dyn Context, who: PeerId, msg: Message) { - trace!(target: "p_net", "Polkadot message from {}: {:?}", who, msg); - match msg { - Message::ValidatorId(key) => self.on_session_key(ctx, who, key), - Message::RequestPovBlock(req_id, relay_parent, candidate_hash) => { - let pov_block = self.live_validation_leaves.with_pov_block( - &relay_parent, - &candidate_hash, - |res| res.ok().map(|b| b.clone()), - ); - - send_polkadot_message(ctx, who, Message::PovBlock(req_id, pov_block)); - } - Message::PovBlock(req_id, data) => self.on_pov_block(ctx, who, req_id, data), - Message::Collation(relay_parent, collation) => self.on_collation(ctx, who, relay_parent, collation), - Message::CollatorRole(role) => self.on_new_role(ctx, who, role), - } - } - - fn on_session_key(&mut self, ctx: &mut dyn Context, who: PeerId, key: ValidatorId) { - { - let info = match self.peers.get_mut(&who) { - Some(peer) => peer, - None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); - return - } - }; - - if !info.claimed_validator { - ctx.report_peer(who, cost::UNEXPECTED_MESSAGE); - return; - } - ctx.report_peer(who.clone(), benefit::EXPECTED_MESSAGE); - - let local_collations = &mut self.local_collations; - let new_collations = match info.validator_keys.insert(key.clone()) { - InsertedRecentKey::AlreadyKnown => Vec::new(), - InsertedRecentKey::New(Some(old_key)) => { - self.validators.remove(&old_key); - local_collations.fresh_key(&old_key, &key) - } - InsertedRecentKey::New(None) => info.collator_state.role() - .map(|r| local_collations.note_validator_role(key.clone(), r)) - .unwrap_or_else(Vec::new), - }; - - for (relay_parent, collation) in new_collations { - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, collation), - ) - } - - self.validators.insert(key, who); - } - - self.dispatch_pending_requests(ctx); - } - - fn on_pov_block( - &mut self, - ctx: &mut dyn Context, - who: PeerId, - req_id: RequestId, - pov_block: Option, - ) { - match self.in_flight.remove(&(req_id, who.clone())) { - Some(mut req) => { - match pov_block { - Some(pov_block) => { - match req.process_response(pov_block) { - Ok(()) => { - ctx.report_peer(who, benefit::GOOD_POV_BLOCK); - return; - } - Err(r) => { - ctx.report_peer(who, cost::BAD_POV_BLOCK); - req = r; - } - } - }, - None => { - ctx.report_peer(who, benefit::EXPECTED_MESSAGE); - } - } - - self.pending.push(req); - self.dispatch_pending_requests(ctx); - } - None => ctx.report_peer(who, cost::UNEXPECTED_MESSAGE), - } - } - - // when a validator sends us (a collator) a new role. - fn on_new_role(&mut self, ctx: &mut dyn Context, who: PeerId, role: Role) { - let info = match self.peers.get_mut(&who) { - Some(peer) => peer, - None => { - trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", who); - return - } - }; - - debug!(target: "p_net", "New collator role {:?} from {}", role, who); - - if info.validator_keys.as_slice().is_empty() { - ctx.report_peer(who, cost::UNEXPECTED_ROLE) - } else { - // update role for all saved session keys for this validator. 
- let local_collations = &mut self.local_collations; - for (relay_parent, collation) in info.validator_keys - .as_slice() - .iter() - .cloned() - .flat_map(|k| local_collations.note_validator_role(k, role)) - { - debug!(target: "p_net", "Broadcasting collation on relay parent {:?}", relay_parent); - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, collation), - ) - } - } - } - - /// Convert the given `CollatorId` to a `PeerId`. - pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> { - self.collators.collator_id_to_peer_id(collator_id) - } -} - -impl Specialization for PolkadotProtocol { - fn status(&self) -> Vec { - Status { collating_for: self.collating_for.clone() }.encode() - } - - fn on_connect(&mut self, ctx: &mut dyn Context, who: PeerId, status: FullStatus) { - let local_status = Status::decode(&mut &status.chain_status[..]) - .unwrap_or_else(|_| Status { collating_for: None }); - - let validator = status.roles.contains(sc_network::config::Roles::AUTHORITY); - - let mut peer_info = PeerInfo { - collating_for: local_status.collating_for.clone(), - validator_keys: Default::default(), - claimed_validator: validator, - collator_state: CollatorState::Fresh, - }; - - if let Some((ref acc_id, ref para_id)) = local_status.collating_for { - if self.collator_peer(acc_id.clone()).is_some() { - ctx.report_peer(who, cost::COLLATOR_ALREADY_KNOWN); - return - } - ctx.report_peer(who.clone(), benefit::NEW_COLLATOR); - - let collator_role = self.collators.on_new_collator( - acc_id.clone(), - para_id.clone(), - who.clone(), - ); - - peer_info.collator_state.set_role(collator_role, |msg| send_polkadot_message( - ctx, - who.clone(), - msg, - )); - } - - // send session keys. - if peer_info.should_send_key() { - for local_session_key in self.live_validation_leaves.recent_keys() { - peer_info.collator_state.send_key(local_session_key.clone(), |msg| send_polkadot_message( - ctx, - who.clone(), - msg, - )); - } - } - - self.peers.insert(who, peer_info); - self.dispatch_pending_requests(ctx); - } - - fn on_disconnect(&mut self, ctx: &mut dyn Context, who: PeerId) { - if let Some(info) = self.peers.remove(&who) { - if let Some((acc_id, _)) = info.collating_for { - let new_primary = self.collators.on_disconnect(acc_id) - .and_then(|new_primary| self.collator_peer(new_primary)); - - if let Some((new_primary, primary_info)) = new_primary { - primary_info.collator_state.set_role(Role::Primary, |msg| send_polkadot_message( - ctx, - new_primary.clone(), - msg, - )); - } - } - - for key in info.validator_keys.as_slice().iter() { - self.validators.remove(key); - self.local_collations.on_disconnect(key); - } - - { - let pending = &mut self.pending; - self.in_flight.retain(|&(_, ref peer), val| { - let retain = peer != &who; - if !retain { - // swap with a dummy value which will be dropped immediately. 
- let (sender, _) = oneshot::channel(); - pending.push(::std::mem::replace(val, PoVBlockRequest { - attempted_peers: Default::default(), - validation_leaf: Default::default(), - candidate_hash: Default::default(), - pov_block_hash: Default::default(), - sender, - })); - } - - retain - }); - } - self.dispatch_pending_requests(ctx); - } - } - - fn on_message( - &mut self, - ctx: &mut dyn Context, - who: PeerId, - message: Vec, - ) { - match Message::decode(&mut &message[..]) { - Ok(msg) => { - ctx.report_peer(who.clone(), benefit::VALID_FORMAT); - self.on_polkadot_message(ctx, who, msg) - }, - Err(_) => { - trace!(target: "p_net", "Bad message from {}", who); - ctx.report_peer(who, cost::INVALID_FORMAT); - } - } - } - - fn maintain_peers(&mut self, ctx: &mut dyn Context) { - self.collators.collect_garbage(None); - self.local_collations.collect_garbage(None); - self.dispatch_pending_requests(ctx); - - for collator_action in self.collators.maintain_peers() { - match collator_action { - Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), - Action::NewRole(account_id, role) => if let Some((collator, info)) = self.collator_peer(account_id) { - info.collator_state.set_role(role, |msg| send_polkadot_message( - ctx, - collator.clone(), - msg, - )) - }, - } - } - } - - fn on_block_imported(&mut self, _ctx: &mut dyn Context, hash: Hash, header: &Header) { - self.collators.collect_garbage(Some(&hash)); - self.local_collations.collect_garbage(Some(&header.parent_hash)); - } -} - -impl PolkadotProtocol { - // we received a collation from a peer - fn on_collation( - &mut self, - ctx: &mut dyn Context, - from: PeerId, - relay_parent: Hash, - collation: Collation - ) { - let collation_para = collation.info.parachain_index; - let collated_acc = collation.info.collator.clone(); - - match self.peers.get(&from) { - None => ctx.report_peer(from, cost::UNKNOWN_PEER), - Some(peer_info) => { - ctx.report_peer(from.clone(), benefit::KNOWN_PEER); - match peer_info.collating_for { - None => ctx.report_peer(from, cost::UNEXPECTED_MESSAGE), - Some((ref acc_id, ref para_id)) => { - ctx.report_peer(from.clone(), benefit::EXPECTED_MESSAGE); - let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; - if structurally_valid && collation.info.check_signature().is_ok() { - debug!(target: "p_net", "Received collation for parachain {:?} from peer {}", para_id, from); - ctx.report_peer(from, benefit::GOOD_COLLATION); - self.collators.on_collation(acc_id.clone(), relay_parent, collation) - } else { - ctx.report_peer(from, cost::INVALID_FORMAT) - }; - } - } - }, - } - } - - fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); - self.collators.await_collation(relay_parent, para_id, tx); - rx - } - - // get connected peer with given account ID for collation. - fn collator_peer(&mut self, collator_id: CollatorId) -> Option<(PeerId, &mut PeerInfo)> { - let check_info = |info: &PeerInfo| info - .collating_for - .as_ref() - .map_or(false, |&(ref acc_id, _)| acc_id == &collator_id); - - self.peers - .iter_mut() - .filter(|&(_, ref info)| check_info(&**info)) - .map(|(who, info)| (who.clone(), info)) - .next() - } - - // disconnect a collator by account-id. 
- fn disconnect_bad_collator(&mut self, ctx: &mut dyn Context, collator_id: CollatorId) { - if let Some((who, _)) = self.collator_peer(collator_id) { - ctx.report_peer(who, cost::BAD_COLLATION) - } - } -} - -impl PolkadotProtocol { - /// Add a local collation and broadcast it to the necessary peers. - /// - /// This should be called by a collator intending to get the locally-collated - /// block into the hands of validators. - /// It also places the outgoing message and block data in the local availability store. - pub fn add_local_collation( - &mut self, - ctx: &mut dyn Context, - relay_parent: Hash, - targets: HashSet, - collation: Collation, - ) { - debug!(target: "p_net", "Importing local collation on relay parent {:?} and parachain {:?}", - relay_parent, collation.info.parachain_index); - - for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { - match self.validators.get(&primary) { - Some(who) => { - debug!(target: "p_net", "Sending local collation to {:?}", primary); - send_polkadot_message( - ctx, - who.clone(), - Message::Collation(relay_parent, cloned_collation), - ) - }, - None => - warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), - } - } - } - - /// Give the network protocol a handle to an availability store, used for - /// circulation of parachain data required for validation. - pub fn register_availability_store(&mut self, availability_store: ::av_store::Store) { - self.availability_store = Some(availability_store); - } -} diff --git a/network/src/legacy/router.rs b/network/src/legacy/router.rs deleted file mode 100644 index ddffe72f0c83..000000000000 --- a/network/src/legacy/router.rs +++ /dev/null @@ -1,397 +0,0 @@ -// Copyright 2017-2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Statement routing and validation statement table router implementation. -//! -//! During the attestation process, validators exchange statements on validity and availability -//! of parachain candidates. -//! -//! The `Router` in this file hooks into the underlying network to fulfill -//! the `TableRouter` trait from `polkadot-validation`, which is expected to call into a shared statement table -//! and dispatch evaluation work as necessary when new statements come in. 
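The statement routing described above hangs off a per-relay-parent gossip topic. Concretely, the topic the removed router derives is just a Blake2-256 hash of the parent hash concatenated with a fixed tag, as in this minimal sketch (a standalone restatement of the `attestation_topic` helper defined below, assuming the usual imports from `polkadot_primitives` and `sp_runtime`):

    use polkadot_primitives::Hash;
    use sp_runtime::traits::{BlakeTwo256, Hash as HashT};

    /// Gossip topic for attestation statements on a given relay-chain parent:
    /// BlakeTwo256(parent_hash ++ b"attestations").
    fn attestation_topic(parent_hash: Hash) -> Hash {
        let mut v = parent_hash.as_ref().to_vec();
        v.extend(b"attestations");
        BlakeTwo256::hash(&v[..])
    }

Checked statements for a chain head are then obtained by subscribing to that topic and keeping only `GossipMessage::Statement` items; signature validation has already been performed by the gossip validator by the time messages reach the stream.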
- -use sp_runtime::traits::{BlakeTwo256, Hash as HashT}; -use polkadot_validation::{ - SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated -}; -use polkadot_primitives::{Block, Hash}; -use polkadot_primitives::parachain::{ - AbridgedCandidateReceipt, ParachainHost, ValidatorIndex, PoVBlock, ErasureChunk, -}; -use sp_api::ProvideRuntimeApi; - -use futures::prelude::*; -use futures::{task::SpawnExt, future::ready}; -use parking_lot::Mutex; -use log::{debug, trace}; - -use std::collections::{HashMap, HashSet}; -use std::io; -use std::sync::Arc; -use std::pin::Pin; - -use crate::legacy::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement, ErasureChunkMessage}; -use crate::legacy::validation::{LeafWorkDataFetcher, Executor}; -use crate::legacy::{NetworkService, PolkadotProtocol}; - -/// Compute the gossip topic for attestations on the given parent hash. -pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash { - let mut v = parent_hash.as_ref().to_vec(); - v.extend(b"attestations"); - - BlakeTwo256::hash(&v[..]) -} - -/// Create a `Stream` of checked messages. -/// -/// The returned stream will not terminate, so it is required to make sure that the stream is -/// dropped when it is not required anymore. Otherwise, it will stick around in memory -/// infinitely. -pub(crate) fn checked_statements(network: &N, topic: Hash) -> - impl Stream { - // spin up a task in the background that processes all incoming statements - // validation has been done already by the gossip validator. - // this will block internally until the gossip messages stream is obtained. - network.gossip_messages_for(topic) - .filter_map(|msg| match msg.0 { - GossipMessage::Statement(s) => ready(Some(s.signed_statement)), - _ => ready(None) - }) -} - -/// Table routing implementation. -pub struct Router { - table: Arc, - attestation_topic: Hash, - fetcher: LeafWorkDataFetcher, - deferred_statements: Arc>, - message_validator: RegisteredMessageValidator, - drop_signal: Arc, -} - -impl Router { - pub(crate) fn new( - table: Arc, - fetcher: LeafWorkDataFetcher, - message_validator: RegisteredMessageValidator, - drop_signal: exit_future::Signal, - ) -> Self { - let parent_hash = fetcher.parent_hash(); - Router { - table, - fetcher, - attestation_topic: attestation_topic(parent_hash), - deferred_statements: Arc::new(Mutex::new(DeferredStatements::new())), - message_validator, - drop_signal: Arc::new(drop_signal), - } - } - - /// Return a `Stream` of checked messages. These should be imported into the router - /// with `import_statement`. - /// - /// The returned stream will not terminate, so it is required to make sure that the stream is - /// dropped when it is not required anymore. Otherwise, it will stick around in memory - /// infinitely. 
- pub(crate) fn checked_statements(&self) -> impl Stream { - checked_statements(&*self.network(), self.attestation_topic) - } - - fn parent_hash(&self) -> Hash { - self.fetcher.parent_hash() - } - - fn network(&self) -> &RegisteredMessageValidator { - self.fetcher.network() - } -} - -impl Clone for Router { - fn clone(&self) -> Self { - Router { - table: self.table.clone(), - fetcher: self.fetcher.clone(), - attestation_topic: self.attestation_topic, - deferred_statements: self.deferred_statements.clone(), - message_validator: self.message_validator.clone(), - drop_signal: self.drop_signal.clone(), - } - } -} - -impl + Send + Sync + 'static, T> Router where - P::Api: ParachainHost, - T: Clone + Executor + Send + 'static, -{ - /// Import a statement whose signature has been checked already. - pub(crate) fn import_statement(&self, statement: SignedStatement) { - trace!(target: "p_net", "importing validation statement {:?}", statement.statement); - - // defer any statements for which we haven't imported the candidate yet - let c_hash = { - let candidate_data = match statement.statement { - GenericStatement::Candidate(ref c) => Some(c.hash()), - GenericStatement::Valid(ref hash) - | GenericStatement::Invalid(ref hash) - => self.table.with_candidate(hash, |c| c.map(|_| *hash)), - }; - match candidate_data { - Some(x) => x, - None => { - self.deferred_statements.lock().push(statement); - return; - } - } - }; - - // import all statements pending on this candidate - let (mut statements, _traces) = if let GenericStatement::Candidate(_) = statement.statement { - self.deferred_statements.lock().take_deferred(&c_hash) - } else { - (Vec::new(), Vec::new()) - }; - - // prepend the candidate statement. - debug!(target: "validation", "Importing statements about candidate {:?}", c_hash); - statements.insert(0, statement); - let producers: Vec<_> = self.table.import_remote_statements( - self, - statements.iter().cloned(), - ); - // dispatch future work as necessary. - for (producer, statement) in producers.into_iter().zip(statements) { - if let Some(sender) = self.table.index_to_id(statement.sender) { - self.fetcher.knowledge().lock().note_statement(sender, &statement.statement); - - if let Some(work) = producer.map(|p| self.create_work(c_hash, p)) { - trace!(target: "validation", "driving statement work to completion"); - - let work = work.boxed().map(drop); - let _ = self.fetcher.executor().spawn(work); - } - } - } - } - - fn create_work(&self, candidate_hash: Hash, producer: ParachainWork) - -> impl Future + Send + 'static - where - D: Future> + Send + Unpin + 'static, - { - let table = self.table.clone(); - let network = self.network().clone(); - let knowledge = self.fetcher.knowledge().clone(); - let attestation_topic = self.attestation_topic; - let parent_hash = self.parent_hash(); - let api = self.fetcher.api().clone(); - - async move { - match producer.prime(api).validate().await { - Ok(validated) => { - // store the data before broadcasting statements, so other peers can fetch. - knowledge.lock().note_candidate( - candidate_hash, - Some(validated.pov_block().clone()), - ); - - // propagate the statement. - // consider something more targeted than gossip in the future. 
- let statement = GossipStatement::new( - parent_hash, - match table.import_validated(validated) { - None => return, - Some(s) => s, - } - ); - - network.gossip_message(attestation_topic, statement.into()); - }, - Err(err) => { - debug!(target: "p_net", "Failed to produce statements: {:?}", err); - } - } - } - } -} - -impl + Send, T> TableRouter for Router where - P::Api: ParachainHost, - T: Clone + Executor + Send + 'static, -{ - type Error = io::Error; - type SendLocalCollation = future::Ready>; - type FetchValidationProof = Pin> + Send>>; - - // We have fetched from a collator and here the receipt should have been already formed. - fn local_collation( - &self, - receipt: AbridgedCandidateReceipt, - pov_block: PoVBlock, - chunks: (ValidatorIndex, &[ErasureChunk]) - ) -> Self::SendLocalCollation { - // produce a signed statement - let hash = receipt.hash(); - let erasure_root = receipt.commitments.erasure_root; - let validated = Validated::collated_local( - receipt, - pov_block.clone(), - ); - - let statement = GossipStatement::new( - self.parent_hash(), - match self.table.import_validated(validated) { - None => return future::ready(Ok(())), - Some(s) => s, - }, - ); - - // give to network to make available. - self.fetcher.knowledge().lock().note_candidate(hash, Some(pov_block)); - self.network().gossip_message(self.attestation_topic, statement.into()); - - for chunk in chunks.1 { - let relay_parent = self.parent_hash(); - let message = ErasureChunkMessage { - chunk: chunk.clone(), - relay_parent, - candidate_hash: hash, - }; - - self.network().gossip_message( - av_store::erasure_coding_topic(relay_parent, erasure_root, chunk.index), - message.into() - ); - } - - future::ready(Ok(())) - } - - fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { - self.fetcher.fetch_pov_block(candidate) - } -} - -impl Drop for Router { - fn drop(&mut self) { - let parent_hash = self.parent_hash(); - self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); }); - } -} - -// A unique trace for valid statements issued by a validator. -#[derive(Hash, PartialEq, Eq, Clone, Debug)] -pub(crate) enum StatementTrace { - Valid(ValidatorIndex, Hash), - Invalid(ValidatorIndex, Hash), -} - -/// Helper for deferring statements whose associated candidate is unknown. -pub(crate) struct DeferredStatements { - deferred: HashMap>, - known_traces: HashSet, -} - -impl DeferredStatements { - /// Create a new `DeferredStatements`. - pub(crate) fn new() -> Self { - DeferredStatements { - deferred: HashMap::new(), - known_traces: HashSet::new(), - } - } - - /// Push a new statement onto the deferred pile. `Candidate` statements - /// cannot be deferred and are ignored. - pub(crate) fn push(&mut self, statement: SignedStatement) { - let (hash, trace) = match statement.statement { - GenericStatement::Candidate(_) => return, - GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender.clone(), hash)), - GenericStatement::Invalid(hash) => (hash, StatementTrace::Invalid(statement.sender.clone(), hash)), - }; - - if self.known_traces.insert(trace) { - self.deferred.entry(hash).or_insert_with(Vec::new).push(statement); - } - } - - /// Take all deferred statements referencing the given candidate hash out. 
- pub(crate) fn take_deferred(&mut self, hash: &Hash) -> (Vec, Vec) { - match self.deferred.remove(hash) { - None => (Vec::new(), Vec::new()), - Some(deferred) => { - let mut traces = Vec::new(); - for statement in deferred.iter() { - let trace = match statement.statement { - GenericStatement::Candidate(_) => continue, - GenericStatement::Valid(hash) => StatementTrace::Valid(statement.sender.clone(), hash), - GenericStatement::Invalid(hash) => StatementTrace::Invalid(statement.sender.clone(), hash), - }; - - self.known_traces.remove(&trace); - traces.push(trace); - } - - (deferred, traces) - } - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn deferred_statements_works() { - let mut deferred = DeferredStatements::new(); - let hash = [1; 32].into(); - let sig = Default::default(); - let sender_index = 0; - - let statement = SignedStatement { - statement: GenericStatement::Valid(hash), - sender: sender_index, - signature: sig, - }; - - // pre-push. - { - let (signed, traces) = deferred.take_deferred(&hash); - assert!(signed.is_empty()); - assert!(traces.is_empty()); - } - - deferred.push(statement.clone()); - deferred.push(statement.clone()); - - // draining: second push should have been ignored. - { - let (signed, traces) = deferred.take_deferred(&hash); - assert_eq!(signed.len(), 1); - - assert_eq!(traces.len(), 1); - assert_eq!(signed[0].clone(), statement); - assert_eq!(traces[0].clone(), StatementTrace::Valid(sender_index, hash)); - } - - // after draining - { - let (signed, traces) = deferred.take_deferred(&hash); - assert!(signed.is_empty()); - assert!(traces.is_empty()); - } - } -} diff --git a/network/src/legacy/tests/mod.rs b/network/src/legacy/tests/mod.rs deleted file mode 100644 index 9c8ff8d5dd4e..000000000000 --- a/network/src/legacy/tests/mod.rs +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2018-2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Tests for polkadot and validation network. 
- -use std::collections::HashMap; -use super::{PolkadotProtocol, Status, Message, FullStatus}; -use crate::legacy::validation::LeafWorkParams; - -use polkadot_primitives::{Block, Hash}; -use polkadot_primitives::parachain::{CollatorId, ValidatorId}; -use sp_core::crypto::UncheckedInto; -use codec::Encode; -use sc_network::{ - PeerId, Context, ReputationChange, config::Roles, specialization::NetworkSpecialization, -}; - -mod validation; - -#[derive(Default)] -struct TestContext { - disabled: Vec, - disconnected: Vec, - reputations: HashMap, - messages: Vec<(PeerId, Vec)>, -} - -impl Context for TestContext { - fn report_peer(&mut self, peer: PeerId, reputation: ReputationChange) { - let reputation = self.reputations.get(&peer).map_or(reputation.value, |v| v + reputation.value); - self.reputations.insert(peer.clone(), reputation); - - match reputation { - i if i < -100 => self.disabled.push(peer), - i if i < 0 => self.disconnected.push(peer), - _ => {} - } - } - - fn send_chain_specific(&mut self, who: PeerId, message: Vec) { - self.messages.push((who, message)) - } - - fn disconnect_peer(&mut self, _who: PeerId) { } -} - -impl TestContext { - fn has_message(&self, to: PeerId, message: Message) -> bool { - let encoded = message.encode(); - self.messages.iter().any(|&(ref peer, ref data)| - peer == &to && data == &encoded - ) - } -} - -#[derive(Default)] -pub struct TestChainContext { - pub known_map: HashMap, - pub ingress_roots: HashMap>, -} - -impl crate::legacy::gossip::ChainContext for TestChainContext { - fn is_known(&self, block_hash: &Hash) -> Option { - self.known_map.get(block_hash).map(|x| x.clone()) - } - - fn leaf_unrouted_roots(&self, leaf: &Hash, with_queue_root: &mut dyn FnMut(&Hash)) - -> Result<(), sp_blockchain::Error> - { - for root in self.ingress_roots.get(leaf).into_iter().flat_map(|roots| roots) { - with_queue_root(root) - } - - Ok(()) - } -} - -fn make_status(status: &Status, roles: Roles) -> FullStatus { - FullStatus { - version: 1, - min_supported_version: 1, - roles, - best_number: 0, - best_hash: Default::default(), - genesis_hash: Default::default(), - chain_status: status.encode(), - } -} - -fn make_validation_leaf_work(parent_hash: Hash, local_key: ValidatorId) -> LeafWorkParams { - LeafWorkParams { - local_session_key: Some(local_key), - parent_hash, - authorities: Vec::new(), - } -} - -#[test] -fn sends_session_key() { - let mut protocol = PolkadotProtocol::new(None); - - let peer_a = PeerId::random(); - let peer_b = PeerId::random(); - let parent_hash = [0; 32].into(); - let local_key: ValidatorId = [1; 32].unchecked_into(); - - let validator_status = Status { collating_for: None }; - let collator_status = Status { collating_for: Some(([2; 32].unchecked_into(), 5.into())) }; - - { - let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_a.clone(), make_status(&validator_status, Roles::AUTHORITY)); - assert!(ctx.messages.is_empty()); - } - - { - let mut ctx = TestContext::default(); - let params = make_validation_leaf_work(parent_hash, local_key.clone()); - protocol.new_validation_leaf_work(&mut ctx, params); - assert!(ctx.has_message(peer_a, Message::ValidatorId(local_key.clone()))); - } - - { - let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&collator_status, Roles::NONE)); - assert!(ctx.has_message(peer_b.clone(), Message::ValidatorId(local_key))); - } -} - -#[test] -fn remove_bad_collator() { - let mut protocol = PolkadotProtocol::new(None); - - let who = PeerId::random(); - let 
collator_id: CollatorId = [2; 32].unchecked_into(); - - let status = Status { collating_for: Some((collator_id.clone(), 5.into())) }; - - { - let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, who.clone(), make_status(&status, Roles::NONE)); - } - - { - let mut ctx = TestContext::default(); - protocol.disconnect_bad_collator(&mut ctx, collator_id); - assert!(ctx.disabled.contains(&who)); - } -} - -#[test] -fn kick_collator() { - let mut protocol = PolkadotProtocol::new(None); - - let who = PeerId::random(); - let collator_id: CollatorId = [2; 32].unchecked_into(); - - let mut ctx = TestContext::default(); - let status = Status { collating_for: Some((collator_id.clone(), 5.into())) }; - protocol.on_connect(&mut ctx, who.clone(), make_status(&status, Roles::NONE)); - assert!(!ctx.disconnected.contains(&who)); - - protocol.on_connect(&mut ctx, who.clone(), make_status(&status, Roles::NONE)); - assert!(ctx.disconnected.contains(&who)); -} - -#[test] -fn many_session_keys() { - let mut protocol = PolkadotProtocol::new(None); - - let parent_a = [1; 32].into(); - let parent_b = [2; 32].into(); - - let local_key_a: ValidatorId = [3; 32].unchecked_into(); - let local_key_b: ValidatorId = [4; 32].unchecked_into(); - - let params_a = make_validation_leaf_work(parent_a, local_key_a.clone()); - let params_b = make_validation_leaf_work(parent_b, local_key_b.clone()); - - protocol.new_validation_leaf_work(&mut TestContext::default(), params_a); - protocol.new_validation_leaf_work(&mut TestContext::default(), params_b); - - assert_eq!(protocol.live_validation_leaves.recent_keys(), &[local_key_a.clone(), local_key_b.clone()]); - - let peer_a = PeerId::random(); - - // when connecting a peer, we should get both those keys. - { - let mut ctx = TestContext::default(); - - let status = Status { collating_for: None }; - protocol.on_connect(&mut ctx, peer_a.clone(), make_status(&status, Roles::AUTHORITY)); - - assert!(ctx.has_message(peer_a.clone(), Message::ValidatorId(local_key_a.clone()))); - assert!(ctx.has_message(peer_a, Message::ValidatorId(local_key_b.clone()))); - } - - let peer_b = PeerId::random(); - - assert!(protocol.remove_validation_session(parent_a)); - - { - let mut ctx = TestContext::default(); - - let status = Status { collating_for: None }; - protocol.on_connect(&mut ctx, peer_b.clone(), make_status(&status, Roles::AUTHORITY)); - - assert!(!ctx.has_message(peer_b.clone(), Message::ValidatorId(local_key_a.clone()))); - assert!(ctx.has_message(peer_b, Message::ValidatorId(local_key_b.clone()))); - } -} diff --git a/network/src/legacy/tests/validation.rs b/network/src/legacy/tests/validation.rs deleted file mode 100644 index db01272dee17..000000000000 --- a/network/src/legacy/tests/validation.rs +++ /dev/null @@ -1,398 +0,0 @@ -// Copyright 2019-2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! Tests and helpers for validation networking. 
- -#![allow(unused)] - -use crate::legacy::gossip::GossipMessage; -use sc_network::{Context as NetContext, PeerId}; -use sc_network_gossip::TopicNotification; -use sp_core::{NativeOrEncoded, ExecutionContext}; -use sp_keyring::Sr25519Keyring; -use crate::legacy::{PolkadotProtocol, NetworkService, GossipService, GossipMessageStream}; - -use polkadot_validation::{SharedTable, Network}; -use polkadot_primitives::{Block, BlockNumber, Hash, Header, BlockId}; -use polkadot_primitives::parachain::{ - Id as ParaId, Chain, DutyRoster, ParachainHost, ValidatorId, - FeeSchedule, HeadData, Retriable, CollatorId, ErasureChunk, AbridgedCandidateReceipt, - GlobalValidationSchedule, LocalValidationData, -}; -use parking_lot::Mutex; -use sp_blockchain::Result as ClientResult; -use sp_api::{ApiRef, Core, RuntimeVersion, StorageProof, ApiErrorExt, ApiExt, ProvideRuntimeApi}; -use sp_runtime::traits::{Block as BlockT, HashFor, NumberFor}; -use sp_state_machine::ChangesTrieState; - -use std::collections::HashMap; -use std::sync::Arc; -use std::pin::Pin; -use std::task::{Poll, Context}; -use futures::{prelude::*, channel::mpsc, future::{select, Either}, task::Spawn}; -use codec::Encode; - -use super::{TestContext, TestChainContext}; - -#[derive(Clone, Copy)] -struct NeverExit; - -impl Future for NeverExit { - type Output = (); - - fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll { - Poll::Pending - } -} - -fn clone_gossip(n: &TopicNotification) -> TopicNotification { - TopicNotification { - message: n.message.clone(), - sender: n.sender.clone(), - } -} - -async fn gossip_router( - mut incoming_messages: mpsc::UnboundedReceiver<(Hash, TopicNotification)>, - mut incoming_streams: mpsc::UnboundedReceiver<(Hash, mpsc::UnboundedSender)> -) { - let mut outgoing: Vec<(Hash, mpsc::UnboundedSender)> = Vec::new(); - let mut messages = Vec::new(); - - loop { - match select(incoming_messages.next(), incoming_streams.next()).await { - Either::Left((Some((topic, message)), _)) => { - outgoing.retain(|&(ref o_topic, ref sender)| { - o_topic != &topic || sender.unbounded_send(clone_gossip(&message)).is_ok() - }); - messages.push((topic, message)); - }, - Either::Right((Some((topic, sender)), _)) => { - for message in messages.iter() - .filter(|&&(ref t, _)| t == &topic) - .map(|&(_, ref msg)| clone_gossip(msg)) - { - if let Err(_) = sender.unbounded_send(message) { return } - } - - outgoing.push((topic, sender)); - }, - Either::Left((None, _)) | Either::Right((None, _)) => panic!("ended early.") - } - } -} - -#[derive(Clone)] -struct GossipHandle { - send_message: mpsc::UnboundedSender<(Hash, TopicNotification)>, - send_listener: mpsc::UnboundedSender<(Hash, mpsc::UnboundedSender)>, -} - -fn make_gossip() -> (impl Future, GossipHandle) { - let (message_tx, message_rx) = mpsc::unbounded(); - let (listener_tx, listener_rx) = mpsc::unbounded(); - - ( - gossip_router(message_rx, listener_rx), - GossipHandle { send_message: message_tx, send_listener: listener_tx }, - ) -} - -struct TestNetwork { - proto: Arc>, - gossip: GossipHandle, -} - -impl NetworkService for TestNetwork { - fn with_spec(&self, with: F) - where F: FnOnce(&mut PolkadotProtocol, &mut dyn NetContext) - { - let mut context = TestContext::default(); - let res = with(&mut *self.proto.lock(), &mut context); - // TODO: send context to worker for message routing. 
- // https://github.com/paritytech/polkadot/issues/215 - res - } -} - -impl GossipService for TestNetwork { - fn gossip_messages_for(&self, topic: Hash) -> GossipMessageStream { - let (tx, rx) = mpsc::unbounded(); - let _ = self.gossip.send_listener.unbounded_send((topic, tx)); - GossipMessageStream::new(rx.boxed()) - } - - fn send_message(&self, _: PeerId, _: GossipMessage) { - unimplemented!() - } - - fn gossip_message(&self, topic: Hash, message: GossipMessage) { - let notification = TopicNotification { message: message.encode(), sender: None }; - let _ = self.gossip.send_message.unbounded_send((topic, notification)); - } -} - -#[derive(Default)] -struct ApiData { - validators: Vec, - duties: Vec, - active_parachains: Vec<(ParaId, Option<(CollatorId, Retriable)>)>, -} - -#[derive(Default, Clone)] -struct TestApi { - data: Arc>, -} - -struct RuntimeApi { - data: Arc>, -} - -impl ProvideRuntimeApi for TestApi { - type Api = RuntimeApi; - - fn runtime_api<'a>(&'a self) -> ApiRef<'a, Self::Api> { - RuntimeApi { data: self.data.clone() }.into() - } -} - -impl Core for RuntimeApi { - fn Core_version_runtime_api_impl( - &self, - _: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> ClientResult> { - unimplemented!("Not required for testing!") - } - - fn Core_execute_block_runtime_api_impl( - &self, - _: &BlockId, - _: ExecutionContext, - _: Option<(Block)>, - _: Vec, - ) -> ClientResult> { - unimplemented!("Not required for testing!") - } - - fn Core_initialize_block_runtime_api_impl( - &self, - _: &BlockId, - _: ExecutionContext, - _: Option<&Header>, - _: Vec, - ) -> ClientResult> { - unimplemented!("Not required for testing!") - } -} - -impl ApiErrorExt for RuntimeApi { - type Error = sp_blockchain::Error; -} - -impl ApiExt for RuntimeApi { - type StateBackend = sp_state_machine::InMemoryBackend>; - - fn map_api_result Result, R, E>( - &self, - _: F - ) -> Result { - unimplemented!("Not required for testing!") - } - - fn runtime_version_at(&self, _: &BlockId) -> ClientResult { - unimplemented!("Not required for testing!") - } - - fn record_proof(&mut self) { } - - fn extract_proof(&mut self) -> Option { - None - } - - fn into_storage_changes( - &self, - _: &Self::StateBackend, - _: Option<&ChangesTrieState, NumberFor>>, - _: ::Hash, - ) -> std::result::Result, String> - where Self: Sized - { - unimplemented!("Not required for testing!") - } -} - -impl ParachainHost for RuntimeApi { - fn ParachainHost_validators_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> ClientResult>> { - Ok(NativeOrEncoded::Native(self.data.lock().validators.clone())) - } - - fn ParachainHost_duty_roster_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> ClientResult> { - - Ok(NativeOrEncoded::Native(DutyRoster { - validator_duty: self.data.lock().duties.clone(), - })) - } - - fn ParachainHost_active_parachains_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> ClientResult)>>> { - Ok(NativeOrEncoded::Native(self.data.lock().active_parachains.clone())) - } - - fn ParachainHost_parachain_code_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option, - _: Vec, - ) -> ClientResult>>> { - Ok(NativeOrEncoded::Native(Some(Vec::new()))) - } - - fn ParachainHost_global_validation_schedule_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option<()>, - _: Vec, - ) -> ClientResult> { - 
Ok(NativeOrEncoded::Native(Default::default())) - } - - fn ParachainHost_local_validation_data_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _: Option, - _: Vec, - ) -> ClientResult>> { - Ok(NativeOrEncoded::Native(Some(Default::default()))) - } - - fn ParachainHost_get_heads_runtime_api_impl( - &self, - _at: &BlockId, - _: ExecutionContext, - _extrinsics: Option::Extrinsic>>, - _: Vec, - ) -> ClientResult>>> { - Ok(NativeOrEncoded::Native(Some(Vec::new()))) - } -} - -type TestValidationNetwork = crate::legacy::validation::ValidationNetwork; - -struct Built { - gossip: Pin>>, - api_handle: Arc>, - networks: Vec>, -} - -fn build_network(n: usize, spawner: SP)-> Built { - use crate::legacy::gossip::RegisteredMessageValidator; - let (gossip_router, gossip_handle) = make_gossip(); - let api_handle = Arc::new(Mutex::new(Default::default())); - let runtime_api = Arc::new(TestApi { data: api_handle.clone() }); - - let networks = (0..n).map(|_| { - let net = Arc::new(TestNetwork { - proto: Arc::new(Mutex::new(PolkadotProtocol::new(None))), - gossip: gossip_handle.clone(), - }); - - let message_val = RegisteredMessageValidator::::new_test( - TestChainContext::default(), - Box::new(|_, _| {}), - ); - - TestValidationNetwork::new( - message_val, - runtime_api.clone(), - spawner.clone(), - ) - }); - - let networks: Vec<_> = networks.collect(); - - Built { - gossip: gossip_router.boxed(), - api_handle, - networks, - } -} - -#[derive(Clone)] -struct DummyGossipMessages; - -use futures::stream; -impl av_store::ProvideGossipMessages for DummyGossipMessages { - fn gossip_messages_for( - &self, - _topic: Hash - ) -> Pin + Send>> { - stream::empty().boxed() - } - - fn gossip_erasure_chunk( - &self, - _relay_parent: Hash, - _candidate_hash: Hash, - _erasure_root: Hash, - _chunk: ErasureChunk, - ) {} -} - -fn make_table(data: &ApiData, local_key: &Sr25519Keyring, parent_hash: Hash) -> Arc { - use av_store::Store; - use sp_core::crypto::Pair; - - let sr_pair = local_key.pair(); - let local_key = polkadot_primitives::parachain::ValidatorPair::from(local_key.pair()); - let store = Store::new_in_memory(DummyGossipMessages); - let (group_info, _) = ::polkadot_validation::make_group_info( - DutyRoster { validator_duty: data.duties.clone() }, - &data.validators, // only possible as long as parachain crypto === aura crypto - Some(sr_pair.public().into()), - ).unwrap(); - - Arc::new(SharedTable::new( - data.validators.clone(), - group_info, - Some(Arc::new(local_key)), - parent_hash, - store, - None, - )) -} diff --git a/network/src/legacy/validation.rs b/network/src/legacy/validation.rs deleted file mode 100644 index 3ab7ede6cf51..000000000000 --- a/network/src/legacy/validation.rs +++ /dev/null @@ -1,678 +0,0 @@ -// Copyright 2017-2020 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -//! The "validation leaf work" networking code built on top of the base network service. -//! 
-//! This fulfills the `polkadot_validation::Network` trait, providing a hook to be called -//! each time validation leaf work begins on a new chain head. - -use sc_network::PeerId; -use polkadot_validation::{ - Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement, -}; -use polkadot_primitives::{Block, Hash}; -use polkadot_primitives::parachain::{ - Id as ParaId, Collation, ParachainHost, AbridgedCandidateReceipt, CollatorId, - ValidatorId, PoVBlock, -}; -use sp_api::ProvideRuntimeApi; - -use futures::prelude::*; -use futures::task::SpawnExt; -pub use futures::task::Spawn as Executor; -use futures::channel::oneshot; -use futures::future::{ready, select}; - -use std::collections::hash_map::{HashMap, Entry}; -use std::io; -use std::sync::Arc; -use std::pin::Pin; - -use arrayvec::ArrayVec; -use parking_lot::Mutex; - -use crate::legacy::router::Router; -use crate::legacy::gossip::{RegisteredMessageValidator, MessageValidationData}; - -use super::{NetworkService, PolkadotProtocol}; - -/// Params to instantiate validation work on a block-DAG leaf. -pub struct LeafWorkParams { - /// The local session key. - pub local_session_key: Option, - /// The parent hash. - pub parent_hash: Hash, - /// The authorities. - pub authorities: Vec, -} - -/// Wrapper around the network service -pub struct ValidationNetwork { - api: Arc
<P>
, - executor: T, - network: RegisteredMessageValidator, -} - -impl ValidationNetwork { - /// Create a new consensus networking object. - pub fn new( - network: RegisteredMessageValidator, - api: Arc
<P>
, - executor: T, - ) -> Self { - ValidationNetwork { network, api, executor } - } -} - -impl Clone for ValidationNetwork { - fn clone(&self) -> Self { - ValidationNetwork { - network: self.network.clone(), - api: self.api.clone(), - executor: self.executor.clone(), - } - } -} - -impl ValidationNetwork where - P: ProvideRuntimeApi + Send + Sync + 'static, - P::Api: ParachainHost, - T: Clone + Executor + Send + Sync + 'static, -{ - /// Instantiate block-DAG leaf work - /// (i.e. the work we want to be done by validators at some chain-head) - /// at a parent hash. - /// - /// If the used session key is new, it will be broadcast to peers. - /// If any validation leaf-work was already instantiated at this parent hash, - /// the underlying instance will be shared. - /// - /// If there was already validation leaf-work instantiated and a different - /// session key was set, then the new key will be ignored. - /// - /// This implies that there can be multiple services intantiating validation - /// leaf-work instances safely, but they should all be coordinated on which session keys - /// are being used. - pub fn instantiate_leaf_work(&self, params: LeafWorkParams) - -> oneshot::Receiver> - { - let parent_hash = params.parent_hash; - let network = self.network.clone(); - let api = self.api.clone(); - let task_executor = self.executor.clone(); - let authorities = params.authorities.clone(); - - let (tx, rx) = oneshot::channel(); - - self.network.with_spec(move |spec, ctx| { - let actions = network.new_local_leaf( - parent_hash, - MessageValidationData { authorities }, - ); - - actions.perform(&network); - - let work = spec.new_validation_leaf_work(ctx, params); - let _ = tx.send(LeafWorkDataFetcher { - network, - api, - task_executor, - parent_hash, - knowledge: work.knowledge().clone(), - }); - }); - - rx - } -} - -impl ValidationNetwork { - /// Convert the given `CollatorId` to a `PeerId`. - pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) -> - impl Future> + Send - { - let network = self.network.clone(); - - async move { - let (send, recv) = oneshot::channel(); - network.with_spec(move |spec, _| { - let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned()); - }); - recv.await.ok().and_then(|opt| opt) - } - } - - /// Create a `Stream` of checked statements for the given `relay_parent`. - /// - /// The returned stream will not terminate, so it is required to make sure that the stream is - /// dropped when it is not required anymore. Otherwise, it will stick around in memory - /// infinitely. - pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream { - crate::legacy::router::checked_statements(&self.network, crate::legacy::router::attestation_topic(relay_parent)) - } -} - -/// A long-lived network which can create parachain statement routing processes on demand. 
-impl ParachainNetwork for ValidationNetwork where - P: ProvideRuntimeApi + Send + Sync + 'static, - P::Api: ParachainHost, - T: Clone + Executor + Send + Sync + 'static, -{ - type Error = String; - type TableRouter = Router; - type BuildTableRouter = Box> + Send + Unpin>; - - fn build_table_router( - &self, - table: Arc, - authorities: &[ValidatorId], - ) -> Self::BuildTableRouter { - let (signal, exit) = exit_future::signal(); - let parent_hash = *table.consensus_parent_hash(); - let local_session_key = table.session_key(); - - let build_fetcher = self.instantiate_leaf_work(LeafWorkParams { - local_session_key, - parent_hash, - authorities: authorities.to_vec(), - }); - - let executor = self.executor.clone(); - let network = self.network.clone(); - let work = build_fetcher - .map_err(|e| format!("{:?}", e)) - .map_ok(move |fetcher| { - let table_router = Router::new( - table, - fetcher, - network, - signal, - ); - - let table_router_clone = table_router.clone(); - let work = table_router.checked_statements() - .for_each(move |msg| { - table_router_clone.import_statement(msg); - ready(()) - }); - - let work = select(work, exit).map(drop); - - let _ = executor.spawn(work); - - table_router - }); - - Box::new(work) - } -} - -/// Error when the network appears to be down. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct NetworkDown; - -impl Collators for ValidationNetwork where - P: ProvideRuntimeApi + Send + Sync + 'static, - P::Api: ParachainHost, -{ - type Error = NetworkDown; - type Collation = Pin> + Send>>; - - fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { - let (tx, rx) = oneshot::channel(); - let network = self.network.clone(); - - // A future that resolves when a collation is received. - async move { - network.with_spec(move |spec, _| { - let collation = spec.await_collation(relay_parent, parachain); - let _ = tx.send(collation); - }); - - rx.await - .map_err(|_| NetworkDown)? - .await - .map_err(|_| NetworkDown) - }.boxed() - } - - - fn note_bad_collator(&self, collator: CollatorId) { - self.network.with_spec(move |spec, ctx| spec.disconnect_bad_collator(ctx, collator)); - } -} - -#[derive(Default)] -struct KnowledgeEntry { - knows_block_data: Vec, - knows_outgoing: Vec, - pov: Option, -} - -/// Tracks knowledge of peers. -pub(crate) struct Knowledge { - candidates: HashMap, -} - -impl Knowledge { - /// Create a new knowledge instance. - pub(crate) fn new() -> Self { - Knowledge { - candidates: HashMap::new(), - } - } - - /// Note a statement seen from another validator. - pub(crate) fn note_statement(&mut self, from: ValidatorId, statement: &Statement) { - // those proposing the candidate or declaring it valid know everything. - // those claiming it invalid do not have the outgoing messages data as it is - // generated by valid execution. - match *statement { - GenericStatement::Candidate(ref c) => { - let entry = self.candidates.entry(c.hash()).or_insert_with(Default::default); - entry.knows_block_data.push(from.clone()); - entry.knows_outgoing.push(from); - } - GenericStatement::Valid(ref hash) => { - let entry = self.candidates.entry(*hash).or_insert_with(Default::default); - entry.knows_block_data.push(from.clone()); - entry.knows_outgoing.push(from); - } - GenericStatement::Invalid(ref hash) => self.candidates.entry(*hash) - .or_insert_with(Default::default) - .knows_block_data - .push(from), - } - } - - /// Note a candidate collated or seen locally. 
- pub(crate) fn note_candidate( - &mut self, - hash: Hash, - pov: Option, - ) { - let entry = self.candidates.entry(hash).or_insert_with(Default::default); - entry.pov = entry.pov.take().or(pov); - } -} - -/// A current validation leaf-work instance -#[derive(Clone)] -pub(crate) struct LiveValidationLeaf { - parent_hash: Hash, - knowledge: Arc>, - local_session_key: Option, -} - -impl LiveValidationLeaf { - /// Create a new validation leaf-work instance. Needs to be attached to the - /// network. - pub(crate) fn new(params: LeafWorkParams) -> Self { - LiveValidationLeaf { - parent_hash: params.parent_hash, - knowledge: Arc::new(Mutex::new(Knowledge::new())), - local_session_key: params.local_session_key, - } - } - - /// Get a handle to the shared knowledge relative to this consensus - /// instance. - pub(crate) fn knowledge(&self) -> &Arc> { - &self.knowledge - } - - // execute a closure with locally stored proof-of-validation for a candidate, or a slice of session identities - // we believe should have the data. - fn with_pov_block(&self, hash: &Hash, f: F) -> U - where F: FnOnce(Result<&PoVBlock, &[ValidatorId]>) -> U - { - let knowledge = self.knowledge.lock(); - let res = knowledge.candidates.get(hash) - .ok_or(&[] as &_) - .and_then(|entry| entry.pov.as_ref().ok_or(&entry.knows_block_data[..])); - - f(res) - } -} - -// 3 is chosen because sessions change infrequently and usually -// only the last 2 (current session and "last" session) are relevant. -// the extra is an error boundary. -const RECENT_SESSIONS: usize = 3; - -/// Result when inserting recent session key. -#[derive(PartialEq, Eq)] -pub(crate) enum InsertedRecentKey { - /// Key was already known. - AlreadyKnown, - /// Key was new and pushed out optional old item. - New(Option), -} - -/// Wrapper for managing recent session keys. -#[derive(Default)] -pub(crate) struct RecentValidatorIds { - inner: ArrayVec<[ValidatorId; RECENT_SESSIONS]>, -} - -impl RecentValidatorIds { - /// Insert a new session key. This returns one to be pushed out if the - /// set is full. - pub(crate) fn insert(&mut self, key: ValidatorId) -> InsertedRecentKey { - if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown } - - let old = if self.inner.len() == RECENT_SESSIONS { - Some(self.inner.remove(0)) - } else { - None - }; - - self.inner.push(key); - InsertedRecentKey::New(old) - } - - /// As a slice. Most recent is last. - pub(crate) fn as_slice(&self) -> &[ValidatorId] { - &*self.inner - } - - fn remove(&mut self, key: &ValidatorId) { - self.inner.retain(|k| k != key) - } -} - -/// Manages requests and keys for live validation leaf-work instances. -pub(crate) struct LiveValidationLeaves { - // recent local session keys. - recent: RecentValidatorIds, - // live validation leaf-work instances, on `parent_hash`. refcount retained alongside. - live_instances: HashMap, -} - -impl LiveValidationLeaves { - /// Create a new `LiveValidationLeaves` - pub(crate) fn new() -> Self { - LiveValidationLeaves { - recent: Default::default(), - live_instances: HashMap::new(), - } - } - - /// Note new leaf for validation work. If the used session key is new, - /// it returns it to be broadcasted to peers. - /// - /// If there was already work instantiated at this leaf and a different - /// session key was set, then the new key will be ignored. 
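As the comment above spells out, leaf work is reference-counted per parent hash, and a local session key is only surfaced for broadcast the first time it enters the recent-keys window (the last `RECENT_SESSIONS` keys). A condensed sketch of the expected behaviour, written in the style of the unit tests at the bottom of this file (the test name and literal values are illustrative):

    #[test]
    fn session_key_only_surfaced_once_per_leaf() {
        use sp_core::crypto::UncheckedInto;

        let mut leaves = LiveValidationLeaves::new();
        let parent_hash: Hash = [0xff; 32].into();
        let key: ValidatorId = [1; 32].unchecked_into();

        // First registration at this leaf: the key is new to the recent-keys
        // window, so it is handed back to be broadcast to peers.
        let (_leaf, broadcast) = leaves.new_validation_leaf(LeafWorkParams {
            parent_hash,
            local_session_key: Some(key.clone()),
            authorities: Vec::new(),
        });
        assert_eq!(broadcast, Some(key.clone()));

        // Registering work at the same leaf again only bumps the refcount; the
        // key is already set on the live instance, so nothing new is returned.
        let (_leaf, broadcast) = leaves.new_validation_leaf(LeafWorkParams {
            parent_hash,
            local_session_key: Some(key),
            authorities: Vec::new(),
        });
        assert!(broadcast.is_none());
    }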
- pub(crate) fn new_validation_leaf( - &mut self, - params: LeafWorkParams, - ) -> (LiveValidationLeaf, Option) { - let parent_hash = params.parent_hash; - - let key = params.local_session_key.clone(); - let recent = &mut self.recent; - - let mut check_new_key = || { - let inserted_key = key.clone().map(|key| recent.insert(key)); - if let Some(InsertedRecentKey::New(_)) = inserted_key { - key.clone() - } else { - None - } - }; - - if let Some(&mut (ref mut rc, ref mut prev)) = self.live_instances.get_mut(&parent_hash) { - let maybe_new = if prev.local_session_key.is_none() { - prev.local_session_key = key.clone(); - check_new_key() - } else { - None - }; - - *rc += 1; - return (prev.clone(), maybe_new) - } - - let leaf_work = LiveValidationLeaf::new(params); - self.live_instances.insert(parent_hash, (1, leaf_work.clone())); - - (leaf_work, check_new_key()) - } - - /// Remove validation leaf-work. true indicates that it was actually removed. - pub(crate) fn remove(&mut self, parent_hash: Hash) -> bool { - let maybe_removed = if let Entry::Occupied(mut entry) = self.live_instances.entry(parent_hash) { - entry.get_mut().0 -= 1; - if entry.get().0 == 0 { - let (_, leaf_work) = entry.remove(); - Some(leaf_work) - } else { - None - } - } else { - None - }; - - let leaf_work = match maybe_removed { - None => return false, - Some(s) => s, - }; - - if let Some(ref key) = leaf_work.local_session_key { - let key_still_used = self.live_instances.values() - .any(|c| c.1.local_session_key.as_ref() == Some(key)); - - if !key_still_used { - self.recent.remove(key) - } - } - - true - } - - /// Recent session keys as a slice. - pub(crate) fn recent_keys(&self) -> &[ValidatorId] { - self.recent.as_slice() - } - - /// Call a closure with pov-data from validation leaf-work at parent hash for a given - /// candidate-receipt hash. - /// - /// This calls the closure with `Some(data)` where the leaf-work and data are live, - /// `Err(Some(keys))` when the leaf-work is live but the data unknown, with a list of keys - /// who have the data, and `Err(None)` where the leaf-work is unknown. - pub(crate) fn with_pov_block(&self, parent_hash: &Hash, c_hash: &Hash, f: F) -> U - where F: FnOnce(Result<&PoVBlock, Option<&[ValidatorId]>>) -> U - { - match self.live_instances.get(parent_hash) { - Some(c) => c.1.with_pov_block(c_hash, |res| f(res.map_err(Some))), - None => f(Err(None)) - } - } -} - -/// Can fetch data for a given validation leaf-work instance. -pub struct LeafWorkDataFetcher { - network: RegisteredMessageValidator, - api: Arc
<P>, -	task_executor: T, -	knowledge: Arc<Mutex<Knowledge>>, -	parent_hash: Hash, -} - -impl<P, T> LeafWorkDataFetcher<P, T> { -	/// Get the parent hash. -	pub(crate) fn parent_hash(&self) -> Hash { -		self.parent_hash -	} - -	/// Get the shared knowledge. -	pub(crate) fn knowledge(&self) -> &Arc<Mutex<Knowledge>> { -		&self.knowledge -	} - -	/// Get the network service. -	pub(crate) fn network(&self) -> &RegisteredMessageValidator { -		&self.network -	} - -	/// Get the executor. -	pub(crate) fn executor(&self) -> &T { -		&self.task_executor -	} - -	/// Get the runtime API. -	pub(crate) fn api(&self) -> &Arc<P>
{ - &self.api - } -} - -impl Clone for LeafWorkDataFetcher { - fn clone(&self) -> Self { - LeafWorkDataFetcher { - network: self.network.clone(), - api: self.api.clone(), - task_executor: self.task_executor.clone(), - parent_hash: self.parent_hash, - knowledge: self.knowledge.clone(), - } - } -} - -impl + Send, T> LeafWorkDataFetcher where - P::Api: ParachainHost, - T: Clone + Executor + Send + 'static, -{ - /// Fetch PoV block for the given candidate receipt. - pub fn fetch_pov_block(&self, candidate: &AbridgedCandidateReceipt) - -> Pin> + Send>> { - - let parent_hash = self.parent_hash; - let network = self.network.clone(); - let candidate = candidate.clone(); - let (tx, rx) = oneshot::channel(); - - async move { - network.with_spec(move |spec, ctx| { - let inner_rx = spec.fetch_pov_block(ctx, &candidate, parent_hash); - let _ = tx.send(inner_rx); - }); - - let map_err = |_| io::Error::new( - io::ErrorKind::Other, - "Sending end of channel hung up", - ); - - rx.await - .map_err(map_err)? - .await - .map_err(map_err) - }.boxed() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use sp_core::crypto::UncheckedInto; - - #[test] - fn last_keys_works() { - let a: ValidatorId = [1; 32].unchecked_into(); - let b: ValidatorId = [2; 32].unchecked_into(); - let c: ValidatorId = [3; 32].unchecked_into(); - let d: ValidatorId = [4; 32].unchecked_into(); - - let mut recent = RecentValidatorIds::default(); - - match recent.insert(a.clone()) { - InsertedRecentKey::New(None) => {}, - _ => panic!("is new, not at capacity"), - } - - match recent.insert(a.clone()) { - InsertedRecentKey::AlreadyKnown => {}, - _ => panic!("not new"), - } - - match recent.insert(b.clone()) { - InsertedRecentKey::New(None) => {}, - _ => panic!("is new, not at capacity"), - } - - match recent.insert(b) { - InsertedRecentKey::AlreadyKnown => {}, - _ => panic!("not new"), - } - - match recent.insert(c.clone()) { - InsertedRecentKey::New(None) => {}, - _ => panic!("is new, not at capacity"), - } - - match recent.insert(c) { - InsertedRecentKey::AlreadyKnown => {}, - _ => panic!("not new"), - } - - match recent.insert(d.clone()) { - InsertedRecentKey::New(Some(old)) => assert_eq!(old, a), - _ => panic!("is new, and at capacity"), - } - - match recent.insert(d) { - InsertedRecentKey::AlreadyKnown => {}, - _ => panic!("not new"), - } - } - - #[test] - fn add_new_leaf_work_works() { - let mut live_leaves = LiveValidationLeaves::new(); - let key_a: ValidatorId = [0; 32].unchecked_into(); - let key_b: ValidatorId = [1; 32].unchecked_into(); - let parent_hash = [0xff; 32].into(); - - let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { - parent_hash, - local_session_key: None, - authorities: Vec::new(), - }); - - let knowledge = leaf_work.knowledge().clone(); - - assert!(new_key.is_none()); - - let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { - parent_hash, - local_session_key: Some(key_a.clone()), - authorities: Vec::new(), - }); - - // check that knowledge points to the same place. 
- assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _); - assert_eq!(new_key, Some(key_a.clone())); - - let (leaf_work, new_key) = live_leaves.new_validation_leaf(LeafWorkParams { - parent_hash, - local_session_key: Some(key_b.clone()), - authorities: Vec::new(), - }); - - assert_eq!(&**leaf_work.knowledge() as *const _, &*knowledge as *const _); - assert!(new_key.is_none()); - } -} diff --git a/network/src/lib.rs b/network/src/lib.rs index e2d8873248fe..a123d098e675 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -19,13 +19,14 @@ //! This manages routing for parachain statements, parachain block and outgoing message //! data fetching, communication between collators and validators, and more. -use polkadot_primitives::{Block, Hash}; +use polkadot_primitives::{Block, Hash, BlakeTwo256, HashT}; pub mod legacy; pub mod protocol; sc_network::construct_simple_protocol! { /// Stub until https://github.com/paritytech/substrate/pull/4665 is merged + #[derive(Clone)] pub struct PolkadotProtocol where Block = Block { } } @@ -35,23 +36,23 @@ pub type PolkadotNetworkService = sc_network::NetworkService Hash { + let mut v = candidate_hash.as_ref().to_vec(); + v.extend(b"erasure_chunks"); + + BlakeTwo256::hash(&v[..]) +} diff --git a/network/src/protocol.rs b/network/src/protocol.rs index c52d7b7ce140..d573d9e52e57 100644 --- a/network/src/protocol.rs +++ b/network/src/protocol.rs @@ -21,6 +21,7 @@ //! We handle events from `sc-network` in a thin wrapper that forwards to a //! background worker which also handles commands from other parts of the node. +use arrayvec::ArrayVec; use codec::{Decode, Encode}; use futures::channel::{mpsc, oneshot}; use futures::future::Either; @@ -37,18 +38,19 @@ use polkadot_primitives::{ }; use polkadot_validation::{ SharedTable, TableRouter, Network as ParachainNetwork, Validated, GenericStatement, Collators, + SignedStatement, }; use sc_network::{config::Roles, Event, PeerId}; use sp_api::ProvideRuntimeApi; -use std::collections::HashMap; +use std::collections::{hash_map::{Entry, HashMap}, HashSet}; use std::pin::Pin; use std::sync::{Arc, Weak}; use std::time::Duration; use super::{cost, benefit, PolkadotNetworkService}; -use crate::legacy::validation::{RecentValidatorIds, InsertedRecentKey}; use crate::legacy::collator_pool::Role as CollatorRole; +use crate::legacy::gossip::{GossipMessage, ErasureChunkMessage}; /// The current protocol version. pub const VERSION: u32 = 1; @@ -57,6 +59,8 @@ pub const MIN_SUPPORTED_VERSION: u32 = 1; /// The engine ID of the polkadot network protocol. pub const POLKADOT_ENGINE_ID: sp_runtime::ConsensusEngineId = *b"dot2"; +/// The protocol name. +pub const POLKADOT_PROTOCOL_NAME: &[u8] = b"dot2-proto"; pub use crate::legacy::gossip::ChainContext; @@ -70,7 +74,7 @@ enum ServiceToWorkerMsg { // service messages. BuildConsensusNetworking(Arc, Vec, oneshot::Sender), DropConsensusNetworking(Hash), - LocalCollation( + SubmitValidatedCollation( Hash, // relay-parent AbridgedCandidateReceipt, PoVBlock, @@ -81,6 +85,15 @@ enum ServiceToWorkerMsg { AbridgedCandidateReceipt, oneshot::Sender, ), + FetchErasureChunk( + Hash, // candidate-hash. + u32, // validator index. 
+ oneshot::Sender, + ), + DistributeErasureChunk( + Hash, // candidate-hash, + ErasureChunk, + ), AwaitCollation( Hash, // relay-parent, ParaId, @@ -89,6 +102,17 @@ enum ServiceToWorkerMsg { NoteBadCollator( CollatorId, ), + RegisterAvailabilityStore( + av_store::Store, + ), + OurCollation( + HashSet, + Collation, + ), + ListenCheckedStatements( + Hash, // relay-parent, + oneshot::Sender + Send>>>, + ), } /// An async handle to the network service. @@ -117,6 +141,7 @@ pub fn start( const SERVICE_TO_WORKER_BUF: usize = 256; let mut event_stream = service.event_stream(); + service.register_notifications_protocol(POLKADOT_ENGINE_ID, POLKADOT_PROTOCOL_NAME); let (mut worker_sender, worker_receiver) = mpsc::channel(SERVICE_TO_WORKER_BUF); let gossip_validator = crate::legacy::gossip::register_validator( @@ -290,9 +315,54 @@ pub struct Config { pub collating_for: Option<(CollatorId, ParaId)>, } +// 3 is chosen because sessions change infrequently and usually +// only the last 2 (current session and "last" session) are relevant. +// the extra is an error boundary. +const RECENT_SESSIONS: usize = 3; + +/// Result when inserting recent session key. +#[derive(PartialEq, Eq)] +pub(crate) enum InsertedRecentKey { + /// Key was already known. + AlreadyKnown, + /// Key was new and pushed out optional old item. + New(Option), +} + +/// Wrapper for managing recent session keys. +#[derive(Default)] +struct RecentValidatorIds { + inner: ArrayVec<[ValidatorId; RECENT_SESSIONS]>, +} + +impl RecentValidatorIds { + /// Insert a new session key. This returns one to be pushed out if the + /// set is full. + fn insert(&mut self, key: ValidatorId) -> InsertedRecentKey { + if self.inner.contains(&key) { return InsertedRecentKey::AlreadyKnown } + + let old = if self.inner.len() == RECENT_SESSIONS { + Some(self.inner.remove(0)) + } else { + None + }; + + self.inner.push(key); + InsertedRecentKey::New(old) + } + + /// As a slice. Most recent is last. + fn as_slice(&self) -> &[ValidatorId] { + &*self.inner + } +} + struct ProtocolHandler { service: Arc, peers: HashMap, + // reverse mapping from validator-ID to PeerID. Multiple peers can represent + // the same validator because of sentry nodes. + connected_validators: HashMap>, collators: crate::legacy::collator_pool::CollatorPool, local_collations: crate::legacy::local_collations::LocalCollations, config: Config, @@ -306,6 +376,7 @@ impl ProtocolHandler { ProtocolHandler { service, peers: HashMap::new(), + connected_validators: HashMap::new(), collators: Default::default(), local_collations: Default::default(), config, @@ -332,11 +403,17 @@ impl ProtocolHandler { fn on_disconnect(&mut self, peer: PeerId) { let mut new_primary = None; if let Some(data) = self.peers.remove(&peer) { + // replace collator. if let Some((collator_id, _)) = data.ready_and_collating_for() { if self.collators.collator_id_to_peer_id(&collator_id) == Some(&peer) { new_primary = self.collators.on_disconnect(collator_id); } } + + // clean up stated validator IDs. 
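// Editor's note: `connected_validators` above is a reverse index from validator ID to the set of
// peers that currently represent it (sentry nodes mean several peers may map to one validator).
// A self-contained sketch of the add/remove discipline used on key advertisement, key rotation
// and disconnect; `u32`/`String` are stand-ins for `ValidatorId`/`PeerId`, not the real types.
use std::collections::{hash_map::Entry, HashMap, HashSet};

#[derive(Default)]
struct ConnectedValidators {
    inner: HashMap<u32, HashSet<String>>,
}

impl ConnectedValidators {
    /// A peer advertised a session key: record it as a representative.
    fn add(&mut self, validator: u32, peer: String) {
        self.inner.entry(validator).or_default().insert(peer);
    }

    /// The peer disconnected or rotated keys: drop it, and drop the whole entry
    /// once no peer represents the validator any more.
    fn remove(&mut self, validator: u32, peer: &str) {
        if let Entry::Occupied(mut entry) = self.inner.entry(validator) {
            entry.get_mut().remove(peer);
            if entry.get().is_empty() {
                entry.remove_entry();
            }
        }
    }
}

fn main() {
    let mut map = ConnectedValidators::default();
    map.add(7, "peer-a".into());
    map.add(7, "peer-b".into()); // sentry node for the same validator
    map.remove(7, "peer-a");
    assert!(map.inner.contains_key(&7));
    map.remove(7, "peer-b");
    assert!(!map.inner.contains_key(&7));
}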
+ for validator_id in data.session_keys.as_slice().iter().cloned() { + self.validator_representative_removed(validator_id, &peer); + } } let service = &self.service; @@ -481,11 +558,12 @@ impl ProtocolHandler { } } - self.send_peer_collations(remote, collations_to_send); + send_peer_collations(&*self.service, remote, collations_to_send); } fn on_validator_id(&mut self, remote: PeerId, key: ValidatorId) { let mut collations_to_send = Vec::new(); + let mut invalidated_key = None; { let peer = match self.peers.get_mut(&remote) { @@ -501,21 +579,29 @@ impl ProtocolHandler { ProtocolState::Ready(_, _) => { if let InsertedRecentKey::New(Some(last)) = peer.session_keys.insert(key.clone()) { collations_to_send = self.local_collations.fresh_key(&last, &key); + invalidated_key = Some(last); } } } } - self.send_peer_collations(remote, collations_to_send); + if let Some(invalidated) = invalidated_key { + self.validator_representative_removed(invalidated, &remote); + } + + send_peer_collations(&*self.service, remote, collations_to_send); } - fn send_peer_collations(&self, remote: PeerId, collations: Vec<(Hash, Collation)>) { - for (relay_parent, collation) in collations { - self.service.write_notification( - remote.clone(), - POLKADOT_ENGINE_ID, - Message::Collation(relay_parent, collation).encode(), - ); + // call when the given peer no longer represents the given validator key. + // + // this can occur when the peer advertises a new key, invalidating an old one, + // or when the peer disconnects. + fn validator_representative_removed(&mut self, validator_id: ValidatorId, peer_id: &PeerId) { + if let Entry::Occupied(mut entry) = self.connected_validators.entry(validator_id) { + entry.get_mut().remove(peer_id); + if entry.get().is_empty() { + let _ = entry.remove_entry(); + } } } @@ -555,6 +641,39 @@ impl ProtocolHandler { } } } + + // distribute our (as a collator node) collation to peers. 
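// Editor's note: the `InsertedRecentKey::New(Some(last))` branch above relies on
// `RecentValidatorIds` keeping only the last RECENT_SESSIONS keys and returning whichever key
// got evicted, so stale collations and peer mappings can be invalidated. A minimal sketch of
// that bounded insert with a plain Vec (the real type uses an ArrayVec); illustrative only.
const RECENT_SESSIONS: usize = 3;

/// `None` means the key was already known; `Some(evicted)` mirrors `InsertedRecentKey::New`.
fn insert_recent(recent: &mut Vec<u32>, key: u32) -> Option<Option<u32>> {
    if recent.contains(&key) {
        return None;
    }
    let evicted = if recent.len() == RECENT_SESSIONS {
        Some(recent.remove(0)) // oldest key falls out
    } else {
        None
    };
    recent.push(key);
    Some(evicted)
}

fn main() {
    let mut recent = Vec::new();
    assert_eq!(insert_recent(&mut recent, 1), Some(None));
    assert_eq!(insert_recent(&mut recent, 1), None); // already known
    assert_eq!(insert_recent(&mut recent, 2), Some(None));
    assert_eq!(insert_recent(&mut recent, 3), Some(None));
    // Buffer full: a fourth key evicts the oldest, which callers treat as invalidated.
    assert_eq!(insert_recent(&mut recent, 4), Some(Some(1)));
}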
+ fn distribute_our_collation(&mut self, targets: HashSet, collation: Collation) { + let relay_parent = collation.info.relay_parent; + let distribution = self.local_collations.add_collation(relay_parent, targets, collation); + + for (validator, collation) in distribution { + let validator_representatives = self.connected_validators.get(&validator) + .into_iter().flat_map(|reps| reps); + + for remote in validator_representatives { + send_peer_collations( + &*self.service, + remote.clone(), + std::iter::once((relay_parent, collation.clone())), + ); + } + } + } +} + +fn send_peer_collations( + service: &PolkadotNetworkService, + remote: PeerId, + collations: impl IntoIterator, +) { + for (relay_parent, collation) in collations { + service.write_notification( + remote.clone(), + POLKADOT_ENGINE_ID, + Message::Collation(relay_parent, collation).encode(), + ); + } } async fn worker_loop( @@ -627,7 +746,7 @@ async fn worker_loop( consensus_instances.insert(relay_parent, ConsensusNetworkingInstance { statement_table: table.clone(), relay_parent, - attestation_topic: crate::legacy::router::attestation_topic(relay_parent), + attestation_topic: crate::legacy::gossip::attestation_topic(relay_parent), _drop_signal: signal, }); @@ -650,13 +769,13 @@ async fn worker_loop( ServiceToWorkerMsg::DropConsensusNetworking(relay_parent) => { consensus_instances.remove(&relay_parent); } - ServiceToWorkerMsg::LocalCollation(relay_parent, receipt, pov_block, chunks) => { + ServiceToWorkerMsg::SubmitValidatedCollation(relay_parent, receipt, pov_block, chunks) => { let instance = match consensus_instances.get(&relay_parent) { None => continue, Some(instance) => instance, }; - distribute_local_collation( + distribute_validated_collation( instance, receipt, pov_block, @@ -668,6 +787,45 @@ async fn worker_loop( // TODO https://github.com/paritytech/polkadot/issues/742: // create a filter on gossip for it and send to sender. } + ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, validator_index, sender) => { + let topic = crate::erasure_coding_topic(&candidate_hash); + + // for every erasure-root, relay-parent pair, there should only be one + // valid chunk with the given index. + // + // so we only care about the first item of the filtered stream. 
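// Editor's note: the lookup below keys gossip on the per-candidate topic introduced in
// network/src/lib.rs: the candidate hash, the literal b"erasure_chunks", then Blake2-256 over
// the concatenation. A sketch of that derivation; using `sp_core::hashing::blake2_256` and a raw
// `[u8; 32]` here is an assumption made for a self-contained example, the crate itself goes
// through `BlakeTwo256::hash` on primitive `Hash` values.
use sp_core::hashing::blake2_256;

/// Derive the gossip topic shared by all erasure chunks of one candidate.
fn erasure_coding_topic(candidate_hash: &[u8; 32]) -> [u8; 32] {
    let mut v = candidate_hash.to_vec();
    v.extend_from_slice(b"erasure_chunks");
    blake2_256(&v)
}

fn main() {
    let candidate_hash = [0u8; 32];
    // Every validator derives the same topic from the candidate hash alone, which is why the
    // chunk index is filtered out of the message stream rather than baked into the topic.
    assert_eq!(erasure_coding_topic(&candidate_hash), erasure_coding_topic(&candidate_hash));
}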
+ let get_msg = gossip_handle.gossip_messages_for(topic) + .filter_map(move |(msg, _)| { + future::ready(match msg { + GossipMessage::ErasureChunk(chunk) => + if chunk.chunk.index == validator_index { + Some(chunk.chunk) + } else { + None + }, + _ => None, + }) + }) + .into_future() + .map(|(item, _)| item.expect( + "gossip message streams do not conclude early; qed" + )); + + let _ = executor.spawn(async move { + let chunk = get_msg.await; + let _ = sender.send(chunk); + }); + } + ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, erasure_chunk) => { + let topic = crate::erasure_coding_topic(&candidate_hash); + gossip_handle.gossip_message( + topic, + GossipMessage::ErasureChunk(ErasureChunkMessage { + chunk: erasure_chunk, + candidate_hash, + }) + ); + } ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => { debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent); protocol_handler.await_collation(relay_parent, para_id, sender) @@ -675,6 +833,82 @@ async fn worker_loop( ServiceToWorkerMsg::NoteBadCollator(collator) => { protocol_handler.note_bad_collator(collator); } + ServiceToWorkerMsg::RegisterAvailabilityStore(store) => { + gossip_handle.register_availability_store(store); + } + ServiceToWorkerMsg::OurCollation(targets, collation) => { + protocol_handler.distribute_our_collation(targets, collation); + } + ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, sender) => { + let topic = crate::legacy::gossip::attestation_topic(relay_parent); + let checked_messages = gossip_handle.gossip_messages_for(topic) + .filter_map(|msg| match msg.0 { + GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), + _ => future::ready(None), + }) + .boxed(); + + let _ = sender.send(checked_messages); + } + } + } +} + +// A unique trace for valid statements issued by a validator. +#[derive(Hash, PartialEq, Eq, Clone, Debug)] +pub(crate) enum StatementTrace { + Valid(ValidatorIndex, Hash), + Invalid(ValidatorIndex, Hash), +} + +/// Helper for deferring statements whose associated candidate is unknown. +struct DeferredStatements { + deferred: HashMap>, + known_traces: HashSet, +} + +impl DeferredStatements { + /// Create a new `DeferredStatements`. + fn new() -> Self { + DeferredStatements { + deferred: HashMap::new(), + known_traces: HashSet::new(), + } + } + + /// Push a new statement onto the deferred pile. `Candidate` statements + /// cannot be deferred and are ignored. + fn push(&mut self, statement: SignedStatement) { + let (hash, trace) = match statement.statement { + GenericStatement::Candidate(_) => return, + GenericStatement::Valid(hash) => (hash, StatementTrace::Valid(statement.sender.clone(), hash)), + GenericStatement::Invalid(hash) => (hash, StatementTrace::Invalid(statement.sender.clone(), hash)), + }; + + if self.known_traces.insert(trace) { + self.deferred.entry(hash).or_insert_with(Vec::new).push(statement); + } + } + + /// Take all deferred statements referencing the given candidate hash out. 
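// Editor's note: a compact, self-contained sketch of how the deferral buffer above is used:
// Valid/Invalid votes arriving before their Candidate statement are parked per candidate hash,
// deduplicated by (sender, hash) trace, and drained once the candidate itself shows up. The
// simplified `Statement` enum and u64/u32 stand-ins are illustrative, not the crate's types.
use std::collections::{HashMap, HashSet};

#[derive(Clone, Debug, PartialEq)]
enum Statement {
    Candidate(u64),
    Valid(u32, u64), // (validator index, candidate hash)
}

#[derive(Default)]
struct Deferred {
    by_candidate: HashMap<u64, Vec<Statement>>,
    known_traces: HashSet<(u32, u64)>,
}

impl Deferred {
    fn push(&mut self, statement: Statement) {
        let (sender, hash) = match statement {
            Statement::Candidate(_) => return, // candidates are never deferred
            Statement::Valid(sender, hash) => (sender, hash),
        };
        if self.known_traces.insert((sender, hash)) {
            self.by_candidate.entry(hash).or_default().push(Statement::Valid(sender, hash));
        }
    }

    fn take_deferred(&mut self, hash: &u64) -> Vec<Statement> {
        self.by_candidate.remove(hash).unwrap_or_default()
    }
}

fn main() {
    let mut deferred = Deferred::default();
    deferred.push(Statement::Valid(0, 42)); // candidate 42 not seen yet
    deferred.push(Statement::Valid(0, 42)); // duplicate trace: ignored
    // Candidate 42 finally arrives: replay the parked votes.
    assert_eq!(deferred.take_deferred(&42), vec![Statement::Valid(0, 42)]);
}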
+ fn take_deferred(&mut self, hash: &Hash) -> (Vec, Vec) { + match self.deferred.remove(hash) { + None => (Vec::new(), Vec::new()), + Some(deferred) => { + let mut traces = Vec::new(); + for statement in deferred.iter() { + let trace = match statement.statement { + GenericStatement::Candidate(_) => continue, + GenericStatement::Valid(hash) => StatementTrace::Valid(statement.sender.clone(), hash), + GenericStatement::Invalid(hash) => StatementTrace::Invalid(statement.sender.clone(), hash), + }; + + self.known_traces.remove(&trace); + traces.push(trace); + } + + (deferred, traces) + } } } } @@ -693,14 +927,14 @@ async fn statement_import_loop( Api: ProvideRuntimeApi + Send + Sync + 'static, Api::Api: ParachainHost, { - let topic = crate::legacy::router::attestation_topic(relay_parent); + let topic = crate::legacy::gossip::attestation_topic(relay_parent); let mut checked_messages = gossip_handle.gossip_messages_for(topic) .filter_map(|msg| match msg.0 { - crate::legacy::gossip::GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), + GossipMessage::Statement(s) => future::ready(Some(s.signed_statement)), _ => future::ready(None), }); - let mut deferred_statements = crate::legacy::router::DeferredStatements::new(); + let mut deferred_statements = DeferredStatements::new(); loop { let statement = match future::select(exit, checked_messages.next()).await { @@ -758,7 +992,30 @@ async fn statement_import_loop( if let Some(producer) = producer { trace!(target: "validation", "driving statement work to completion"); - let work = producer.prime(api.clone()).validate(); + let table = table.clone(); + let gossip_handle = gossip_handle.clone(); + + let work = producer.prime(api.clone()).validate().map(move |res| { + let validated = match res { + Err(e) => { + debug!(target: "p_net", "Failed to act on statement: {}", e); + return + } + Ok(v) => v, + }; + + // propagate the statement. + let statement = crate::legacy::gossip::GossipStatement::new( + relay_parent, + match table.import_validated(validated) { + Some(s) => s, + None => return, + } + ); + + gossip_handle.gossip_message(topic, statement.into()); + }); + let work = future::select(work.boxed(), exit.clone()).map(drop); let _ = executor.spawn(work); } @@ -770,7 +1027,7 @@ async fn statement_import_loop( // distribute a "local collation": this is the collation gotten by a validator // from a collator. it needs to be distributed to other validators in the same // group. -fn distribute_local_collation( +fn distribute_validated_collation( instance: &ConsensusNetworkingInstance, receipt: AbridgedCandidateReceipt, pov_block: PoVBlock, @@ -779,7 +1036,6 @@ fn distribute_local_collation( ) { // produce a signed statement. let hash = receipt.hash(); - let erasure_root = receipt.commitments.erasure_root; let validated = Validated::collated_local( receipt, pov_block, @@ -796,15 +1052,13 @@ fn distribute_local_collation( gossip_handle.gossip_message(instance.attestation_topic, statement.into()); for chunk in chunks.1 { - let index = chunk.index; let message = crate::legacy::gossip::ErasureChunkMessage { chunk, - relay_parent: instance.relay_parent, candidate_hash: hash, }; gossip_handle.gossip_message( - av_store::erasure_coding_topic(instance.relay_parent, erasure_root, index), + crate::erasure_coding_topic(&hash), message.into(), ); } @@ -841,10 +1095,60 @@ impl Drop for RouterInner { } } +impl Service { + /// Register an availablility-store that the network can query. 
+ pub fn register_availability_store(&self, store: av_store::Store) { + let _ = self.sender.clone() + .try_send(ServiceToWorkerMsg::RegisterAvailabilityStore(store)); + } + + /// Submit a collation that we (as a collator) have prepared to validators. + /// + /// Provide a set of validator-IDs we should distribute to. + pub fn distribute_collation(&self, targets: HashSet, collation: Collation) { + let _ = self.sender.clone() + .try_send(ServiceToWorkerMsg::OurCollation(targets, collation)); + } + + /// Returns a stream that listens for checked statements on a particular + /// relay chain parent hash. + /// + /// Take care to drop the stream, as the sending side will not be cleaned + /// up until it is. + pub fn checked_statements(&self, relay_parent: Hash) + -> Pin>> { + let (tx, rx) = oneshot::channel(); + let mut sender = self.sender.clone(); + + let receive_stream = async move { + sender.send( + ServiceToWorkerMsg::ListenCheckedStatements(relay_parent, tx) + ).map_err(future::Either::Left).await?; + + rx.map_err(future::Either::Right).await + }; + + receive_stream + .map(|res| match res { + Ok(s) => s.left_stream(), + Err(e) => { + log::warn!( + target: "p_net", + "Polkadot network worker appears to be down: {:?}", + e, + ); + stream::pending().right_stream() + } + }) + .flatten_stream() + .boxed() + } +} + impl ParachainNetwork for Service { type Error = future::Either; type TableRouter = Router; - type BuildTableRouter = Pin>>>; + type BuildTableRouter = Pin> + Send>>; fn build_table_router( &self, @@ -887,6 +1191,36 @@ impl Collators for Service { } } +impl av_store::ErasureNetworking for Service { + type Error = future::Either; + + fn fetch_erasure_chunk(&self, candidate_hash: &Hash, index: u32) + -> Pin> + Send>> + { + let (tx, rx) = oneshot::channel(); + let mut sender = self.sender.clone(); + + let candidate_hash = *candidate_hash; + Box::pin(async move { + sender.send( + ServiceToWorkerMsg::FetchErasureChunk(candidate_hash, index, tx) + ).map_err(future::Either::Left).await?; + + rx.map_err(future::Either::Right).await + }) + } + + fn distribute_erasure_chunk( + &self, + candidate_hash: Hash, + chunk: ErasureChunk, + ) { + let _ = self.sender.clone().try_send( + ServiceToWorkerMsg::DistributeErasureChunk(candidate_hash, chunk) + ); + } +} + /// Errors when interacting with the statement router. 
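// Editor's note: `fetch_erasure_chunk` and `checked_statements` above share one shape: push a
// message carrying a oneshot sender to the background worker, then await the receiving end,
// mapping either channel failure into the service's error type. A stripped-down, runnable sketch
// of that round trip with futures channels; the single-variant message enum and the trivial
// local worker are stand-ins for the real protocol worker, not its API.
use futures::channel::{mpsc, oneshot};
use futures::{executor::block_on, SinkExt, StreamExt};

enum ServiceToWorkerMsg {
    FetchErasureChunk(u32, oneshot::Sender<String>), // (chunk index, reply channel)
}

fn main() {
    block_on(async {
        let (mut to_worker, mut from_service) = mpsc::channel(16);

        // Stand-in worker loop: answer each request over its oneshot sender.
        let worker = async move {
            while let Some(ServiceToWorkerMsg::FetchErasureChunk(index, reply)) =
                from_service.next().await
            {
                let _ = reply.send(format!("chunk #{}", index));
            }
        };

        // Service side: send the request, then await the reply.
        let request = async move {
            let (tx, rx) = oneshot::channel();
            to_worker
                .send(ServiceToWorkerMsg::FetchErasureChunk(3, tx))
                .await
                .expect("worker alive");
            rx.await.expect("worker replied")
        };

        let (_, chunk) = futures::join!(worker, request);
        assert_eq!(chunk, "chunk #3");
    });
}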
#[derive(Debug, derive_more::Display, derive_more::From)] pub enum RouterError { @@ -909,7 +1243,7 @@ impl TableRouter for Router { pov_block: PoVBlock, chunks: (ValidatorIndex, &[ErasureChunk]), ) -> Self::SendLocalCollation { - let message = ServiceToWorkerMsg::LocalCollation( + let message = ServiceToWorkerMsg::SubmitValidatedCollation( self.inner.relay_parent.clone(), receipt, pov_block, diff --git a/service/src/lib.rs b/service/src/lib.rs index 45f56607b55d..d670ca01ab20 100644 --- a/service/src/lib.rs +++ b/service/src/lib.rs @@ -22,10 +22,8 @@ use sc_client::LongestChain; use std::sync::Arc; use std::time::Duration; use polkadot_primitives::{parachain, Hash, BlockId, AccountId, Nonce, Balance}; -use polkadot_network::legacy::{ - gossip::{self as network_gossip, Known}, - validation::ValidationNetwork, -}; +use polkadot_network::legacy::gossip::Known; +use polkadot_network::{protocol as network_protocol, PolkadotProtocol as StubSpecialization}; use service::{error::{Error as ServiceError}, ServiceBuilder}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use inherents::InherentDataProviders; @@ -42,7 +40,6 @@ pub use sc_client_api::backend::Backend; pub use sp_api::{Core as CoreApi, ConstructRuntimeApi, ProvideRuntimeApi, StateBackend}; pub use sp_runtime::traits::HashFor; pub use consensus_common::SelectChain; -pub use polkadot_network::legacy::PolkadotProtocol; pub use polkadot_primitives::parachain::{CollatorId, ParachainHost}; pub use polkadot_primitives::Block; pub use sp_runtime::traits::{Block as BlockT, self as runtime_traits, BlakeTwo256}; @@ -209,14 +206,17 @@ pub fn polkadot_new_full( authority_discovery_enabled: bool, slot_duration: u64, ) - -> Result, - SelectChain = LongestChain, Block>, - CallExecutor = TFullCallExecutor, - >, ServiceError> + -> Result<( + impl AbstractService< + Block = Block, + RuntimeApi = polkadot_runtime::RuntimeApi, + NetworkSpecialization = StubSpecialization, + Backend = TFullBackend, + SelectChain = LongestChain, Block>, + CallExecutor = TFullCallExecutor, + >, + FullNodeHandles, + ), ServiceError> { new_full(config, collating_for, max_block_data_size, authority_discovery_enabled, slot_duration) } @@ -229,18 +229,28 @@ pub fn kusama_new_full( authority_discovery_enabled: bool, slot_duration: u64, ) - -> Result, - SelectChain = LongestChain, Block>, - CallExecutor = TFullCallExecutor, - >, ServiceError> + -> Result<( + impl AbstractService< + Block = Block, + RuntimeApi = kusama_runtime::RuntimeApi, + NetworkSpecialization = StubSpecialization, + Backend = TFullBackend, + SelectChain = LongestChain, Block>, + CallExecutor = TFullCallExecutor, + >, + FullNodeHandles, + ), ServiceError> { new_full(config, collating_for, max_block_data_size, authority_discovery_enabled, slot_duration) } +/// Handles to other sub-services that full nodes instantiate, which consumers +/// of the node may use. +pub struct FullNodeHandles { + /// A handle to the Polkadot networking protocol. + pub polkadot_network: Option, +} + /// Builds a new service for a full client. 
pub fn new_full( config: Configuration, @@ -249,14 +259,17 @@ pub fn new_full( authority_discovery_enabled: bool, slot_duration: u64, ) - -> Result, - SelectChain = LongestChain, Block>, - CallExecutor = TFullCallExecutor, - >, ServiceError> + -> Result<( + impl AbstractService< + Block = Block, + RuntimeApi = Runtime, + NetworkSpecialization = StubSpecialization, + Backend = TFullBackend, + SelectChain = LongestChain, Block>, + CallExecutor = TFullCallExecutor, + >, + FullNodeHandles, + ), ServiceError> where Runtime: ConstructRuntimeApi> + Send + Sync + 'static, Runtime::RuntimeApi: @@ -294,7 +307,7 @@ pub fn new_full( let backend = builder.backend().clone(); let service = builder - .with_network_protocol(|_config| Ok(PolkadotProtocol::new(collating_for.clone())))? + .with_network_protocol(|_config| Ok(StubSpecialization::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? @@ -305,11 +318,13 @@ pub fn new_full( let client = service.client(); let known_oracle = client.clone(); + + let mut handles = FullNodeHandles { polkadot_network: None }; let select_chain = if let Some(select_chain) = service.select_chain() { select_chain } else { info!("The node cannot start as an authority because it can't select chain."); - return Ok(service); + return Ok((service, handles)); }; let gossip_validator_select_chain = select_chain.clone(); @@ -332,11 +347,15 @@ pub fn new_full( } }; - let mut gossip_validator = network_gossip::register_validator( + let polkadot_network_service = network_protocol::start( service.network(), + network_protocol::Config { + collating_for, + }, (is_known, client.clone()), - &service.spawn_task_handle(), - ); + client.clone(), + service.spawn_task_handle(), + ).map_err(|e| format!("Could not spawn network worker: {:?}", e))?; if participates_in_consensus { let availability_store = { @@ -345,13 +364,14 @@ pub fn new_full( let mut path = PathBuf::from(db_path); path.push("availability"); - let gossip = polkadot_network::legacy::AvailabilityNetworkShim(gossip_validator.clone()); - #[cfg(not(target_os = "unknown"))] { av_store::Store::new( - av_store::Config { cache_size: None, path }, - gossip, + ::av_store::Config { + cache_size: None, + path, + }, + polkadot_network_service.clone(), )? 
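// Editor's note: after this change, anything handed to `av_store::Store::new`/`new_in_memory`
// only has to implement `ErasureNetworking`. A hedged sketch of a no-op implementation in the
// spirit of the `DummyErasureNetworking` test helper further down in this diff; import paths and
// the exact boxed-future signature are reconstructed from the diff and may differ slightly from
// the real crates.
use availability_store::{ErasureNetworking, Store};
use polkadot_primitives::{parachain::ErasureChunk, Hash};
use futures::future::{self, FutureExt};
use std::{future::Future, pin::Pin};

#[derive(Clone)]
struct NoopErasureNetworking;

impl ErasureNetworking for NoopErasureNetworking {
    type Error = String;

    // Never resolves: acceptable for a node that does not fetch chunks from peers.
    fn fetch_erasure_chunk(
        &self,
        _candidate_hash: &Hash,
        _index: u32,
    ) -> Pin<Box<dyn Future<Output = Result<ErasureChunk, Self::Error>> + Send>> {
        future::pending().boxed()
    }

    fn distribute_erasure_chunk(&self, _candidate_hash: Hash, _chunk: ErasureChunk) {}
}

fn main() {
    // In-memory store, e.g. for tests; the file-backed `Store::new` takes the same bound.
    let _store = Store::new_in_memory(NoopErasureNetworking);
}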
} @@ -359,29 +379,12 @@ pub fn new_full( av_store::Store::new_in_memory(gossip) }; - { - let availability_store = availability_store.clone(); - service.network().with_spec( - |spec, _ctx| spec.register_availability_store(availability_store) - ); - } - - { - let availability_store = availability_store.clone(); - gossip_validator.register_availability_store(availability_store); - } - - // collator connections and validation network both fulfilled by this - let validation_network = ValidationNetwork::new( - gossip_validator, - service.client(), - service.spawn_task_handle(), - ); + polkadot_network_service.register_availability_store(availability_store.clone()); let (validation_service_handle, validation_service) = consensus::ServiceBuilder { client: client.clone(), - network: validation_network.clone(), - collators: validation_network, + network: polkadot_network_service.clone(), + collators: polkadot_network_service.clone(), spawner: service.spawn_task_handle(), availability_store: availability_store.clone(), select_chain: select_chain.clone(), @@ -493,41 +496,40 @@ pub fn new_full( )?; } - Ok(service) + handles.polkadot_network = Some(polkadot_network_service); + Ok((service, handles)) } /// Create a new Polkadot service for a light client. pub fn polkadot_new_light( config: Configuration, - collating_for: Option<(CollatorId, parachain::Id)>, ) -> Result, SelectChain = LongestChain, Block>, CallExecutor = TLightCallExecutor, >, ServiceError> { - new_light(config, collating_for) + new_light(config) } /// Create a new Kusama service for a light client. pub fn kusama_new_light( config: Configuration, - collating_for: Option<(CollatorId, parachain::Id)>, ) -> Result, SelectChain = LongestChain, Block>, CallExecutor = TLightCallExecutor, >, ServiceError> { - new_light(config, collating_for) + new_light(config) } // We can't use service::TLightClient due to @@ -551,12 +553,11 @@ type TLocalLightClient = Client< /// Builds a new service for a light client. pub fn new_light( config: Configuration, - collating_for: Option<(CollatorId, parachain::Id)>, ) -> Result, SelectChain = LongestChain, Block>, CallExecutor = TLightCallExecutor, @@ -619,7 +620,7 @@ where Ok((import_queue, finality_proof_request_builder)) })? - .with_network_protocol(|_config| Ok(PolkadotProtocol::new(collating_for.clone())))? + .with_network_protocol(|_config| Ok(StubSpecialization::new()))? .with_finality_proof_provider(|client, backend| Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, client)) as _) )? diff --git a/test-parachains/adder/collator/src/main.rs b/test-parachains/adder/collator/src/main.rs index dbf0de7b1b28..4f28fdc0720b 100644 --- a/test-parachains/adder/collator/src/main.rs +++ b/test-parachains/adder/collator/src/main.rs @@ -104,9 +104,9 @@ impl BuildParachainContext for AdderContext { self, _: Arc>, _: SP, - network: Arc, + network: impl Network + Clone + 'static, ) -> Result { - Ok(Self { _network: Some(network), ..self }) + Ok(Self { _network: Some(Arc::new(network)), ..self }) } } diff --git a/validation/src/pipeline.rs b/validation/src/pipeline.rs index be8e1e6fc90c..fb14a42e8f4a 100644 --- a/validation/src/pipeline.rs +++ b/validation/src/pipeline.rs @@ -137,6 +137,8 @@ pub struct FullOutput { /// should keep their chunk (by index). Other chunks do not need to be /// kept available long-term, but should be distributed to other validators. pub erasure_chunks: Vec, + /// The number of validators that were present at this validation. 
+ pub n_validators: usize, } impl FullOutput { @@ -218,6 +220,7 @@ impl<'a> ValidatedCandidate<'a> { available_data, commitments, erasure_chunks: chunks, + n_validators, }) } } diff --git a/validation/src/shared_table/mod.rs b/validation/src/shared_table/mod.rs index 5391093d970d..eb38dae2b871 100644 --- a/validation/src/shared_table/mod.rs +++ b/validation/src/shared_table/mod.rs @@ -369,6 +369,7 @@ impl PrimedParachainWork ).await?; self.inner.availability_store.add_erasure_chunks( candidate, + full_output.n_validators as _, full_output.erasure_chunks, ).await?; @@ -593,7 +594,7 @@ mod tests { BlockData, ErasureChunk, AvailableData, }; use polkadot_erasure_coding::{self as erasure}; - use availability_store::ProvideGossipMessages; + use availability_store::ErasureNetworking; use futures::future; use futures::executor::block_on; use std::pin::Pin; @@ -605,21 +606,22 @@ mod tests { } #[derive(Clone)] - struct DummyGossipMessages; + struct DummyErasureNetworking; - impl ProvideGossipMessages for DummyGossipMessages { - fn gossip_messages_for( + impl ErasureNetworking for DummyErasureNetworking { + type Error = String; + + fn fetch_erasure_chunk( &self, - _topic: Hash - ) -> Pin + Send>> { - futures::stream::empty().boxed() + _candidate_hash: &Hash, + _index: u32, + ) -> Pin> + Send>> { + future::pending().boxed() } - fn gossip_erasure_chunk( + fn distribute_erasure_chunk( &self, - _relay_parent: Hash, _candidate_hash: Hash, - _erasure_root: Hash, _chunk: ErasureChunk, ) {} } @@ -668,7 +670,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - AvailabilityStore::new_in_memory(DummyGossipMessages), + AvailabilityStore::new_in_memory(DummyErasureNetworking), None, ); @@ -716,7 +718,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - AvailabilityStore::new_in_memory(DummyGossipMessages), + AvailabilityStore::new_in_memory(DummyErasureNetworking), None, ); @@ -741,7 +743,7 @@ mod tests { #[test] fn evaluate_makes_block_data_available() { - let store = AvailabilityStore::new_in_memory(DummyGossipMessages); + let store = AvailabilityStore::new_in_memory(DummyErasureNetworking); let relay_parent = [0; 32].into(); let para_id = 5.into(); let pov_block = pov_block_with_data(vec![1, 2, 3]); @@ -789,6 +791,7 @@ mod tests { proof: vec![], }).collect(), commitments: Default::default(), + n_validators, } )).validate()).unwrap(); @@ -802,7 +805,7 @@ mod tests { #[test] fn full_availability() { - let store = AvailabilityStore::new_in_memory(DummyGossipMessages); + let store = AvailabilityStore::new_in_memory(DummyErasureNetworking); let relay_parent = [0; 32].into(); let para_id = 5.into(); let pov_block = pov_block_with_data(vec![1, 2, 3]); @@ -853,6 +856,7 @@ mod tests { proof: vec![], }).collect(), commitments: Default::default(), + n_validators, } )).validate()).unwrap(); @@ -886,7 +890,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - AvailabilityStore::new_in_memory(DummyGossipMessages), + AvailabilityStore::new_in_memory(DummyErasureNetworking), None, ); @@ -945,7 +949,7 @@ mod tests { groups, Some(local_key.clone()), parent_hash, - AvailabilityStore::new_in_memory(DummyGossipMessages), + AvailabilityStore::new_in_memory(DummyErasureNetworking), None, ); diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index dfd193684e5a..edfd54750f63 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -397,6 +397,7 @@ impl ParachainValidationInstances where commitments, 
erasure_chunks, available_data, + .. } = full_output; let receipt = collation_info.into_receipt(commitments); @@ -421,6 +422,7 @@ impl ParachainValidationInstances where } if let Err(e) = av_clone.clone().add_erasure_chunks( receipt_clone, + n_validators as _, erasure_chunks_clone, ).await { warn!(target: "validation", "Failed to add erasure chunks: {}", e);
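// Editor's note: `add_erasure_chunks` now takes `n_validators` because the store needs the total
// chunk count to track a candidate's availability: one chunk is produced per validator, and
// under the usual n >= 3f + 1 assumption any f + 1 chunks are enough to reconstruct the data.
// The arithmetic sketch below is illustrative; the real helper lives in polkadot-erasure-coding
// and its exact API is not shown in this diff.

/// Byzantine bound: with `n` validators, at most `f = (n - 1) / 3` may be faulty.
fn max_faulty(n_validators: usize) -> usize {
    n_validators.saturating_sub(1) / 3
}

/// Chunks required to reconstruct the available data: `f + 1`.
fn recovery_threshold(n_validators: usize) -> usize {
    max_faulty(n_validators) + 1
}

fn main() {
    assert_eq!(recovery_threshold(10), 4);
    assert_eq!(recovery_threshold(1_000), 334);
}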