diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 0e87976b8ebe7..8389c95ea8cee 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -1437,7 +1437,7 @@ impl CallRuntimeAt for Client where } } -impl consensus::BlockImport for Client where +impl<'a, B, E, Block, RA> consensus::BlockImport for &'a Client where B: backend::Backend, E: CallExecutor + Clone + Send + Sync, Block: BlockT, @@ -1447,7 +1447,7 @@ impl consensus::BlockImport for Client /// Import a checked and validated block. If a justification is provided in /// `ImportBlock` then `finalized` *must* be true. fn import_block( - &self, + &mut self, import_block: ImportBlock, new_cache: HashMap>, ) -> Result { @@ -1458,7 +1458,7 @@ impl consensus::BlockImport for Client /// Check block preconditions. fn check_block( - &self, + &mut self, hash: Block::Hash, parent_hash: Block::Hash, ) -> Result { @@ -1482,6 +1482,30 @@ impl consensus::BlockImport for Client } } +impl consensus::BlockImport for Client where + B: backend::Backend, + E: CallExecutor + Clone + Send + Sync, + Block: BlockT, +{ + type Error = ConsensusError; + + fn import_block( + &mut self, + import_block: ImportBlock, + new_cache: HashMap>, + ) -> Result { + (&*self).import_block(import_block, new_cache) + } + + fn check_block( + &mut self, + hash: Block::Hash, + parent_hash: Block::Hash, + ) -> Result { + (&*self).check_block(hash, parent_hash) + } +} + impl CurrentHeight for Client where B: backend::Backend, E: CallExecutor, diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index 1ac5c2c1b9817..4ea7dc51857cd 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -36,8 +36,8 @@ use consensus_common::{self, BlockImport, Environment, Proposer, SelectChain, well_known_cache_keys::{self, Id as CacheKeyId} }; use consensus_common::import_queue::{ - Verifier, BasicQueue, SharedBlockImport, SharedJustificationImport, SharedFinalityProofImport, - SharedFinalityProofRequestBuilder, + Verifier, BasicQueue, BoxBlockImport, BoxJustificationImport, BoxFinalityProofImport, + BoxFinalityProofRequestBuilder, }; use client::{ block_builder::api::BlockBuilder as BlockBuilderApi, @@ -54,6 +54,7 @@ use primitives::Pair; use inherents::{InherentDataProviders, InherentData}; use futures::{Future, IntoFuture, future}; +use parking_lot::Mutex; use tokio_timer::Timeout; use log::{error, warn, debug, info, trace}; @@ -134,7 +135,7 @@ pub fn start_aura( local_key: Arc

