diff --git a/core/client/src/client.rs b/core/client/src/client.rs index d0a53802fad7a..8ae21ca6ab4e0 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -47,7 +47,7 @@ use state_machine::{ }; use executor::{RuntimeVersion, RuntimeInfo}; use consensus::{ - Error as ConsensusError, BlockImportParams, + Error as ConsensusError, BlockStatus, BlockImportParams, ImportResult, BlockOrigin, ForkChoiceStrategy, well_known_cache_keys::Id as CacheKeyId, SelectChain, self, @@ -171,21 +171,6 @@ pub struct ClientInfo { pub used_state_cache_size: Option, } -/// Block status. -#[derive(Debug, PartialEq, Eq)] -pub enum BlockStatus { - /// Added to the import queue. - Queued, - /// Already in the blockchain and the state is available. - InChainWithState, - /// In the blockchain, but the state is not available. - InChainPruned, - /// Block or parent is known to be bad. - KnownBad, - /// Not in the queue or the blockchain. - Unknown, -} - /// Summary of an imported block #[derive(Clone, Debug)] pub struct BlockImportNotification { @@ -1187,10 +1172,7 @@ impl Client where Ok(()) } - fn notify_imported( - &self, - notify_import: ImportSummary, - ) -> error::Result<()> { + fn notify_imported(&self, notify_import: ImportSummary) -> error::Result<()> { if let Some(storage_changes) = notify_import.storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? self.storage_notifications.lock() @@ -1941,6 +1923,16 @@ pub mod utils { } } +impl consensus::block_validation::Chain for Client + where BE: backend::Backend, + E: CallExecutor, + B: BlockT +{ + fn block_status(&self, id: &BlockId) -> Result> { + Client::block_status(self, id).map_err(|e| Box::new(e) as Box<_>) + } +} + #[cfg(test)] pub(crate) mod tests { use std::collections::HashMap; diff --git a/core/client/src/lib.rs b/core/client/src/lib.rs index 3919b3970f889..5a6d65f7d0824 100644 --- a/core/client/src/lib.rs +++ b/core/client/src/lib.rs @@ -113,7 +113,7 @@ pub use crate::call_executor::{CallExecutor, LocalCallExecutor}; pub use crate::client::{ new_with_backend, new_in_mem, - BlockBody, BlockStatus, ImportNotifications, FinalityNotifications, BlockchainEvents, + BlockBody, ImportNotifications, FinalityNotifications, BlockchainEvents, BlockImportNotification, Client, ClientInfo, ExecutionStrategies, FinalityNotification, LongestChain, BlockOf, ProvideUncles, utils, apply_aux, diff --git a/core/consensus/common/src/block_validation.rs b/core/consensus/common/src/block_validation.rs new file mode 100644 index 0000000000000..be181b05c6f4d --- /dev/null +++ b/core/consensus/common/src/block_validation.rs @@ -0,0 +1,66 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. +// +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Block announcement validation. + +use crate::BlockStatus; +use sr_primitives::{generic::BlockId, traits::Block}; +use std::{error::Error, sync::Arc}; + +/// A type which provides access to chain information. +pub trait Chain { + /// Retrieve the status of the block denoted by the given [`BlockId`]. + fn block_status(&self, id: &BlockId) -> Result>; +} + +impl, B: Block> Chain for Arc { + fn block_status(&self, id: &BlockId) -> Result> { + (&**self).block_status(id) + } +} + +/// Result of `BlockAnnounceValidator::validate`. +#[derive(Debug, PartialEq, Eq)] +pub enum Validation { + /// Valid block announcement. + Success, + /// Invalid block announcement. + Failure, +} + +/// Type which checks incoming block announcements. +pub trait BlockAnnounceValidator { + /// Validate the announced header and its associated data. + fn validate(&mut self, header: &B::Header, data: &[u8]) -> Result>; +} + +/// Default implementation of `BlockAnnounceValidator`. +#[derive(Debug)] +pub struct DefaultBlockAnnounceValidator { + chain: C +} + +impl DefaultBlockAnnounceValidator { + pub fn new(chain: C) -> Self { + Self { chain } + } +} + +impl> BlockAnnounceValidator for DefaultBlockAnnounceValidator { + fn validate(&mut self, _h: &B::Header, _d: &[u8]) -> Result> { + Ok(Validation::Success) + } +} diff --git a/core/consensus/common/src/lib.rs b/core/consensus/common/src/lib.rs index 3fd0a0c694c7e..4c5797219d2a5 100644 --- a/core/consensus/common/src/lib.rs +++ b/core/consensus/common/src/lib.rs @@ -35,6 +35,7 @@ use sr_primitives::traits::{Block as BlockT, DigestFor}; use futures::prelude::*; pub use inherents::InherentData; +pub mod block_validation; pub mod offline_tracker; pub mod error; pub mod block_import; @@ -52,6 +53,21 @@ pub use block_import::{ }; pub use select_chain::SelectChain; +/// Block status. +#[derive(Debug, PartialEq, Eq)] +pub enum BlockStatus { + /// Added to the import queue. + Queued, + /// Already in the blockchain and the state is available. + InChainWithState, + /// In the blockchain, but the state is not available. + InChainPruned, + /// Block or parent is known to be bad. + KnownBad, + /// Not in the queue or the blockchain. + Unknown, +} + /// Environment producer for a Consensus instance. Creates proposer instance and communication streams. pub trait Environment { /// The proposer type this creates. diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 7185e93208730..652c33c0262a5 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -127,7 +127,7 @@ pub trait Network: Clone + Send + 'static { fn report(&self, who: network::PeerId, cost_benefit: i32); /// Inform peers that a block with given hash should be downloaded. - fn announce(&self, block: Block::Hash); + fn announce(&self, block: Block::Hash, associated_data: Vec); } /// Create a unique topic for a round and set-id combo. @@ -197,8 +197,8 @@ impl Network for Arc> where self.report_peer(who, cost_benefit) } - fn announce(&self, block: B::Hash) { - self.announce_block(block) + fn announce(&self, block: B::Hash, associated_data: Vec) { + self.announce_block(block, associated_data) } } @@ -727,7 +727,7 @@ impl> Sink for OutgoingMessages ); // send the target block hash to the background block announcer - self.announce_sender.send(target_hash); + self.announce_sender.send(target_hash, Vec::new()); // propagate the message to peers let topic = round_topic::(self.round, self.set_id); diff --git a/core/finality-grandpa/src/communication/periodic.rs b/core/finality-grandpa/src/communication/periodic.rs index 333178dec2343..81c18891d03b5 100644 --- a/core/finality-grandpa/src/communication/periodic.rs +++ b/core/finality-grandpa/src/communication/periodic.rs @@ -118,7 +118,7 @@ pub(super) fn neighbor_packet_worker(net: N) -> ( /// A background worker for performing block announcements. struct BlockAnnouncer { net: N, - block_rx: mpsc::UnboundedReceiver, + block_rx: mpsc::UnboundedReceiver<(B::Hash, Vec)>, latest_voted_blocks: VecDeque, reannounce_after: Duration, delay: Delay, @@ -199,8 +199,8 @@ impl> Future for BlockAnnouncer { match self.block_rx.poll().expect("unbounded receivers do not error; qed") { Async::Ready(None) => return Ok(Async::Ready(())), Async::Ready(Some(block)) => { - if self.note_block(block) { - self.net.announce(block); + if self.note_block(block.0) { + self.net.announce(block.0, block.1); self.reset_delay(); } }, @@ -229,7 +229,7 @@ impl> Future for BlockAnnouncer { ); for block in self.latest_voted_blocks.iter() { - self.net.announce(*block); + self.net.announce(*block, Vec::new()); } }, Ok(Async::NotReady) => return Ok(Async::NotReady), @@ -240,15 +240,12 @@ impl> Future for BlockAnnouncer { /// A sender used to send block hashes to announce to a background job. #[derive(Clone)] -pub(super) struct BlockAnnounceSender(mpsc::UnboundedSender); +pub(super) struct BlockAnnounceSender(mpsc::UnboundedSender<(B::Hash, Vec)>); impl BlockAnnounceSender { /// Send a block hash for the background worker to announce. - pub fn send( - &self, - block: B::Hash, - ) { - if let Err(err) = self.0.unbounded_send(block) { + pub fn send(&self, block: B::Hash, associated_data: Vec) { + if let Err(err) = self.0.unbounded_send((block, associated_data)) { debug!(target: "afg", "Failed to send block to background announcer: {:?}", err); } } diff --git a/core/finality-grandpa/src/communication/tests.rs b/core/finality-grandpa/src/communication/tests.rs index 28de0e538c84a..6215e30b80966 100644 --- a/core/finality-grandpa/src/communication/tests.rs +++ b/core/finality-grandpa/src/communication/tests.rs @@ -88,7 +88,7 @@ impl super::Network for TestNetwork { } /// Inform peers that a block with given hash should be downloaded. - fn announce(&self, block: Hash) { + fn announce(&self, block: Hash, _associated_data: Vec) { let _ = self.sender.unbounded_send(Event::Announce(block)); } } @@ -559,7 +559,7 @@ fn periodically_reannounce_voted_blocks_on_stall() { for _ in 0..=12 { let hash = Hash::random(); hashes.lock().push(hash); - announce_sender.send(hash); + announce_sender.send(hash, Vec::new()); } // we should see an event for each of those announcements diff --git a/core/network/src/chain.rs b/core/network/src/chain.rs index b7465685ae8c1..d23068179ff55 100644 --- a/core/network/src/chain.rs +++ b/core/network/src/chain.rs @@ -16,10 +16,10 @@ //! Blockchain access trait -use client::{self, Client as SubstrateClient, ClientInfo, BlockStatus, CallExecutor}; +use client::{self, Client as SubstrateClient, ClientInfo, CallExecutor}; use client::error::Error; use client::light::fetcher::ChangesProof; -use consensus::{BlockImport, Error as ConsensusError}; +use consensus::{BlockImport, BlockStatus, Error as ConsensusError}; use sr_primitives::traits::{Block as BlockT, Header as HeaderT}; use sr_primitives::generic::{BlockId}; use sr_primitives::Justification; diff --git a/core/network/src/config.rs b/core/network/src/config.rs index 7392a4aaf7b4e..0582fcb300f1e 100644 --- a/core/network/src/config.rs +++ b/core/network/src/config.rs @@ -26,7 +26,7 @@ use crate::chain::{Client, FinalityProofProvider}; use crate::on_demand_layer::OnDemand; use crate::service::{ExHashT, TransactionPool}; use bitflags::bitflags; -use consensus::import_queue::ImportQueue; +use consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; use sr_primitives::traits::{Block as BlockT}; use std::sync::Arc; use libp2p::identity::{Keypair, secp256k1, ed25519}; @@ -80,6 +80,9 @@ pub struct Params { /// Customization of the network. Use this to plug additional networking capabilities. pub specialization: S, + + /// Type to check incoming block announcements. + pub block_announce_validator: Box + Send> } bitflags! { diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index ea993bdce0b38..b2cea2cd5c82b 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -23,14 +23,17 @@ use libp2p::core::{ConnectedPoint, nodes::Substream, muxing::StreamMuxerBox}; use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use primitives::storage::StorageKey; -use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; +use consensus::{ + BlockOrigin, + block_validation::BlockAnnounceValidator, + import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin} +}; use sr_primitives::{generic::BlockId, ConsensusEngineId, Justification}; use sr_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub, SaturatedConversion }; -use consensus::import_queue::{BlockImportResult, BlockImportError}; -use message::{BlockAttributes, Direction, FromBlock, Message, RequestId}; +use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId}; use message::generic::{Message as GenericMessage, ConsensusMessage}; use event::Event; use consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; @@ -366,9 +369,16 @@ impl, H: ExHashT> Protocol { finality_proof_request_builder: Option>, protocol_id: ProtocolId, peerset_config: peerset::PeersetConfig, + block_announce_validator: Box + Send> ) -> error::Result<(Protocol, peerset::PeersetHandle)> { let info = chain.info(); - let sync = ChainSync::new(config.roles, chain.clone(), &info, finality_proof_request_builder); + let sync = ChainSync::new( + config.roles, + chain.clone(), + &info, + finality_proof_request_builder, + block_announce_validator, + ); let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); let behaviour = LegacyProto::new(protocol_id, versions, peerset); @@ -376,7 +386,7 @@ impl, H: ExHashT> Protocol { let protocol = Protocol { tick_timeout: Box::new(futures_timer::Interval::new(TICK_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()), propagate_timeout: Box::new(futures_timer::Interval::new(PROPAGATE_TIMEOUT).map(|v| Ok::<_, ()>(v)).compat()), - config: config, + config, context_data: ContextData { peers: HashMap::new(), chain, @@ -384,7 +394,7 @@ impl, H: ExHashT> Protocol { light_dispatch: LightDispatch::new(checker), genesis_hash: info.chain.genesis_hash, sync, - specialization: specialization, + specialization, consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), transaction_pool, @@ -1010,7 +1020,7 @@ impl, H: ExHashT> Protocol { /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&mut self, hash: B::Hash) { + pub fn announce_block(&mut self, hash: B::Hash, data: Vec) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -1030,10 +1040,10 @@ impl, H: ExHashT> Protocol { let is_best = self.context_data.chain.info().chain.best_hash == hash; debug!(target: "sync", "Reannouncing block {:?}", hash); - self.send_announcement(&header, is_best, true) + self.send_announcement(&header, data, is_best, true) } - fn send_announcement(&mut self, header: &B::Header, is_best: bool, force: bool) { + fn send_announcement(&mut self, header: &B::Header, data: Vec, is_best: bool, force: bool) { let hash = header.hash(); for (who, ref mut peer) in self.context_data.peers.iter_mut() { @@ -1050,7 +1060,12 @@ impl, H: ExHashT> Protocol { } } else { None - } + }, + data: if peer.info.protocol_version >= 4 { + Some(data.clone()) + } else { + None + }, }); self.behaviour.send_packet(who, message) @@ -1074,29 +1089,22 @@ impl, H: ExHashT> Protocol { self.send_message(who, GenericMessage::Status(status)) } - fn on_block_announce( - &mut self, - who: PeerId, - announce: message::BlockAnnounce - ) -> CustomMessageOutcome { - let header = announce.header; - let hash = header.hash(); - { - if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { - peer.known_blocks.insert(hash.clone()); - } + fn on_block_announce(&mut self, who: PeerId, announce: BlockAnnounce) -> CustomMessageOutcome { + let hash = announce.header.hash(); + if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { + peer.known_blocks.insert(hash.clone()); } self.light_dispatch.update_best_number(LightDispatchIn { behaviour: &mut self.behaviour, peerset: self.peerset_handle.clone(), - }, who.clone(), *header.number()); + }, who.clone(), *announce.header.number()); let is_their_best = match announce.state.unwrap_or(message::BlockState::Best) { message::BlockState::Best => true, message::BlockState::Normal => false, }; - match self.sync.on_block_announce(who.clone(), hash, &header, is_their_best) { + match self.sync.on_block_announce(who.clone(), hash, &announce, is_their_best) { sync::OnBlockAnnounce::Request(peer, req) => { self.send_message(peer, GenericMessage::BlockRequest(req)); return CustomMessageOutcome::None @@ -1131,7 +1139,7 @@ impl, H: ExHashT> Protocol { blocks: vec![ message::generic::BlockData { hash: hash, - header: Some(header), + header: Some(announce.header), body: None, receipt: None, message_queue: None, @@ -1156,7 +1164,7 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, is_best: bool) { + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header, data: Vec, is_best: bool) { if is_best { self.sync.update_chain_info(header); } @@ -1172,7 +1180,7 @@ impl, H: ExHashT> Protocol { } // send out block announcements - self.send_announcement(&header, is_best, false); + self.send_announcement(header, data, is_best, false); } /// Call this when a block has been finalized. The sync layer may have some additional diff --git a/core/network/src/protocol/message.rs b/core/network/src/protocol/message.rs index 2faa68a4e2002..6560ed0f13a52 100644 --- a/core/network/src/protocol/message.rs +++ b/core/network/src/protocol/message.rs @@ -273,6 +273,8 @@ pub mod generic { pub header: H, /// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common. pub state: Option, + /// Data associated with this block announcement, e.g. a candidate message. + pub data: Option>, } // Custom Encode/Decode impl to maintain backwards compatibility with v3. @@ -284,6 +286,9 @@ pub mod generic { if let Some(state) = &self.state { state.encode_to(dest); } + if let Some(data) = &self.data { + data.encode_to(dest) + } } } @@ -291,9 +296,11 @@ pub mod generic { fn decode(input: &mut I) -> Result { let header = H::decode(input)?; let state = BlockState::decode(input).ok(); + let data = Vec::decode(input).ok(); Ok(BlockAnnounce { header, state, + data, }) } } diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index 52e88034a385d..cf7d899d6dc7b 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -28,11 +28,15 @@ //! use blocks::BlockCollection; -use client::{BlockStatus, ClientInfo, error::Error as ClientError}; -use consensus::{BlockOrigin, import_queue::{IncomingBlock, BlockImportResult, BlockImportError}}; +use client::{ClientInfo, error::Error as ClientError}; +use consensus::{BlockOrigin, BlockStatus, + block_validation::{BlockAnnounceValidator, Validation}, + import_queue::{IncomingBlock, BlockImportResult, BlockImportError} +}; use crate::{ config::{Roles, BoxFinalityProofRequestBuilder}, - message::{self, generic::FinalityProofRequest, BlockAttributes, BlockRequest, BlockResponse, FinalityProofResponse}, + message::{self, generic::FinalityProofRequest, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse, + FinalityProofResponse}, protocol }; use either::Either; @@ -122,6 +126,8 @@ pub struct ChainSync { request_builder: Option>, /// A flag that caches idle state with no pending requests. is_idle: bool, + /// A type to check incoming block announcements. + block_announce_validator: Box + Send> } /// All the data we have about a Peer that we are trying to sync with @@ -271,7 +277,8 @@ impl ChainSync { role: Roles, client: Arc>, info: &ClientInfo, - request_builder: Option> + request_builder: Option>, + block_announce_validator: Box + Send> ) -> Self { let mut required_block_attributes = BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION; @@ -293,6 +300,7 @@ impl ChainSync { best_importing_number: Zero::zero(), request_builder, is_idle: false, + block_announce_validator, } } @@ -885,9 +893,10 @@ impl ChainSync { /// header (call `on_block_data`). The network request isn't sent /// in this case. Both hash and header is passed as an optimization /// to avoid rehashing the header. - pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, header: &B::Header, is_best: bool) + pub fn on_block_announce(&mut self, who: PeerId, hash: B::Hash, announce: &BlockAnnounce, is_best: bool) -> OnBlockAnnounce { + let header = &announce.header; let number = *header.number(); debug!(target: "sync", "Received block announcement with number {:?}", number); if number.is_zero() { @@ -932,6 +941,20 @@ impl ChainSync { return OnBlockAnnounce::Nothing } + // Let external validator check the block announcement. + let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice()); + match self.block_announce_validator.validate(&header, assoc_data) { + Ok(Validation::Success) => (), + Ok(Validation::Failure) => { + debug!(target: "sync", "block announcement validation of block {} from {} failed", hash, who); + return OnBlockAnnounce::Nothing + } + Err(e) => { + error!(target: "sync", "block announcement validation errored: {}", e); + return OnBlockAnnounce::Nothing + } + } + // stale block case let requires_additional_data = !self.role.is_light(); if number <= self.best_queued_number { diff --git a/core/network/src/service.rs b/core/network/src/service.rs index a1cba0395e360..3ca7bffdb45c5 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -113,9 +113,7 @@ impl, H: ExHashT> NetworkWorker /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new( - params: Params, - ) -> Result, Error> { + pub fn new(params: Params) -> Result, Error> { let (to_worker, from_worker) = mpsc::unbounded(); if let Some(ref path) = params.network_config.net_config_path { @@ -178,6 +176,7 @@ impl, H: ExHashT> NetworkWorker params.finality_proof_request_builder, params.protocol_id, peerset_config, + params.block_announce_validator )?; // Build the swarm. @@ -297,8 +296,8 @@ impl, H: ExHashT> NetworkWorker } /// You must call this when a new block is imported by the client. - pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, is_best: bool) { - self.network_service.user_protocol_mut().on_block_imported(hash, &header, is_best); + pub fn on_block_imported(&mut self, hash: B::Hash, header: B::Header, data: Vec, is_best: bool) { + self.network_service.user_protocol_mut().on_block_imported(hash, &header, data, is_best); } /// You must call this when a new block is finalized by the client. @@ -394,8 +393,8 @@ impl, H: ExHashT> NetworkServic /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. This function forces such an announcement. - pub fn announce_block(&self, hash: B::Hash) { - let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash)); + pub fn announce_block(&self, hash: B::Hash, data: Vec) { + let _ = self.to_worker.unbounded_send(ServerToWorkerMsg::AnnounceBlock(hash, data)); } /// Send a consensus message through the gossip @@ -580,7 +579,7 @@ impl NetworkStateInfo for NetworkService enum ServerToWorkerMsg> { PropagateExtrinsics, RequestJustification(B::Hash, NumberFor), - AnnounceBlock(B::Hash), + AnnounceBlock(B::Hash, Vec), ExecuteWithSpec(Box) + Send>), ExecuteWithGossip(Box, &mut dyn Context) + Send>), GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), @@ -653,8 +652,8 @@ impl, H: ExHashT> Stream for Ne } ServerToWorkerMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => self.network_service.user_protocol_mut().gossip_consensus_message(topic, engine_id, message, recipient), - ServerToWorkerMsg::AnnounceBlock(hash) => - self.network_service.user_protocol_mut().announce_block(hash), + ServerToWorkerMsg::AnnounceBlock(hash, data) => + self.network_service.user_protocol_mut().announce_block(hash, data), ServerToWorkerMsg::RequestJustification(hash, number) => self.network_service.user_protocol_mut().request_justification(&hash, number), ServerToWorkerMsg::PropagateExtrinsics => diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 5fd67acd1b625..ae3163380a0f0 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -35,6 +35,7 @@ use client::error::Result as ClientResult; use client::block_builder::BlockBuilder; use client::backend::{AuxStore, Backend, Finalizer}; use crate::config::Roles; +use consensus::block_validation::DefaultBlockAnnounceValidator; use consensus::import_queue::BasicQueue; use consensus::import_queue::{ BoxBlockImport, BoxJustificationImport, Verifier, BoxFinalityProofImport, @@ -252,8 +253,8 @@ impl> Peer { } /// Announces an important block on the network. - pub fn announce_block(&self, hash: ::Hash) { - self.network.service().announce_block(hash); + pub fn announce_block(&self, hash: ::Hash, data: Vec) { + self.network.service().announce_block(hash, data); } /// Add blocks to the peer -- edit the block before adding @@ -300,11 +301,11 @@ impl> Peer { Default::default() }; self.block_import.import_block(import_block, cache).expect("block_import failed"); - self.network.on_block_imported(hash, header, true); + self.network.on_block_imported(hash, header, Vec::new(), true); at = hash; } - self.network.service().announce_block(at.clone()); + self.network.service().announce_block(at.clone(), Vec::new()); at } @@ -537,6 +538,7 @@ pub trait TestNetFactory: Sized { protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, specialization: self::SpecializationFactory::create(), + block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())) }).unwrap(); self.mut_peers(|peers| { @@ -600,6 +602,7 @@ pub trait TestNetFactory: Sized { protocol_id: ProtocolId::from(&b"test-protocol-name"[..]), import_queue, specialization: self::SpecializationFactory::create(), + block_announce_validator: Box::new(DefaultBlockAnnounceValidator::new(client.clone())) }).unwrap(); self.mut_peers(|peers| { @@ -662,7 +665,7 @@ pub trait TestNetFactory: Sized { // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = peer.imported_blocks_stream.poll() { - peer.network.on_block_imported(notification.hash, notification.header, true); + peer.network.on_block_imported(notification.hash, notification.header, Vec::new(), true); } // We poll `finality_notification_stream`, but we only take the last event. diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index 6cba21c1719bd..d50190f6573ff 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -444,7 +444,7 @@ fn can_sync_small_non_best_forks() { assert!(net.peer(0).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); assert!(!net.peer(1).client().header(&BlockId::Hash(small_hash)).unwrap().is_some()); - net.peer(0).announce_block(small_hash); + net.peer(0).announce_block(small_hash, Vec::new()); // after announcing, peer 1 downloads the block. @@ -499,7 +499,7 @@ fn light_peer_imports_header_from_announce() { let mut runtime = current_thread::Runtime::new().unwrap(); fn import_with_announce(net: &mut TestNet, runtime: &mut current_thread::Runtime, hash: H256) { - net.peer(0).announce_block(hash); + net.peer(0).announce_block(hash, Vec::new()); runtime.block_on(futures::future::poll_fn::<(), (), _>(|| { net.poll(); diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 431776b31e9b4..cf84df3ef6b1d 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -190,6 +190,9 @@ macro_rules! new_impl { network::config::ProtocolId::from(protocol_id_full) }; + let block_announce_validator = + Box::new(consensus_common::block_validation::DefaultBlockAnnounceValidator::new(client.clone())); + let network_params = network::config::Params { roles: $config.roles, network_config: $config.network.clone(), @@ -201,6 +204,7 @@ macro_rules! new_impl { import_queue, protocol_id, specialization: network_protocol, + block_announce_validator, }; let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); @@ -682,7 +686,7 @@ fn build_network_future< // We poll `imported_blocks_stream`. while let Ok(Async::Ready(Some(notification))) = imported_blocks_stream.poll() { - network.on_block_imported(notification.hash, notification.header, notification.is_new_best); + network.on_block_imported(notification.hash, notification.header, Vec::new(), notification.is_new_best); } // We poll `finality_notification_stream`, but we only take the last event.