, client: Arc, select_chain: SC, - block_import: Arc, + block_import: I, env: Arc, sync_oracle: SO, inherent_data_providers: InherentDataProviders, @@ -157,7 +158,7 @@ pub fn start_aura( { let worker = AuraWorker { client: client.clone(), - block_import, + block_import: Arc::new(Mutex::new(block_import)), env, local_key, sync_oracle: sync_oracle.clone(), @@ -179,7 +180,7 @@ pub fn start_aura( struct AuraWorker { client: Arc, - block_import: Arc, + block_import: Arc>, env: Arc, local_key: Arc

, sync_oracle: SO, @@ -339,7 +340,7 @@ impl SlotWorker for AuraWorker w "hash_previously" => ?header_hash ); - if let Err(e) = block_import.import_block(import_block, Default::default()) { + if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { warn!(target: "aura", "Error with block built on {:?}: {:?}", parent_hash, e); telemetry!(CONSENSUS_WARN; "aura.err_with_block_built_on"; @@ -673,10 +674,10 @@ fn register_aura_inherent_data_provider( /// Start an import queue for the Aura consensus algorithm. pub fn import_queue( slot_duration: SlotDuration, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, + block_import: BoxBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + finality_proof_request_builder: Option>, client: Arc, inherent_data_providers: InherentDataProviders, ) -> Result, consensus_common::Error> where diff --git a/core/consensus/babe/src/lib.rs b/core/consensus/babe/src/lib.rs index 853b1c91bf1b6..bcbae57cb1707 100644 --- a/core/consensus/babe/src/lib.rs +++ b/core/consensus/babe/src/lib.rs @@ -32,8 +32,8 @@ pub use digest::{BabePreDigest, BABE_VRF_PREFIX}; pub use babe_primitives::*; pub use consensus_common::SyncOracle; use consensus_common::import_queue::{ - SharedBlockImport, SharedJustificationImport, SharedFinalityProofImport, - SharedFinalityProofRequestBuilder, + BoxBlockImport, BoxJustificationImport, BoxFinalityProofImport, + BoxFinalityProofRequestBuilder, }; use consensus_common::well_known_cache_keys::Id as CacheKeyId; use runtime_primitives::{generic, generic::{BlockId, OpaqueDigestItemId}, Justification}; @@ -152,7 +152,7 @@ pub struct BabeParams { pub select_chain: SC, /// A block importer - pub block_import: Arc, + pub block_import: I, /// The environment pub env: Arc, @@ -200,7 +200,7 @@ pub fn start_babe(BabeParams { { let worker = BabeWorker { client: client.clone(), - block_import, + block_import: Arc::new(Mutex::new(block_import)), env, local_key, sync_oracle: sync_oracle.clone(), @@ -220,7 +220,7 @@ pub fn start_babe(BabeParams { struct BabeWorker { client: Arc, - block_import: Arc, + block_import: Arc>, env: Arc, local_key: Arc, sync_oracle: SO, @@ -397,7 +397,7 @@ impl SlotWorker for BabeWorker w "hash_previously" => ?header_hash, ); - if let Err(e) = block_import.import_block(import_block, Default::default()) { + if let Err(e) = block_import.lock().import_block(import_block, Default::default()) { warn!(target: "babe", "Error with block built on {:?}: {:?}", parent_hash, e); telemetry!(CONSENSUS_WARN; "babe.err_with_block_built_on"; @@ -829,10 +829,10 @@ fn initialize_authorities_cache(client: &C) -> Result<(), ConsensusError> /// Start an import queue for the Babe consensus algorithm. pub fn import_queue( config: Config, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, + block_import: BoxBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + finality_proof_request_builder: Option>, client: Arc, inherent_data_providers: InherentDataProviders, ) -> Result<(BabeImportQueue, BabeLink), consensus_common::Error> where diff --git a/core/consensus/common/src/block_import.rs b/core/consensus/common/src/block_import.rs index 5213a5c73de8e..b0a3ce2a4945b 100644 --- a/core/consensus/common/src/block_import.rs +++ b/core/consensus/common/src/block_import.rs @@ -20,6 +20,7 @@ use runtime_primitives::traits::{Block as BlockT, DigestItemFor, Header as Heade use runtime_primitives::Justification; use std::borrow::Cow; use std::collections::HashMap; +use std::sync::Arc; use crate::well_known_cache_keys; use crate::import_queue::Verifier; @@ -175,7 +176,7 @@ pub trait BlockImport { /// Check block preconditions. fn check_block( - &self, + &mut self, hash: B::Hash, parent_hash: B::Hash, ) -> Result; @@ -184,23 +185,45 @@ pub trait BlockImport { /// /// Cached data can be accessed through the blockchain cache. fn import_block( - &self, + &mut self, block: ImportBlock, cache: HashMap>, ) -> Result; } +impl BlockImport for Arc +where for<'r> &'r T: BlockImport +{ + type Error = E; + + fn check_block( + &mut self, + hash: B::Hash, + parent_hash: B::Hash, + ) -> Result { + (&**self).check_block(hash, parent_hash) + } + + fn import_block( + &mut self, + block: ImportBlock, + cache: HashMap>, + ) -> Result { + (&**self).import_block(block, cache) + } +} + /// Justification import trait pub trait JustificationImport { type Error: ::std::error::Error + Send + 'static; /// Called by the import queue when it is started. Returns a list of justifications to request /// from the network. - fn on_start(&self) -> Vec<(B::Hash, NumberFor)> { Vec::new() } + fn on_start(&mut self) -> Vec<(B::Hash, NumberFor)> { Vec::new() } /// Import a Block justification and finalize the given block. fn import_justification( - &self, + &mut self, hash: B::Hash, number: NumberFor, justification: Justification, @@ -213,11 +236,11 @@ pub trait FinalityProofImport { /// Called by the import queue when it is started. Returns a list of finality proofs to request /// from the network. - fn on_start(&self) -> Vec<(B::Hash, NumberFor)> { Vec::new() } + fn on_start(&mut self) -> Vec<(B::Hash, NumberFor)> { Vec::new() } /// Import a Block justification and finalize the given block. Returns finalized block or error. fn import_finality_proof( - &self, + &mut self, hash: B::Hash, number: NumberFor, finality_proof: Vec, @@ -228,5 +251,5 @@ pub trait FinalityProofImport { /// Finality proof request builder. pub trait FinalityProofRequestBuilder: Send { /// Build data blob, associated with the request. - fn build_request_data(&self, hash: &B::Hash) -> Vec; + fn build_request_data(&mut self, hash: &B::Hash) -> Vec; } diff --git a/core/consensus/common/src/import_queue.rs b/core/consensus/common/src/import_queue.rs index 3987844a1e040..aa2b88c88b849 100644 --- a/core/consensus/common/src/import_queue.rs +++ b/core/consensus/common/src/import_queue.rs @@ -39,16 +39,16 @@ mod basic_queue; pub mod buffered_link; /// Shared block import struct used by the queue. -pub type SharedBlockImport = Arc + Send + Sync>; +pub type BoxBlockImport = Box + Send + Sync>; /// Shared justification import struct used by the queue. -pub type SharedJustificationImport = Arc + Send + Sync>; +pub type BoxJustificationImport = Box + Send + Sync>; /// Shared finality proof import struct used by the queue. -pub type SharedFinalityProofImport = Arc + Send + Sync>; +pub type BoxFinalityProofImport = Box + Send + Sync>; /// Shared finality proof request builder struct used by the queue. -pub type SharedFinalityProofRequestBuilder = Arc + Send + Sync>; +pub type BoxFinalityProofRequestBuilder = Box + Send + Sync>; /// Maps to the Origin used by the network. pub type Origin = libp2p::PeerId; @@ -140,7 +140,7 @@ pub trait Link: Send { /// Request a finality proof for the given block. fn request_finality_proof(&mut self, _hash: &B::Hash, _number: NumberFor) {} /// Remember finality proof request builder on start. - fn set_finality_proof_request_builder(&mut self, _request_builder: SharedFinalityProofRequestBuilder) {} + fn set_finality_proof_request_builder(&mut self, _request_builder: BoxFinalityProofRequestBuilder) {} /// Adjusts the reputation of the given peer. fn report_peer(&mut self, _who: Origin, _reputation_change: i32) {} /// Restart sync. @@ -173,7 +173,7 @@ pub enum BlockImportError { /// Single block import function. pub fn import_single_block>( - import_handle: &dyn BlockImport, + import_handle: &mut dyn BlockImport, block_origin: BlockOrigin, block: IncomingBlock, verifier: Arc, diff --git a/core/consensus/common/src/import_queue/basic_queue.rs b/core/consensus/common/src/import_queue/basic_queue.rs index b06334270c783..e56979614eaa8 100644 --- a/core/consensus/common/src/import_queue/basic_queue.rs +++ b/core/consensus/common/src/import_queue/basic_queue.rs @@ -21,8 +21,8 @@ use runtime_primitives::{Justification, traits::{Block as BlockT, Header as Head use crate::error::Error as ConsensusError; use crate::block_import::{BlockImport, BlockOrigin}; use crate::import_queue::{ - BlockImportResult, BlockImportError, Verifier, SharedBlockImport, SharedFinalityProofImport, - SharedFinalityProofRequestBuilder, SharedJustificationImport, ImportQueue, Link, Origin, + BlockImportResult, BlockImportError, Verifier, BoxBlockImport, BoxFinalityProofImport, + BoxFinalityProofRequestBuilder, BoxJustificationImport, ImportQueue, Link, Origin, IncomingBlock, import_single_block, buffered_link::{self, BufferedLinkSender, BufferedLinkReceiver} }; @@ -44,7 +44,7 @@ pub struct BasicQueue { /// Results coming from the worker task. result_port: BufferedLinkReceiver, /// Sent through the link as soon as possible. - finality_proof_request_builder: Option>, + finality_proof_request_builder: Option>, /// Since we have to be in a tokio context in order to spawn background tasks, we first store /// the task to spawn here, then extract it as soon as we are in a tokio context. /// If `Some`, contains the task to spawn in the background. If `None`, the future has already @@ -63,10 +63,10 @@ impl BasicQueue { /// finality proof importer. pub fn new>( verifier: Arc, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, - finality_proof_request_builder: Option>, + block_import: BoxBlockImport, + justification_import: Option>, + finality_proof_import: Option>, + finality_proof_request_builder: Option>, ) -> Self { let (result_sender, result_port) = buffered_link::buffered_link(); let (future, worker_sender) = BlockImportWorker::new( @@ -148,9 +148,9 @@ enum ToWorkerMsg { struct BlockImportWorker> { result_sender: BufferedLinkSender, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, + block_import: BoxBlockImport, + justification_import: Option>, + finality_proof_import: Option>, verifier: Arc, } @@ -158,9 +158,9 @@ impl> BlockImportWorker { fn new( result_sender: BufferedLinkSender, verifier: Arc, - block_import: SharedBlockImport, - justification_import: Option>, - finality_proof_import: Option>, + block_import: BoxBlockImport, + justification_import: Option>, + finality_proof_import: Option>, ) -> (impl Future + Send, mpsc::UnboundedSender>) { let (sender, mut port) = mpsc::unbounded(); @@ -172,13 +172,13 @@ impl> BlockImportWorker { finality_proof_import, }; - if let Some(justification_import) = worker.justification_import.as_ref() { + if let Some(justification_import) = worker.justification_import.as_mut() { for (hash, number) in justification_import.on_start() { worker.result_sender.request_justification(&hash, number); } } - if let Some(finality_proof_import) = worker.finality_proof_import.as_ref() { + if let Some(finality_proof_import) = worker.finality_proof_import.as_mut() { for (hash, number) in finality_proof_import.on_start() { worker.result_sender.request_finality_proof(&hash, number); } @@ -211,7 +211,7 @@ impl> BlockImportWorker { fn import_a_batch_of_blocks(&mut self, origin: BlockOrigin, blocks: Vec>) { let (imported, count, results) = import_many_blocks( - &*self.block_import, + &mut *self.block_import, origin, blocks, self.verifier.clone(), @@ -295,8 +295,9 @@ impl> BlockImportWorker { } fn import_finality_proof(&mut self, who: Origin, hash: B::Hash, number: NumberFor, finality_proof: Vec) { - let result = self.finality_proof_import.as_ref().map(|finality_proof_import| { - finality_proof_import.import_finality_proof(hash, number, finality_proof, &*self.verifier) + let verifier = &*self.verifier; + let result = self.finality_proof_import.as_mut().map(|finality_proof_import| { + finality_proof_import.import_finality_proof(hash, number, finality_proof, verifier) .map_err(|e| { debug!( "Finality proof import failed with {:?} for hash: {:?} number: {:?} coming from node: {:?}", @@ -319,7 +320,7 @@ impl> BlockImportWorker { number: NumberFor, justification: Justification ) { - let success = self.justification_import.as_ref().map(|justification_import| { + let success = self.justification_import.as_mut().map(|justification_import| { justification_import.import_justification(hash, number, justification) .map_err(|e| { debug!( @@ -340,7 +341,7 @@ impl> BlockImportWorker { /// Import several blocks at once, returning import result for each block. fn import_many_blocks>( - import_handle: &dyn BlockImport, + import_handle: &mut dyn BlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: Arc, diff --git a/core/consensus/common/src/import_queue/buffered_link.rs b/core/consensus/common/src/import_queue/buffered_link.rs index 6a42ddb01afab..88ccf5c24653d 100644 --- a/core/consensus/common/src/import_queue/buffered_link.rs +++ b/core/consensus/common/src/import_queue/buffered_link.rs @@ -34,7 +34,7 @@ use futures::{prelude::*, sync::mpsc}; use runtime_primitives::traits::{Block as BlockT, NumberFor}; -use crate::import_queue::{Origin, Link, SharedFinalityProofRequestBuilder}; +use crate::import_queue::{Origin, Link, BoxFinalityProofRequestBuilder}; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer @@ -60,7 +60,7 @@ enum BlockImportWorkerMsg { RequestJustification(B::Hash, NumberFor), FinalityProofImported(Origin, (B::Hash, NumberFor), Result<(B::Hash, NumberFor), ()>), RequestFinalityProof(B::Hash, NumberFor), - SetFinalityProofRequestBuilder(SharedFinalityProofRequestBuilder), + SetFinalityProofRequestBuilder(BoxFinalityProofRequestBuilder), ReportPeer(Origin, i32), Restart, } @@ -107,7 +107,7 @@ impl Link for BufferedLinkSender { let _ = self.tx.unbounded_send(BlockImportWorkerMsg::RequestFinalityProof(hash.clone(), number)); } - fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + fn set_finality_proof_request_builder(&mut self, request_builder: BoxFinalityProofRequestBuilder) { let _ = self.tx.unbounded_send(BlockImportWorkerMsg::SetFinalityProofRequestBuilder(request_builder)); } diff --git a/core/finality-grandpa/src/import.rs b/core/finality-grandpa/src/import.rs index d447d04ebf768..ccef682fea682 100644 --- a/core/finality-grandpa/src/import.rs +++ b/core/finality-grandpa/src/import.rs @@ -63,6 +63,21 @@ pub struct GrandpaBlockImport, RA, PRA, SC> { api: Arc, } +impl, RA, PRA, SC: Clone> Clone for + GrandpaBlockImport +{ + fn clone(&self) -> Self { + GrandpaBlockImport { + inner: self.inner.clone(), + select_chain: self.select_chain.clone(), + authority_set: self.authority_set.clone(), + send_voter_commands: self.send_voter_commands.clone(), + consensus_changes: self.consensus_changes.clone(), + api: self.api.clone(), + } + } +} + impl, RA, PRA, SC> JustificationImport for GrandpaBlockImport where NumberFor: grandpa::BlockNumberOps, @@ -76,7 +91,7 @@ impl, RA, PRA, SC> JustificationImport { type Error = ConsensusError; - fn on_start(&self) -> Vec<(Block::Hash, NumberFor)> { + fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor)> { let mut out = Vec::new(); let chain_info = self.inner.info().chain; @@ -106,7 +121,7 @@ impl, RA, PRA, SC> JustificationImport } fn import_justification( - &self, + &mut self, hash: Block::Hash, number: NumberFor, justification: Justification, @@ -390,7 +405,7 @@ impl, RA, PRA, SC> BlockImport { type Error = ConsensusError; - fn import_block(&self, mut block: ImportBlock, new_cache: HashMap>) + fn import_block(&mut self, mut block: ImportBlock, new_cache: HashMap>) -> Result { let hash = block.post_header().hash(); @@ -410,7 +425,7 @@ impl, RA, PRA, SC> BlockImport // we don't want to finalize on `inner.import_block` let mut justification = block.justification.take(); let enacts_consensus_change = !new_cache.is_empty(); - let import_result = self.inner.import_block(block, new_cache); + let import_result = (&*self.inner).import_block(block, new_cache); let mut imported_aux = { match import_result { @@ -504,7 +519,7 @@ impl, RA, PRA, SC> BlockImport } fn check_block( - &self, + &mut self, hash: Block::Hash, parent_hash: Block::Hash, ) -> Result { @@ -548,7 +563,7 @@ where /// If `enacts_change` is set to true, then finalizing this block *must* /// enact an authority set change, the function will panic otherwise. fn import_justification( - &self, + &mut self, hash: Block::Hash, number: NumberFor, justification: Justification, diff --git a/core/finality-grandpa/src/light_import.rs b/core/finality-grandpa/src/light_import.rs index fcb575ec1a14c..18b6e6ff64544 100644 --- a/core/finality-grandpa/src/light_import.rs +++ b/core/finality-grandpa/src/light_import.rs @@ -27,7 +27,7 @@ use client::{ }; use parity_codec::{Encode, Decode}; use consensus_common::{ - import_queue::{Verifier, SharedFinalityProofRequestBuilder}, well_known_cache_keys, + import_queue::{Verifier, BoxFinalityProofRequestBuilder}, well_known_cache_keys, BlockOrigin, BlockImport, FinalityProofImport, ImportBlock, ImportResult, ImportedAux, Error as ConsensusError, FinalityProofRequestBuilder, }; @@ -84,6 +84,16 @@ pub struct GrandpaLightBlockImport, RA> { data: Arc>>, } +impl, RA> Clone for GrandpaLightBlockImport { + fn clone(&self) -> Self { + GrandpaLightBlockImport { + client: self.client.clone(), + authority_set_provider: self.authority_set_provider.clone(), + data: self.data.clone(), + } + } +} + /// Mutable data of light block importer. struct LightImportData> { last_finalized: Block::Hash, @@ -100,8 +110,8 @@ struct LightAuthoritySet { impl, RA> GrandpaLightBlockImport { /// Create finality proof request builder. - pub fn create_finality_proof_request_builder(&self) -> SharedFinalityProofRequestBuilder { - Arc::new(GrandpaFinalityProofRequestBuilder(self.data.clone())) as _ + pub fn create_finality_proof_request_builder(&self) -> BoxFinalityProofRequestBuilder { + Box::new(GrandpaFinalityProofRequestBuilder(self.data.clone())) as _ } } @@ -116,7 +126,7 @@ impl, RA> BlockImport type Error = ConsensusError; fn import_block( - &self, + &mut self, block: ImportBlock, new_cache: HashMap>, ) -> Result { @@ -126,7 +136,7 @@ impl, RA> BlockImport } fn check_block( - &self, + &mut self, hash: Block::Hash, parent_hash: Block::Hash, ) -> Result { @@ -144,7 +154,7 @@ impl, RA> FinalityProofImport { type Error = ConsensusError; - fn on_start(&self) -> Vec<(Block::Hash, NumberFor)> { + fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor)> { let mut out = Vec::new(); let chain_info = self.client.info().chain; @@ -159,7 +169,7 @@ impl, RA> FinalityProofImport } fn import_finality_proof( - &self, + &mut self, hash: Block::Hash, number: NumberFor, finality_proof: Vec, @@ -206,7 +216,7 @@ impl LightAuthoritySet { struct GrandpaFinalityProofRequestBuilder>(Arc>>); impl> FinalityProofRequestBuilder for GrandpaFinalityProofRequestBuilder { - fn build_request_data(&self, _hash: &B::Hash) -> Vec { + fn build_request_data(&mut self, _hash: &B::Hash) -> Vec { let data = self.0.read(); make_finality_proof_request( data.last_finalized, @@ -217,7 +227,7 @@ impl> FinalityProofRequestBuilder for GrandpaFinalityPro /// Try to import new block. fn do_import_block, RA, J>( - client: &Client, + mut client: &Client, data: &mut LightImportData, mut block: ImportBlock, new_cache: HashMap>, @@ -236,7 +246,7 @@ fn do_import_block, RA, J>( // we don't want to finalize on `inner.import_block` let justification = block.justification.take(); let enacts_consensus_change = !new_cache.is_empty(); - let import_result = client.import_block(block, new_cache); + let import_result = BlockImport::import_block(&mut client, block, new_cache); let mut imported_aux = match import_result { Ok(ImportResult::Imported(aux)) => aux, @@ -537,6 +547,19 @@ pub mod tests { pub GrandpaLightBlockImport ); + impl, RA> Clone + for NoJustificationsImport where + NumberFor: grandpa::BlockNumberOps, + B: Backend + 'static, + E: CallExecutor + 'static + Clone + Send + Sync, + DigestFor: Encode, + RA: Send + Sync, + { + fn clone(&self) -> Self { + NoJustificationsImport(self.0.clone()) + } + } + impl, RA> BlockImport for NoJustificationsImport where NumberFor: grandpa::BlockNumberOps, @@ -548,7 +571,7 @@ pub mod tests { type Error = ConsensusError; fn import_block( - &self, + &mut self, mut block: ImportBlock, new_cache: HashMap>, ) -> Result { @@ -557,7 +580,7 @@ pub mod tests { } fn check_block( - &self, + &mut self, hash: Block::Hash, parent_hash: Block::Hash, ) -> Result { @@ -575,12 +598,12 @@ pub mod tests { { type Error = ConsensusError; - fn on_start(&self) -> Vec<(Block::Hash, NumberFor)> { + fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor)> { self.0.on_start() } fn import_finality_proof( - &self, + &mut self, hash: Block::Hash, number: NumberFor, finality_proof: Vec, diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 9babc74a3ea6e..1d3cb30b7d077 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -30,8 +30,8 @@ use client::{ }; use test_client::{self, runtime::BlockNumber}; use consensus_common::{BlockOrigin, ForkChoiceStrategy, ImportedAux, ImportBlock, ImportResult}; -use consensus_common::import_queue::{SharedBlockImport, SharedJustificationImport, SharedFinalityProofImport, - SharedFinalityProofRequestBuilder, +use consensus_common::import_queue::{BoxBlockImport, BoxJustificationImport, BoxFinalityProofImport, + BoxFinalityProofRequestBuilder, }; use std::collections::{HashMap, HashSet}; use std::result; @@ -106,10 +106,10 @@ impl TestNetFactory for GrandpaTestNet { fn make_block_import(&self, client: PeersClient) -> ( - SharedBlockImport, - Option>, - Option>, - Option>, + BoxBlockImport, + Option>, + Option>, + Option>, PeerData, ) { @@ -124,8 +124,9 @@ impl TestNetFactory for GrandpaTestNet { Arc::new(self.test_config.clone()), select_chain, ).expect("Could not create block import for fresh peer."); - let shared_import = Arc::new(import); - (shared_import.clone(), Some(shared_import), None, None, Mutex::new(Some(link))) + let justification_import = Box::new(import.clone()); + let block_import = Box::new(import); + (block_import, Some(justification_import), None, None, Mutex::new(Some(link))) }, PeersClient::Light(ref client) => { use crate::light_import::tests::light_block_import_without_justifications; @@ -139,8 +140,9 @@ impl TestNetFactory for GrandpaTestNet { Arc::new(self.test_config.clone()) ).expect("Could not create block import for fresh peer."); let finality_proof_req_builder = import.0.create_finality_proof_request_builder(); - let shared_import = Arc::new(import); - (shared_import.clone(), None, Some(shared_import), Some(finality_proof_req_builder), Mutex::new(None)) + let proof_import = Box::new(import.clone()); + let block_import = Box::new(import); + (block_import, None, Some(proof_import), Some(finality_proof_req_builder), Mutex::new(None)) }, } } @@ -952,7 +954,7 @@ fn allows_reimporting_change_blocks() { let mut net = GrandpaTestNet::new(api.clone(), 3); let client = net.peer(0).client().clone(); - let (block_import, ..) = net.make_block_import(client.clone()); + let (mut block_import, ..) = net.make_block_import(client.clone()); let full_client = client.as_full().unwrap(); let builder = full_client.new_block_at(&BlockId::Number(0), Default::default()).unwrap(); @@ -1001,7 +1003,7 @@ fn test_bad_justification() { let mut net = GrandpaTestNet::new(api.clone(), 3); let client = net.peer(0).client().clone(); - let (block_import, ..) = net.make_block_import(client.clone()); + let (mut block_import, ..) = net.make_block_import(client.clone()); let full_client = client.as_full().expect("only full clients are used in test"); let builder = full_client.new_block_at(&BlockId::Number(0), Default::default()).unwrap(); @@ -1093,15 +1095,16 @@ fn voter_persists_its_votes() { on_exit: Exit, telemetry_on_connect: None, }; - let mut voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); - - let voter = future::poll_fn(move || { - // we need to keep the block_import alive since it owns the - // sender for the voter commands channel, if that gets dropped - // then the voter will stop - let _block_import = _block_import.clone(); - voter.poll() - }); + + let voter = run_grandpa_voter(grandpa_params) + .expect("all in order with client and network") + .then(move |r| { + // we need to keep the block_import alive since it owns the + // sender for the voter commands channel, if that gets dropped + // then the voter will stop + drop(_block_import); + r + }); voter.select2(rx.into_future()).then(|res| match res { Ok(future::Either::A(x)) => { diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index b736b4b67af95..e12dc9be9dea0 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -28,7 +28,7 @@ use runtime_primitives::traits::{ Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub, SaturatedConversion }; -use consensus::import_queue::SharedFinalityProofRequestBuilder; +use consensus::import_queue::BoxFinalityProofRequestBuilder; use message::{ BlockRequest as BlockRequestMessage, FinalityProofRequest as FinalityProofRequestMessage, Message, @@ -1245,7 +1245,7 @@ impl, H: ExHashT> Protocol { self.sync.block_imported(hash, number) } - pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { + pub fn set_finality_proof_request_builder(&mut self, request_builder: BoxFinalityProofRequestBuilder) { self.sync.set_finality_proof_request_builder(request_builder) } diff --git a/core/network/src/protocol/sync.rs b/core/network/src/protocol/sync.rs index 04780862228da..87e52fd5f30c0 100644 --- a/core/network/src/protocol/sync.rs +++ b/core/network/src/protocol/sync.rs @@ -37,7 +37,7 @@ use log::{debug, trace, warn, info, error}; use crate::protocol::PeerInfo as ProtocolPeerInfo; use libp2p::PeerId; use client::{BlockStatus, ClientInfo}; -use consensus::{BlockOrigin, import_queue::{IncomingBlock, SharedFinalityProofRequestBuilder}}; +use consensus::{BlockOrigin, import_queue::{IncomingBlock, BoxFinalityProofRequestBuilder}}; use client::error::Error as ClientError; use blocks::BlockCollection; use extra_requests::ExtraRequests; @@ -171,7 +171,7 @@ pub struct ChainSync { queue_blocks: HashSet, /// The best block number that we are currently importing best_importing_number: NumberFor, - request_builder: Option>, + request_builder: Option>, } /// Reported sync state. @@ -662,7 +662,7 @@ impl ChainSync { protocol.send_finality_proof_request(peer, message::generic::FinalityProofRequest { id: 0, block: request.0, - request: self.request_builder.as_ref() + request: self.request_builder.as_mut() .map(|builder| builder.build_request_data(&request.0)) .unwrap_or_default() }) @@ -715,7 +715,7 @@ impl ChainSync { self.extra_finality_proofs.try_finalize_root(request_block, finalization_result, true); } - pub fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { + pub fn set_finality_proof_request_builder(&mut self, builder: BoxFinalityProofRequestBuilder) { self.request_builder = Some(builder) } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index cd47b0056a929..7512500cc50d0 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -28,7 +28,7 @@ use std::{collections::HashMap, fs, marker::PhantomData, io, path::Path}; use std::sync::{Arc, atomic::{AtomicBool, AtomicUsize, Ordering}}; -use consensus::import_queue::{ImportQueue, Link, SharedFinalityProofRequestBuilder}; +use consensus::import_queue::{ImportQueue, Link, BoxFinalityProofRequestBuilder}; use futures::{prelude::*, sync::mpsc}; use log::{warn, error, info}; use libp2p::core::{swarm::NetworkBehaviour, transport::boxed::Boxed, muxing::StreamMuxerBox}; @@ -683,7 +683,7 @@ impl<'a, B: BlockT, S: NetworkSpecialization, H: ExHashT> Link for Network fn restart(&mut self) { self.protocol.user_protocol_mut().restart() } - fn set_finality_proof_request_builder(&mut self, builder: SharedFinalityProofRequestBuilder) { + fn set_finality_proof_request_builder(&mut self, builder: BoxFinalityProofRequestBuilder) { self.protocol.user_protocol_mut().set_finality_proof_request_builder(builder) } } diff --git a/core/network/src/test/block_import.rs b/core/network/src/test/block_import.rs index 4155fc22613a1..0afcfa1f36c8f 100644 --- a/core/network/src/test/block_import.rs +++ b/core/network/src/test/block_import.rs @@ -46,16 +46,16 @@ fn prepare_good_block() -> (TestClient, Hash, u64, PeerId, IncomingBlock) fn import_single_good_block_works() { let (_, _hash, number, peer_id, block) = prepare_good_block(); assert_eq!( - import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), Ok(BlockImportResult::ImportedUnknown(number, Default::default(), Some(peer_id))) ); } #[test] fn import_single_good_known_block_is_ignored() { - let (client, _hash, number, _, block) = prepare_good_block(); + let (mut client, _hash, number, _, block) = prepare_good_block(); assert_eq!( - import_single_block(&client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + import_single_block(&mut client, BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), Ok(BlockImportResult::ImportedKnown(number)) ); } @@ -65,7 +65,7 @@ fn import_single_good_block_without_header_fails() { let (_, _, _, peer_id, mut block) = prepare_good_block(); block.header = None; assert_eq!( - import_single_block(&test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), + import_single_block(&mut test_client::new(), BlockOrigin::File, block, Arc::new(PassThroughVerifier(true))), Err(BlockImportError::IncompleteHeader(Some(peer_id))) ); } @@ -75,7 +75,7 @@ fn async_import_queue_drops() { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier, Arc::new(test_client::new()), None, None, None); + let queue = BasicQueue::new(verifier, Box::new(test_client::new()), None, None, None); drop(queue); } } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index 3f92696d27253..ad9baef41dc73 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -34,16 +34,17 @@ use client::backend::AuxStore; use crate::config::Roles; use consensus::import_queue::BasicQueue; use consensus::import_queue::{ - SharedBlockImport, SharedJustificationImport, Verifier, SharedFinalityProofImport, - SharedFinalityProofRequestBuilder, + BoxBlockImport, BoxJustificationImport, Verifier, BoxFinalityProofImport, + BoxFinalityProofRequestBuilder, }; -use consensus::block_import::BlockImport; +use consensus::block_import::{BlockImport, ImportResult}; use consensus::{Error as ConsensusError, well_known_cache_keys::{self, Id as CacheKeyId}}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; use futures::prelude::*; use crate::{NetworkWorker, NetworkService, config::ProtocolId}; use crate::config::{NetworkConfiguration, TransportConfig}; use libp2p::PeerId; +use parking_lot::Mutex; use primitives::{H256, Blake2Hasher}; use crate::protocol::{Context, ProtocolConfig}; use runtime_primitives::generic::{BlockId, OpaqueDigestItemId}; @@ -142,10 +143,10 @@ impl PeersClient { } } - pub fn as_block_import(&self) -> SharedBlockImport { + pub fn as_block_import(&self) -> BoxBlockImport { match *self { - PeersClient::Full(ref client) => client.clone() as _, - PeersClient::Light(ref client) => client.clone() as _, + PeersClient::Full(ref client) => Box::new(client.clone()) as _, + PeersClient::Light(ref client) => Box::new(client.clone()) as _, } } @@ -214,7 +215,7 @@ pub struct Peer> { verifier: Arc>, /// We keep a copy of the block_import so that we can invoke it for locally-generated blocks, /// instead of going through the import queue. - block_import: Arc>, + block_import: Box>, network: NetworkWorker::Hash>, imported_blocks_stream: futures::stream::Fuse>, finality_notification_stream: futures::stream::Fuse>, @@ -367,6 +368,33 @@ impl SpecializationFactory for DummySpecialization { } } +/// Implements `BlockImport` on an `Arc>`. Used internally. Necessary to overcome the way the +/// `TestNet` trait is designed, more specifically `make_block_import` returning a `Box` makes it +/// impossible to clone the underlying object. +struct BlockImportAdapter(Arc>>); + +impl Clone for BlockImportAdapter { + fn clone(&self) -> Self { + BlockImportAdapter(self.0.clone()) + } +} + +impl> BlockImport for BlockImportAdapter { + type Error = T::Error; + + fn check_block(&mut self, hash: Hash, parent_hash: Hash) -> Result { + self.0.lock().check_block(hash, parent_hash) + } + + fn import_block( + &mut self, + block: ImportBlock, + cache: HashMap>, + ) -> Result { + self.0.lock().import_block(block, cache) + } +} + pub trait TestNetFactory: Sized { type Specialization: NetworkSpecialization + SpecializationFactory; type Verifier: 'static + Verifier; @@ -384,10 +412,10 @@ pub trait TestNetFactory: Sized { /// Get custom block import handle for fresh client, along with peer data. fn make_block_import(&self, client: PeersClient) -> ( - SharedBlockImport, - Option>, - Option>, - Option>, + BoxBlockImport, + Option>, + Option>, + Option>, Self::PeerData, ) { @@ -422,10 +450,11 @@ pub trait TestNetFactory: Sized { let verifier = self.make_verifier(PeersClient::Full(client.clone()), config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Full(client.clone())); + let block_import = BlockImportAdapter(Arc::new(Mutex::new(block_import))); let import_queue = Box::new(BasicQueue::new( verifier.clone(), - block_import.clone(), + Box::new(block_import.clone()), justification_import, finality_proof_import, finality_proof_request_builder, @@ -462,7 +491,7 @@ pub trait TestNetFactory: Sized { client: PeersClient::Full(client), imported_blocks_stream, finality_notification_stream, - block_import, + block_import: Box::new(block_import), verifier, network, }); @@ -478,10 +507,11 @@ pub trait TestNetFactory: Sized { let verifier = self.make_verifier(PeersClient::Light(client.clone()), &config); let (block_import, justification_import, finality_proof_import, finality_proof_request_builder, data) = self.make_block_import(PeersClient::Light(client.clone())); + let block_import = BlockImportAdapter(Arc::new(Mutex::new(block_import))); let import_queue = Box::new(BasicQueue::new( verifier.clone(), - block_import.clone(), + Box::new(block_import.clone()), justification_import, finality_proof_import, finality_proof_request_builder, @@ -516,7 +546,7 @@ pub trait TestNetFactory: Sized { peers.push(Peer { data, verifier, - block_import, + block_import: Box::new(block_import), client: PeersClient::Light(client), imported_blocks_stream, finality_notification_stream, @@ -615,7 +645,7 @@ impl JustificationImport for ForceFinalized { type Error = ConsensusError; fn import_justification( - &self, + &mut self, hash: H256, _number: NumberFor, justification: Justification, @@ -656,13 +686,13 @@ impl TestNetFactory for JustificationTestNet { fn make_block_import(&self, client: PeersClient) -> ( - SharedBlockImport, - Option>, - Option>, - Option>, + BoxBlockImport, + Option>, + Option>, + Option>, Self::PeerData, ) { - (client.as_block_import(), Some(Arc::new(ForceFinalized(client))), None, None, Default::default()) + (client.as_block_import(), Some(Box::new(ForceFinalized(client))), None, None, Default::default()) } } diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index 83f0488a7e6cb..fa6b4012e9c43 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -938,9 +938,9 @@ impl network::TransactionPool, ComponentBlock< /// LightService = LightComponents /// { |config| >::new(config) }, /// FullImportQueue = BasicQueue -/// { |_, client, _| Ok(BasicQueue::new(Arc::new(MyVerifier), client, None, None, None)) }, +/// { |_, client, _| Ok(BasicQueue::new(Arc::new(MyVerifier), Box::new(client), None, None, None)) }, /// LightImportQueue = BasicQueue -/// { |_, client| Ok(BasicQueue::new(Arc::new(MyVerifier), client, None, None, None)) }, +/// { |_, client| Ok(BasicQueue::new(Arc::new(MyVerifier), Box::new(client), None, None, None)) }, /// SelectChain = LongestChain, Self::Block> /// { |config: &FactoryFullConfiguration, client: Arc>| { /// #[allow(deprecated)] diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 48b310db5fc03..791324d927394 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -371,7 +371,7 @@ pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrin info!("Checking block sync"); let first_address = { let first_service = &network.full_nodes[0].1; - let client = first_service.get().client(); + let mut client = first_service.get().client(); for i in 0 .. NUM_BLOCKS { if i % 128 == 0 { info!("Generating #{}", i); diff --git a/core/test-client/src/client_ext.rs b/core/test-client/src/client_ext.rs index 7d05b1f570da9..5f677108c359d 100644 --- a/core/test-client/src/client_ext.rs +++ b/core/test-client/src/client_ext.rs @@ -57,7 +57,7 @@ impl ClientExt for Client where B: client::backend::Backend, E: client::CallExecutor, - Self: BlockImport, + for<'r> &'r Self: BlockImport, Block: BlockT::Out>, { fn import(&self, origin: BlockOrigin, block: Block) @@ -75,7 +75,7 @@ impl ClientExt for Client fork_choice: ForkChoiceStrategy::LongestChain, }; - self.import_block(import, HashMap::new()).map(|_| ()) + BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ()) } fn import_justified( @@ -96,7 +96,7 @@ impl ClientExt for Client fork_choice: ForkChoiceStrategy::LongestChain, }; - self.import_block(import, HashMap::new()).map(|_| ()) + BlockImport::import_block(&mut (&*self), import, HashMap::new()).map(|_| ()) } fn finalize_block( diff --git a/node-template/src/service.rs b/node-template/src/service.rs index 38a2481a71a70..986e450929cc7 100644 --- a/node-template/src/service.rs +++ b/node-template/src/service.rs @@ -100,7 +100,7 @@ construct_service_factory! { { |config: &mut FactoryFullConfiguration , client: Arc>, _select_chain: Self::SelectChain| { import_queue::<_, _, Pair>( SlotDuration::get_or_compute(&*client)?, - client.clone(), + Box::new(client.clone()), None, None, None, @@ -115,7 +115,7 @@ construct_service_factory! { { |config: &mut FactoryFullConfiguration, client: Arc>| { import_queue::<_, _, Pair>( SlotDuration::get_or_compute(&*client)?, - client.clone(), + Box::new(client.clone()), None, None, None, diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index f3efdfd98296f..25382d148e188 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -50,7 +50,7 @@ construct_simple_protocol! { pub struct NodeConfig { /// grandpa connection to import block // FIXME #1134 rather than putting this on the config, let's have an actual intermediate setup state - pub grandpa_import_setup: Option<(Arc>, grandpa::LinkHalfForService)>, + pub grandpa_import_setup: Option<(grandpa::BlockImportForService, grandpa::LinkHalfForService)>, inherent_data_providers: InherentDataProviders, } @@ -100,7 +100,7 @@ construct_service_factory! { Arc::new(aura_key), client, select_chain, - block_import.clone(), + block_import, proposer, service.network(), service.config.custom.inherent_data_providers.clone(), @@ -161,15 +161,14 @@ construct_service_factory! { grandpa::block_import::<_, _, _, RuntimeApi, FullClient, _>( client.clone(), client.clone(), select_chain )?; - let block_import = Arc::new(block_import); let justification_import = block_import.clone(); config.custom.grandpa_import_setup = Some((block_import.clone(), link_half)); import_queue::<_, _, AuraPair>( slot_duration, - block_import, - Some(justification_import), + Box::new(block_import), + Some(Box::new(justification_import)), None, None, client, @@ -186,15 +185,14 @@ construct_service_factory! { let block_import = grandpa::light_block_import::<_, _, _, RuntimeApi, LightClient>( client.clone(), Arc::new(fetch_checker), client.clone() )?; - let block_import = Arc::new(block_import); let finality_proof_import = block_import.clone(); let finality_proof_request_builder = finality_proof_import.create_finality_proof_request_builder(); import_queue::<_, _, AuraPair>( SlotDuration::get_or_compute(&*client)?, - block_import, + Box::new(block_import), None, - Some(finality_proof_import), + Some(Box::new(finality_proof_import)), Some(finality_proof_request_builder), client, config.custom.inherent_data_providers.clone(), diff --git a/test-utils/transaction-factory/src/lib.rs b/test-utils/transaction-factory/src/lib.rs index 8f292f3a02a2e..e90ca412ac7d9 100644 --- a/test-utils/transaction-factory/src/lib.rs +++ b/test-utils/transaction-factory/src/lib.rs @@ -176,5 +176,5 @@ fn import_block( auxiliary: Vec::new(), fork_choice: ForkChoiceStrategy::LongestChain, }; - client.import_block(import, HashMap::new()).expect("Failed to import block"); + (&**client).import_block(import, HashMap::new()).expect("Failed to import block"); }