diff --git a/Cargo.lock b/Cargo.lock index c9d66bbb6a06..993b4bdd6be3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -663,11 +663,12 @@ dependencies = [ [[package]] name = "finality-grandpa" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1803,6 +1804,7 @@ dependencies = [ "srml-treasury 0.1.0", "srml-upgrade-key 0.1.0", "substrate-client 0.1.0", + "substrate-fg-primitives 0.1.0", "substrate-keyring 0.1.0", "substrate-primitives 0.1.0", ] @@ -3178,20 +3180,37 @@ dependencies = [ "wasmi 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-fg-primitives" +version = "0.1.0" +dependencies = [ + "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", + "sr-std 0.1.0", + "substrate-client 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "substrate-finality-grandpa" version = "0.1.0" dependencies = [ - "finality-grandpa 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "env_logger 0.5.13 (registry+https://github.com/rust-lang/crates.io-index)", + "finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "sr-primitives 0.1.0", "substrate-client 0.1.0", + "substrate-consensus-common 0.1.0", + "substrate-fg-primitives 0.1.0", "substrate-keyring 0.1.0", "substrate-network 0.1.0", "substrate-primitives 0.1.0", + "substrate-test-client 0.1.0", "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -4362,7 +4381,7 @@ dependencies = [ "checksum failure_derive 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "64c2d913fe8ed3b6c6518eedf4538255b989945c14c2a7d5cbff62a5e2120596" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" -"checksum finality-grandpa 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "20d8cf871510f0d57630e75f9e65f87cba29581ccab1f78666d8b2e422d0baa6" +"checksum finality-grandpa 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "be6d2735e8f570474c7925a60ebe04ec0bdd9eea7cc4fddab78a0ecfdefec20e" "checksum fixed-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a557e80084b05c32b455963ff565a9de6f2866da023d6671705c6aff6f65e01c" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" diff --git a/Cargo.toml b/Cargo.toml index e38e3c3e2373..7be891ec5999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ members = [ "core/consensus/rhd", "core/executor", "core/finality-grandpa", + "core/finality-grandpa/primitives", "core/keyring", "core/network", "core/primitives", diff --git a/core/client/db/src/lib.rs b/core/client/db/src/lib.rs index f485841fd897..c047c9398958 100644 --- a/core/client/db/src/lib.rs +++ b/core/client/db/src/lib.rs @@ -274,6 +274,18 @@ pub struct BlockImportOperation { updates: MemoryDB, changes_trie_updates: MemoryDB, pending_block: Option>, + aux_ops: Vec<(Vec, Option>)>, +} + +impl BlockImportOperation { + fn apply_aux(&mut self, transaction: &mut DBTransaction) { + for (key, maybe_val) in self.aux_ops.drain(..) { + match maybe_val { + Some(val) => transaction.put_vec(columns::AUX, &key, val), + None => transaction.delete(columns::AUX, &key), + } + } + } } impl client::backend::BlockImportOperation @@ -345,6 +357,13 @@ where Block: BlockT, self.changes_trie_updates = update; Ok(()) } + + fn set_aux(&mut self, ops: I) -> Result<(), client::error::Error> + where I: IntoIterator, Option>)> + { + self.aux_ops = ops.into_iter().collect(); + Ok(()) + } } struct StorageDb { @@ -636,6 +655,7 @@ impl client::backend::Backend for Backend whe old_state: state, updates: MemoryDB::default(), changes_trie_updates: MemoryDB::default(), + aux_ops: Vec::new(), }) } @@ -643,6 +663,7 @@ impl client::backend::Backend for Backend whe -> Result<(), client::error::Error> { let mut transaction = DBTransaction::new(); + operation.apply_aux(&mut transaction); if let Some(pending_block) = operation.pending_block { let hash = pending_block.header.hash(); diff --git a/core/client/db/src/light.rs b/core/client/db/src/light.rs index f336df4d28ef..eed75f25f8f0 100644 --- a/core/client/db/src/light.rs +++ b/core/client/db/src/light.rs @@ -43,6 +43,7 @@ pub(crate) mod columns { pub const HEADER: Option = Some(2); pub const CACHE: Option = Some(3); pub const CHT: Option = Some(4); + pub const AUX: Option = Some(5); } /// Light blockchain storage. Stores most recent headers + CHTs for older headers. @@ -238,6 +239,7 @@ impl LightBlockchainStorage for LightStorage header: Block::Header, authorities: Option>, leaf_state: NewBlockState, + aux_ops: Vec<(Vec, Option>)>, ) -> ClientResult<()> { let mut transaction = DBTransaction::new(); @@ -253,6 +255,13 @@ impl LightBlockchainStorage for LightStorage ::utils::number_and_hash_to_lookup_key(number, hash) }; + for (key, maybe_val) in aux_ops { + match maybe_val { + Some(val) => transaction.put_vec(columns::AUX, &key, val), + None => transaction.delete(columns::AUX, &key), + } + } + if leaf_state.is_best() { // handle reorg. { @@ -427,7 +436,7 @@ pub(crate) mod tests { ) -> Hash { let header = prepare_header(parent, number, extrinsics_root); let hash = header.hash(); - db.import_header(header, authorities, NewBlockState::Best).unwrap(); + db.import_header(header, authorities, NewBlockState::Best, Vec::new()).unwrap(); hash } @@ -439,7 +448,7 @@ pub(crate) mod tests { ) -> Hash { let header = prepare_header(parent, number, Default::default()); let hash = header.hash(); - db.import_header(header, authorities, NewBlockState::Best).unwrap(); + db.import_header(header, authorities, NewBlockState::Best, Vec::new()).unwrap(); hash } @@ -451,7 +460,7 @@ pub(crate) mod tests { ) -> Hash { let header = prepare_header(parent, number, Default::default()); let hash = header.hash(); - db.import_header(header, authorities, NewBlockState::Final).unwrap(); + db.import_header(header, authorities, NewBlockState::Final, Vec::new()).unwrap(); hash } @@ -463,7 +472,7 @@ pub(crate) mod tests { ) -> Hash { let header = prepare_header(parent, number, Default::default()); let hash = header.hash(); - db.import_header(header, authorities, NewBlockState::Normal).unwrap(); + db.import_header(header, authorities, NewBlockState::Normal, Vec::new()).unwrap(); hash } diff --git a/core/client/src/backend.rs b/core/client/src/backend.rs index b953c9aedfda..a4e6852560df 100644 --- a/core/client/src/backend.rs +++ b/core/client/src/backend.rs @@ -74,6 +74,9 @@ pub trait BlockImportOperation where fn reset_storage(&mut self, top: StorageMap, children: ChildrenStorageMap) -> error::Result; /// Inject changes trie data into the database. fn update_changes_trie(&mut self, update: MemoryDB) -> error::Result<()>; + /// Update auxiliary keys. Values are `None` if should be deleted. + fn set_aux(&mut self, ops: I) -> error::Result<()> + where I: IntoIterator, Option>)>; } /// Client backend. Manages the data layer. diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 6cf62dca2c9c..f889a7045fa0 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -497,6 +497,7 @@ impl Client where body: Option>, authorities: Option>, finalized: bool, + aux: Vec<(Vec, Option>)>, ) -> error::Result where RA: TaggedTransactionQueue, E: CallExecutor + Send + Sync + Clone, @@ -590,6 +591,8 @@ impl Client where if let Some(Some(changes_update)) = changes_update { transaction.update_changes_trie(changes_update)?; } + + transaction.set_aux(aux)?; self.backend.commit_operation(transaction)?; if make_notifications { @@ -775,7 +778,9 @@ impl Client where /// TODO [snd] possibly implement this on blockchain::Backend and just redirect here /// Returns `Ok(None)` if `target_hash` is not found in search space. /// TODO [snd] write down time complexity - pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option>) -> error::Result> { + pub fn best_containing(&self, target_hash: Block::Hash, maybe_max_number: Option>) + -> error::Result> + { let target_header = { match self.backend.blockchain().header(BlockId::Hash(target_hash))? { Some(x) => x, @@ -911,7 +916,8 @@ impl CallApiAt for Client where B: backend::Backend, E: CallExecutor + Clone + Send + Sync, Block: BlockT, - RA: CoreAPI + RA: CoreAPI, // not strictly necessary at the moment + // but we want to bound to make sure the API is actually available. { fn call_api_at( &self, @@ -966,7 +972,7 @@ impl consensus::BlockImport for Client post_digests, body, finalized, - .. + auxiliary, } = import_block; let parent_hash = header.parent_hash().clone(); @@ -998,6 +1004,7 @@ impl consensus::BlockImport for Client body, new_authorities, finalized, + auxiliary, ); *self.importing_block.write() = None; diff --git a/core/client/src/in_mem.rs b/core/client/src/in_mem.rs index 70adb97607bc..4c135e840cef 100644 --- a/core/client/src/in_mem.rs +++ b/core/client/src/in_mem.rs @@ -96,6 +96,7 @@ struct BlockchainStorage { genesis_hash: Block::Hash, cht_roots: HashMap, Block::Hash>, leaves: LeafSet>, + aux: HashMap, Vec>, } /// In-memory blockchain. Supports concurrent reads. @@ -144,6 +145,7 @@ impl Blockchain { genesis_hash: Default::default(), cht_roots: HashMap::new(), leaves: LeafSet::new(), + aux: HashMap::new(), })); Blockchain { storage: storage.clone(), @@ -247,6 +249,16 @@ impl Blockchain { self.storage.write().finalized_hash = hash; Ok(()) } + + fn write_aux(&self, ops: Vec<(Vec, Option>)>) { + let mut storage = self.storage.write(); + for (k, v) in ops { + match v { + Some(v) => storage.aux.insert(k, v), + None => storage.aux.remove(&k), + }; + } + } } impl HeaderBackend for Blockchain { @@ -320,6 +332,7 @@ impl light::blockchain::Storage for Blockchain header: Block::Header, authorities: Option>, state: NewBlockState, + aux_ops: Vec<(Vec, Option>)>, ) -> error::Result<()> { let hash = header.hash(); let parent_hash = *header.parent_hash(); @@ -328,6 +341,7 @@ impl light::blockchain::Storage for Blockchain self.cache.insert(parent_hash, authorities); } + self.write_aux(aux_ops); Ok(()) } @@ -356,6 +370,7 @@ pub struct BlockImportOperation { old_state: InMemory, new_state: Option>, changes_trie_update: Option>, + aux: Option, Option>)>>, } impl backend::BlockImportOperation for BlockImportOperation @@ -426,6 +441,13 @@ where self.new_state = Some(InMemory::from(transaction)); Ok(root) } + + fn set_aux(&mut self, ops: I) -> error::Result<()> + where I: IntoIterator, Option>)> + { + self.aux = Some(ops.into_iter().collect()); + Ok(()) + } } /// In-memory backend. Keeps all states and blocks in memory. Useful for testing. @@ -438,7 +460,6 @@ where states: RwLock>>, changes_trie_storage: InMemoryChangesTrieStorage, blockchain: Blockchain, - aux: RwLock, Vec>>, } impl Backend @@ -453,7 +474,6 @@ where states: RwLock::new(HashMap::new()), changes_trie_storage: InMemoryChangesTrieStorage::new(), blockchain: Blockchain::new(), - aux: RwLock::new(HashMap::new()), } } } @@ -481,6 +501,7 @@ where old_state: state, new_state: None, changes_trie_update: None, + aux: None, }) } @@ -508,6 +529,10 @@ where self.blockchain.cache.insert(parent_hash, operation.pending_authorities); } } + + if let Some(ops) = operation.aux { + self.blockchain.write_aux(ops); + } Ok(()) } @@ -535,18 +560,18 @@ where } fn insert_aux<'a, 'b: 'a, 'c: 'a, I: IntoIterator, D: IntoIterator>(&self, insert: I, delete: D) -> error::Result<()> { - let mut aux = self.aux.write(); + let mut storage = self.blockchain.storage.write(); for (k, v) in insert { - aux.insert(k.to_vec(), v.to_vec()); + storage.aux.insert(k.to_vec(), v.to_vec()); } for k in delete { - aux.remove(*k); + storage.aux.remove(*k); } Ok(()) } fn get_aux(&self, key: &[u8]) -> error::Result>> { - Ok(self.aux.read().get(key).cloned()) + Ok(self.blockchain.storage.read().aux.get(key).cloned()) } } diff --git a/core/client/src/light/backend.rs b/core/client/src/light/backend.rs index 8d2414396a8d..884d0377041b 100644 --- a/core/client/src/light/backend.rs +++ b/core/client/src/light/backend.rs @@ -46,6 +46,7 @@ pub struct ImportOperation { header: Option, authorities: Option>, leaf_state: NewBlockState, + aux_ops: Vec<(Vec, Option>)>, _phantom: ::std::marker::PhantomData<(S, F)>, } @@ -86,6 +87,7 @@ impl ClientBackend for Backend where header: None, authorities: None, leaf_state: NewBlockState::Normal, + aux_ops: Vec::new(), _phantom: Default::default(), }) } @@ -96,6 +98,7 @@ impl ClientBackend for Backend where header, operation.authorities, operation.leaf_state, + operation.aux_ops, ) } @@ -193,6 +196,13 @@ where let mut op = in_mem.begin_operation(BlockId::Hash(Default::default()))?; op.reset_storage(top, children) } + + fn set_aux(&mut self, ops: I) -> ClientResult<()> + where I: IntoIterator, Option>)> + { + self.aux_ops = ops.into_iter().collect(); + Ok(()) + } } impl StateBackend for OnDemandState diff --git a/core/client/src/light/blockchain.rs b/core/client/src/light/blockchain.rs index 97c20ecc9074..2dd7097751a5 100644 --- a/core/client/src/light/blockchain.rs +++ b/core/client/src/light/blockchain.rs @@ -35,11 +35,15 @@ use light::fetcher::{Fetcher, RemoteHeaderRequest}; /// Light client blockchain storage. pub trait Storage: BlockchainHeaderBackend { /// Store new header. Should refuse to revert any finalized blocks. + /// + /// Takes new authorities, the leaf state of the new block, and + /// any auxiliary storage updates to place in the same operation. fn import_header( &self, header: Block::Header, authorities: Option>, state: NewBlockState, + aux_ops: Vec<(Vec, Option>)>, ) -> ClientResult<()>; /// Mark historic header as finalized. diff --git a/core/client/src/runtime_api/macros.rs b/core/client/src/runtime_api/macros.rs index 238314680bf4..b58f2143c888 100644 --- a/core/client/src/runtime_api/macros.rs +++ b/core/client/src/runtime_api/macros.rs @@ -16,6 +16,9 @@ //! Macros for declaring and implementing the runtime APIs. +// these are part of the public API, so need to be re-exported +pub use runtime_version::{ApiId, RuntimeVersion}; + /// Declare the given API traits. /// /// # Example: diff --git a/core/client/src/runtime_api/mod.rs b/core/client/src/runtime_api/mod.rs index 62f0f0ca0362..e15171836049 100644 --- a/core/client/src/runtime_api/mod.rs +++ b/core/client/src/runtime_api/mod.rs @@ -24,12 +24,11 @@ pub use state_machine::OverlayedChanges; pub use runtime_primitives::{traits::Block as BlockT, generic::BlockId}; #[cfg(feature = "std")] use runtime_primitives::traits::ApiRef; -use runtime_version::ApiId; +pub use runtime_version::ApiId; #[doc(hidden)] pub use rstd::slice; #[cfg(feature = "std")] use rstd::result; -#[doc(hidden)] pub use codec::{Encode, Decode}; #[cfg(feature = "std")] use error; @@ -73,6 +72,30 @@ pub trait CallApiAt { changes: &mut OverlayedChanges, initialised_block: &mut Option>, ) -> error::Result>; + + /// Call the given api function with strong arguments at the given block + /// and returns the decoded result. + fn call_api_at_strong( + &self, + at: &BlockId, + function: &'static str, + args: &In, + changes: &mut OverlayedChanges, + initialised_block: &mut Option>, + ) -> error::Result where Self: Sized { + let raw = self.call_api_at( + at, + function, + args.encode(), + changes, + initialised_block, + )?; + + match Out::decode(&mut &raw[..]) { + Some(out) => Ok(out), + None => bail!(error::ErrorKind::CallResultDecode(function)), + } + } } /// The ApiIds for the various standard runtime APIs. diff --git a/core/consensus/aura/src/lib.rs b/core/consensus/aura/src/lib.rs index 029fbe802527..babfef8dc180 100644 --- a/core/consensus/aura/src/lib.rs +++ b/core/consensus/aura/src/lib.rs @@ -389,11 +389,11 @@ pub type AuraImportQueue = BasicQueue>; /// Start an import queue for the Aura consensus algorithm. pub fn import_queue(config: Config, client: Arc) -> AuraImportQueue where B: Block, - C: Authorities + BlockImport + Send + Sync, + C: Authorities + BlockImport + Send + Sync, DigestItemFor: CompatibleDigestItem, { - let verifier = Arc::new(AuraVerifier { config, client }); - BasicQueue::new(verifier) + let verifier = Arc::new(AuraVerifier { config, client: client.clone() }); + BasicQueue::new(verifier, client) } @@ -443,12 +443,13 @@ mod tests { const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); pub struct AuraTestNet { - peers: Vec>>>, + peers: Vec, ()>>>, started: bool } impl TestNetFactory for AuraTestNet { type Verifier = AuraVerifier; + type PeerData = (); /// Create new test network with peers and given config. fn from_config(_config: &ProtocolConfig) -> Self { @@ -465,15 +466,15 @@ mod tests { Arc::new(AuraVerifier { client, config }) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F ) { + fn mut_peers>>)>(&mut self, closure: F) { closure(&mut self.peers); } diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index b507ffd6d7c1..a4b20e51777b 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -6,17 +6,22 @@ authors = ["Parity Technologies "] [dependencies] futures = "0.1.17" parity-codec = "2.1" +parity-codec-derive = "2.0" sr-primitives = { path = "../sr-primitives" } +substrate-consensus-common = { path = "../consensus/common" } substrate-primitives = { path = "../primitives" } substrate-client = { path = "../client" } log = "0.4" +parking_lot = "0.4" tokio = "0.1.7" +substrate-fg-primitives = { path = "primitives" } [dependencies.finality-grandpa] -version = "0.2.0" +version = "0.3.0" features = ["derive-codec"] [dev-dependencies] substrate-network = { path = "../network", features = ["test-helpers"] } -parking_lot = "0.4" substrate-keyring = { path = "../keyring" } +substrate-test-client = { path = "../test-client"} +env_logger = "0.5" diff --git a/core/finality-grandpa/primitives/Cargo.toml b/core/finality-grandpa/primitives/Cargo.toml new file mode 100644 index 000000000000..8f9c6628b86d --- /dev/null +++ b/core/finality-grandpa/primitives/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "substrate-fg-primitives" +version = "0.1.0" +authors = ["Parity Technologies "] + +[dependencies] +substrate-client = { path = "../../client", default-features = false } +substrate-primitives = { path = "../../primitives", default-features = false } +parity-codec = { version = "2.1", default-features = false } +parity-codec-derive = { version = "2.1", default-features = false } +sr-primitives = { path = "../../sr-primitives", default-features = false } +sr-std = { path = "../../sr-std", default-features = false } + +[features] +default = ["std"] +std = [ + "substrate-primitives/std", + "substrate-client/std", + "parity-codec/std", + "parity-codec-derive/std", + "sr-primitives/std", + "sr-std/std", +] diff --git a/core/finality-grandpa/primitives/src/lib.rs b/core/finality-grandpa/primitives/src/lib.rs new file mode 100644 index 000000000000..aaca423ecdf4 --- /dev/null +++ b/core/finality-grandpa/primitives/src/lib.rs @@ -0,0 +1,86 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Primitives for GRANDPA integration, suitable for WASM compilation. + +#![cfg_attr(not(feature = "std"), no_std)] + +extern crate substrate_primitives; +extern crate sr_primitives; +extern crate parity_codec; + +#[macro_use] +extern crate parity_codec_derive; + +#[macro_use] +extern crate substrate_client as client; + +extern crate sr_std as rstd; + +use substrate_primitives::AuthorityId; +use sr_primitives::traits::{Block as BlockT, DigestFor, NumberFor}; +use rstd::vec::Vec; + +/// A scheduled change of authority set. +#[cfg_attr(feature = "std", derive(Debug, PartialEq))] +#[derive(Clone, Encode, Decode)] +pub struct ScheduledChange { + /// The new authorities after the change, along with their respective weights. + pub next_authorities: Vec<(AuthorityId, u64)>, + /// The number of blocks to delay. + pub delay: N, +} + +/// WASM function call to check for pending changes. +pub const PENDING_CHANGE_CALL: &str = "grandpa_pending_change"; +/// WASM function call to get current GRANDPA authorities. +pub const AUTHORITIES_CALL: &str = "grandpa_authorities"; + +/// The ApiIds for GRANDPA API. +pub mod id { + use client::runtime_api::ApiId; + + /// ApiId for the GrandpaApi trait. + pub const GRANDPA_API: ApiId = *b"fgrandpa"; +} + +decl_runtime_apis! { + /// APIs for integrating the GRANDPA finality gadget into runtimes. + /// This should be implemented on the runtime side. + /// + /// This is primarily used for negotiating authority-set changes for the + /// gadget. GRANDPA uses a signalling model of changing authority sets: + /// changes should be signalled with a delay of N blocks, and then automatically + /// applied in the runtime after those N blocks have passed. + /// + /// The consensus protocol will coordinate the handoff externally. + pub trait GrandpaApi { + /// Check a digest for pending changes. + /// Return `None` if there are no pending changes. + /// + /// Precedence towards earlier or later digest items can be given + /// based on the rules of the chain. + /// + /// No change should be scheduled if one is already and the delay has not + /// passed completely. + fn grandpa_pending_change(digest: DigestFor) + -> Option>>; + + /// Get the current GRANDPA authorities and weights. This should not change except + /// for when changes are scheduled and the corresponding delay has passed. + fn grandpa_authorities() -> Vec<(AuthorityId, u64)>; + } +} diff --git a/core/finality-grandpa/src/authorities.rs b/core/finality-grandpa/src/authorities.rs new file mode 100644 index 000000000000..c97769cb4356 --- /dev/null +++ b/core/finality-grandpa/src/authorities.rs @@ -0,0 +1,344 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Utilities for dealing with authorities, authority sets, and handoffs. + +use parking_lot::RwLock; +use substrate_primitives::AuthorityId; + +use std::cmp::Ord; +use std::fmt::Debug; +use std::ops::Add; +use std::sync::Arc; + +/// A shared authority set. +pub(crate) struct SharedAuthoritySet { + inner: Arc>>, +} + +impl Clone for SharedAuthoritySet { + fn clone(&self) -> Self { + SharedAuthoritySet { inner: self.inner.clone() } + } +} + +impl SharedAuthoritySet { + /// The genesis authority set. + pub(crate) fn genesis(initial: Vec<(AuthorityId, u64)>) -> Self { + SharedAuthoritySet { + inner: Arc::new(RwLock::new(AuthoritySet::genesis(initial))) + } + } + + /// Acquire a reference to the inner read-write lock. + pub(crate) fn inner(&self) -> &RwLock> { + &*self.inner + } +} + +impl SharedAuthoritySet + where N: Add + Ord + Clone + Debug +{ + /// Get the earliest limit-block number, if any. + pub(crate) fn current_limit(&self) -> Option { + self.inner.read().current_limit() + } + + /// Get the current set ID. This is incremented every time the set changes. + pub(crate) fn set_id(&self) -> u64 { + self.inner.read().set_id + } +} + +impl From> for SharedAuthoritySet { + fn from(set: AuthoritySet) -> Self { + SharedAuthoritySet { inner: Arc::new(RwLock::new(set)) } + } +} + +/// Status of the set after changes were applied. +pub(crate) struct Status { + /// Whether internal changes were made. + pub(crate) changed: bool, + /// `Some` when underlying authority set has changed, containing the + /// block where that set changed. + pub(crate) new_set_block: Option<(H, N)>, +} + +/// A set of authorities. +#[derive(Debug, Clone, Encode, Decode)] +pub(crate) struct AuthoritySet { + current_authorities: Vec<(AuthorityId, u64)>, + set_id: u64, + pending_changes: Vec>, +} + +impl AuthoritySet { + /// Get a genesis set with given authorities. + pub(crate) fn genesis(initial: Vec<(AuthorityId, u64)>) -> Self { + AuthoritySet { + current_authorities: initial, + set_id: 0, + pending_changes: Vec::new(), + } + } + + /// Get the current set id and a reference to the current authority set. + pub(crate) fn current(&self) -> (u64, &[(AuthorityId, u64)]) { + (self.set_id, &self.current_authorities[..]) + } +} + +impl AuthoritySet + where N: Add + Ord + Clone + Debug, +{ + /// Note an upcoming pending transition. + pub(crate) fn add_pending_change(&mut self, pending: PendingChange) { + // ordered first by effective number and then by signal-block number. + let key = (pending.effective_number(), pending.canon_height.clone()); + let idx = self.pending_changes + .binary_search_by_key(&key, |change| ( + change.effective_number(), + change.canon_height.clone(), + )) + .unwrap_or_else(|i| i); + + self.pending_changes.insert(idx, pending); + } + + /// Inspect pending changes. + #[cfg(test)] + pub(crate) fn pending_changes(&self) -> &[PendingChange] { + &self.pending_changes + } + + /// Get the earliest limit-block number, if any. + pub(crate) fn current_limit(&self) -> Option { + self.pending_changes.get(0).map(|change| change.effective_number().clone()) + } + + /// Apply or prune any pending transitions. Provide a closure that can be used to check for the + /// finalized block with given number. + /// + /// When the set has changed, the return value will be `Ok(Some((H, N)))` which is the cnaonical + /// block where the set last changed. + pub(crate) fn apply_changes(&mut self, just_finalized: N, mut canonical: F) + -> Result, E> + where F: FnMut(N) -> Result + { + let mut status = Status { + changed: false, + new_set_block: None, + }; + loop { + let remove_up_to = match self.pending_changes.first() { + None => break, + Some(change) => { + let effective_number = change.effective_number(); + if effective_number > just_finalized { break } + + // check if the block that signalled the change is canonical in + // our chain. + if canonical(change.canon_height.clone())? == change.canon_hash { + // apply this change: make the set canonical + info!(target: "finality", "Applying authority set change scheduled at block #{:?}", + change.canon_height); + + self.current_authorities = change.next_authorities.clone(); + self.set_id += 1; + + status.new_set_block = Some(( + canonical(effective_number.clone())?, + effective_number.clone(), + )); + + // discard any signalled changes + // that happened before or equal to the effective number of the change. + self.pending_changes.iter() + .take_while(|c| c.canon_height <= effective_number) + .count() + } else { + 1 // prune out this entry; it's no longer relevant. + } + } + }; + + let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len()); + self.pending_changes.drain(..remove_up_to); + status.changed = true; // always changed because we strip at least the first change. + } + + Ok(status) + } +} + +/// A pending change to the authority set. +/// +/// This will be applied when the announcing block is at some depth within +/// the finalized chain. +#[derive(Debug, Clone, Encode, Decode, PartialEq)] +pub(crate) struct PendingChange { + /// The new authorities and weights to apply. + pub(crate) next_authorities: Vec<(AuthorityId, u64)>, + /// How deep in the finalized chain the announcing block must be + /// before the change is applied. + pub(crate) finalization_depth: N, + /// The announcing block's height. + pub(crate) canon_height: N, + /// The announcing block's hash. + pub(crate) canon_hash: H, +} + +impl + Clone> PendingChange { + /// Returns the effective number this change will be applied at. + fn effective_number(&self) -> N { + self.canon_height.clone() + self.finalization_depth.clone() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn changes_sorted_in_correct_order() { + let mut authorities = AuthoritySet { + current_authorities: Vec::new(), + set_id: 0, + pending_changes: Vec::new(), + }; + + let change_a = PendingChange { + next_authorities: Vec::new(), + finalization_depth: 10, + canon_height: 5, + canon_hash: "hash_a", + }; + + let change_b = PendingChange { + next_authorities: Vec::new(), + finalization_depth: 0, + canon_height: 16, + canon_hash: "hash_b", + }; + + let change_c = PendingChange { + next_authorities: Vec::new(), + finalization_depth: 5, + canon_height: 10, + canon_hash: "hash_c", + }; + + authorities.add_pending_change(change_a.clone()); + authorities.add_pending_change(change_b.clone()); + authorities.add_pending_change(change_c.clone()); + + assert_eq!(authorities.pending_changes, vec![change_a, change_c, change_b]); + } + + #[test] + fn apply_change() { + let mut authorities = AuthoritySet { + current_authorities: Vec::new(), + set_id: 0, + pending_changes: Vec::new(), + }; + + let set_a = vec![([1; 32].into(), 5)]; + let set_b = vec![([2; 32].into(), 5)]; + + let change_a = PendingChange { + next_authorities: set_a.clone(), + finalization_depth: 10, + canon_height: 5, + canon_hash: "hash_a", + }; + + let change_b = PendingChange { + next_authorities: set_b.clone(), + finalization_depth: 10, + canon_height: 5, + canon_hash: "hash_b", + }; + + authorities.add_pending_change(change_a.clone()); + authorities.add_pending_change(change_b.clone()); + + authorities.apply_changes(10, |_| Err(())).unwrap(); + assert!(authorities.current_authorities.is_empty()); + + authorities.apply_changes(15, |n| match n { + 5 => Ok("hash_a"), + 15 => Ok("hash_15_canon"), + _ => Err(()), + }).unwrap(); + + assert_eq!(authorities.current_authorities, set_a); + assert_eq!(authorities.set_id, 1); + assert!(authorities.pending_changes.is_empty()); + } + + #[test] + fn apply_many_changes_at_once() { + let mut authorities = AuthoritySet { + current_authorities: Vec::new(), + set_id: 0, + pending_changes: Vec::new(), + }; + + let set_a = vec![([1; 32].into(), 5)]; + let set_b = vec![([2; 32].into(), 5)]; + let set_c = vec![([3; 32].into(), 5)]; + + let change_a = PendingChange { + next_authorities: set_a.clone(), + finalization_depth: 10, + canon_height: 5, + canon_hash: "hash_a", + }; + + // will be ignored because it was signalled when change_a still pending. + let change_b = PendingChange { + next_authorities: set_b.clone(), + finalization_depth: 10, + canon_height: 15, + canon_hash: "hash_b", + }; + + let change_c = PendingChange { + next_authorities: set_c.clone(), + finalization_depth: 10, + canon_height: 16, + canon_hash: "hash_c", + }; + + authorities.add_pending_change(change_a.clone()); + authorities.add_pending_change(change_b.clone()); + authorities.add_pending_change(change_c.clone()); + + authorities.apply_changes(26, |n| match n { + 5 => Ok("hash_a"), + 15 => Ok("hash_b"), + 16 => Ok("hash_c"), + 26 => Ok("hash_26"), + _ => Err(()), + }).unwrap(); + + assert_eq!(authorities.current_authorities, set_c); + assert_eq!(authorities.set_id, 2); // has been bumped only twice + assert!(authorities.pending_changes.is_empty()); + } +} diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 21f75f47f89e..b107c3089e4a 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -16,15 +16,50 @@ //! Integration of the GRANDPA finality gadget into substrate. //! -//! This is a long-running future that produces finality notifications. +//! This crate provides a long-running future that produces finality notifications. +//! +//! # Usage +//! +//! First, create a block-import wrapper with the `block_import` function. +//! The GRANDPA worker needs to be linked together with this block import object, +//! so a `LinkHalf` is returned as well. All blocks imported (from network or consensus or otherwise) +//! must pass through this wrapper, otherwise consensus is likely to break in +//! unexpected ways. +//! +//! Next, use the `LinkHalf` and a local configuration to `run_grandpa`. This requires a +//! `Network` implementation. The returned future should be driven to completion and +//! will finalize blocks in the background. +//! +//! # Changing authority sets +//! +//! The rough idea behind changing authority sets in GRANDPA is that at some point, +//! we obtain agreement for some maximum block height that the current set can +//! finalize, and once a block with that height is finalized the next set will +//! pick up finalization from there. +//! +//! Technically speaking, this would be implemented as a voting rule which says, +//! "if there is a signal for a change in N blocks in block B, only vote on +//! chains with length NUM(B) + N if they contain B". This conditional-inclusion +//! logic is complex to compute because it requires looking arbitrarily far +//! back in the chain. +//! +//! Instead, we keep track of a list of all signals we've seen so far, +//! sorted ascending by the block number they would be applied at. We never vote +//! on chains with number higher than the earliest handoff block number +//! (this is num(signal) + N). When finalizing a block, we either apply or prune +//! any signaled changes based on whether the signaling block is included in the +//! newly-finalized chain. extern crate finality_grandpa as grandpa; extern crate futures; extern crate substrate_client as client; extern crate sr_primitives as runtime_primitives; +extern crate substrate_consensus_common as consensus_common; extern crate substrate_primitives; extern crate tokio; +extern crate parking_lot; extern crate parity_codec as codec; +extern crate substrate_fg_primitives as fg_primitives; #[macro_use] extern crate log; @@ -33,44 +68,84 @@ extern crate log; extern crate substrate_network as network; #[cfg(test)] -extern crate parking_lot; +extern crate substrate_keyring as keyring; #[cfg(test)] -extern crate substrate_keyring as keyring; +extern crate substrate_test_client as test_client; + +#[cfg(test)] +extern crate env_logger; + +#[macro_use] +extern crate parity_codec_derive; use futures::prelude::*; use futures::stream::Fuse; use futures::sync::mpsc; -use client::{Client, ImportNotifications, backend::Backend, CallExecutor, blockchain::HeaderBackend}; +use client::{Client, error::Error as ClientError, ImportNotifications, backend::Backend, CallExecutor}; +use client::blockchain::HeaderBackend; +use client::runtime_api::TaggedTransactionQueue; use codec::{Encode, Decode}; -use runtime_primitives::traits::{As, NumberFor, Block as BlockT, Header as HeaderT}; +use consensus_common::{BlockImport, ImportBlock, ImportResult}; +use runtime_primitives::traits::{ + NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi +}; +use fg_primitives::GrandpaApi; use runtime_primitives::generic::BlockId; use substrate_primitives::{ed25519, H256, AuthorityId, Blake2Hasher}; use tokio::timer::Interval; use grandpa::Error as GrandpaError; -use grandpa::{voter, round::State as RoundState, Prevote, Precommit, Equivocation}; +use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps}; use std::collections::{VecDeque, HashMap}; use std::sync::Arc; use std::time::{Instant, Duration}; +use authorities::SharedAuthoritySet; + +pub use fg_primitives::ScheduledChange; + +mod authorities; + +#[cfg(test)] +mod tests; + const LAST_COMPLETED_KEY: &[u8] = b"grandpa_completed_round"; +const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; + +/// round-number, round-state +type LastCompleted = (u64, RoundState); /// A GRANDPA message for a substrate chain. -pub type Message = grandpa::Message<::Hash>; +pub type Message = grandpa::Message<::Hash, NumberFor>; /// A signed message. -pub type SignedMessage = grandpa::SignedMessage<::Hash, ed25519::Signature, AuthorityId>; +pub type SignedMessage = grandpa::SignedMessage< + ::Hash, + NumberFor, + ed25519::Signature, + AuthorityId, +>; +/// A prevote message for this chain's block type. +pub type Prevote = grandpa::Prevote<::Hash, NumberFor>; +/// A precommit message for this chain's block type. +pub type Precommit = grandpa::Precommit<::Hash, NumberFor>; /// Configuration for the GRANDPA service. +#[derive(Clone)] pub struct Config { /// The expected duration for a message to be gossiped across the network. pub gossip_duration: Duration, - /// The voters. - // TODO: make dynamic - pub voters: Vec, /// The local signing key. pub local_key: Option>, + /// Some local identifier of the voter. + pub name: Option, +} + +impl Config { + fn name(&self) -> &str { + self.name.as_ref().map(|s| s.as_str()).unwrap_or("") + } } /// Errors that can occur while voting in GRANDPA. @@ -83,7 +158,7 @@ pub enum Error { /// A blockchain error. Blockchain(String), /// Could not complete a round on disk. - CouldNotCompleteRound(::client::error::Error), + Client(ClientError), /// A timer failed to fire. Timer(::tokio::timer::Error), } @@ -104,13 +179,13 @@ pub trait Network: Clone { /// Get a stream of messages for a specific round. This stream should /// never logically conclude. - fn messages_for(&self, round: u64) -> Self::In; + fn messages_for(&self, round: u64, set_id: u64) -> Self::In; /// Send a message at a specific round out. - fn send_message(&self, round: u64, message: Vec); + fn send_message(&self, round: u64, set_id: u64, message: Vec); /// Clean up messages for a round. - fn drop_messages(&self, round: u64); + fn drop_messages(&self, round: u64, set_id: u64); } /// Something which can determine if a block is known. @@ -118,24 +193,23 @@ pub trait BlockStatus { /// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block /// is definitely known and has been imported. /// If an unexpected error occurs, return that. - fn block_number(&self, hash: Block::Hash) -> Result, Error>; + fn block_number(&self, hash: Block::Hash) -> Result>, Error>; } impl, RA> BlockStatus for Arc> where B: Backend, E: CallExecutor + Send + Sync, RA: Send + Sync, - NumberFor: As, + NumberFor: BlockNumberOps, { - fn block_number(&self, hash: Block::Hash) -> Result, Error> { + fn block_number(&self, hash: Block::Hash) -> Result>, Error> { self.block_number_from_id(&BlockId::Hash(hash)) .map_err(|e| Error::Blockchain(format!("{:?}", e))) - .map(|num| num.map(|n| n.as_())) } } /// Buffering imported messages until blocks with given hashes are imported. -pub struct UntilImported { +struct UntilImported { import_notifications: Fuse>, status_check: Status, inner: Fuse, @@ -270,6 +344,7 @@ impl, I> Stream for UntilImported { round: u64, + set_id: u64, inner: I, network: N, } @@ -293,13 +368,27 @@ impl Sink for ClearOnDrop { impl Drop for ClearOnDrop { fn drop(&mut self) { - self.network.drop_messages(self.round); + self.network.drop_messages(self.round, self.set_id); } } +fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { + let mut v = message.encode(); + + round.using_encoded(|s| v.extend(s)); + set_id.using_encoded(|s| v.extend(s)); + + v +} + // converts a message stream into a stream of signed messages. // the output stream checks signatures also. -fn checked_message_stream(inner: S, voters: Vec) +fn checked_message_stream( + round: u64, + set_id: u64, + inner: S, + voters: Arc>, +) -> impl Stream,Error=Error> where S: Stream,Error=()> { @@ -313,13 +402,13 @@ fn checked_message_stream(inner: S, voters: Vec) }) .and_then(move |msg| { // check signature. - if !voters.contains(&msg.id) { + if !voters.contains_key(&msg.id) { debug!(target: "afg", "Skipping message from unknown voter {}", msg.id); return Ok(None); } let as_public = ::ed25519::Public::from_raw(msg.id.0); - let encoded_raw = msg.message.encode(); + let encoded_raw = localized_payload(round, set_id, &msg.message); if ::ed25519::verify_strong(&msg.signature, &encoded_raw, as_public) { Ok(Some(msg)) } else { @@ -332,9 +421,10 @@ fn checked_message_stream(inner: S, voters: Vec) } fn outgoing_messages( - local_key: Option>, - voters: Vec, round: u64, + set_id: u64, + local_key: Option>, + voters: Arc>, network: N, ) -> ( impl Stream,Error=Error>, @@ -342,15 +432,20 @@ fn outgoing_messages( ) { let locals = local_key.and_then(|pair| { let public = pair.public(); - voters.iter().find(|id| id.0 == public.0).map(move |id| (pair, id.clone())) + let id = AuthorityId(public.0); + if voters.contains_key(&id) { + Some((pair, id)) + } else { + None + } }); let (tx, rx) = mpsc::unbounded(); let rx = rx .map(move |msg: Message| { - // when locals exist. sign messages on import + // when locals exist, sign messages on import if let Some((ref pair, local_id)) = locals { - let encoded = msg.encode(); + let encoded = localized_payload(round, set_id, &msg); let signature = pair.sign(&encoded[..]); let signed = SignedMessage:: { message: msg, @@ -359,7 +454,7 @@ fn outgoing_messages( }; // forward to network. - network.send_message(round, signed.encode()); + network.send_message(round, set_id, signed.encode()); Some(signed) } else { None @@ -377,20 +472,22 @@ fn outgoing_messages( } /// The environment we run GRANDPA in. -pub struct Environment { +struct Environment { inner: Arc>, - voters: HashMap, + voters: Arc>, config: Config, + authority_set: SharedAuthoritySet>, network: N, + set_id: u64, } -impl, B, E, N, RA> grandpa::Chain for Environment where +impl, B, E, N, RA> grandpa::Chain> for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + 'static, N: Network + 'static, N::In: 'static, - NumberFor: As, + NumberFor: BlockNumberOps, { fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result, GrandpaError> { let tree_route_res = ::client::blockchain::tree_route( @@ -418,14 +515,23 @@ impl, B, E, N, RA> grandpa::Chain for Envi Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect()) } - fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, u32)> { - match self.inner.best_containing(block, None) { + fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor)> { + // we refuse to vote beyond the current limit number where transitions are scheduled to + // occur. + // once blocks are finalized that make that transition irrelevant or activate it, + // we will proceed onwards. most of the time there will be no pending transition. + let limit = self.authority_set.current_limit(); + trace!(target: "afg", "Finding best chain containing block {:?} with number limit {:?}", block, limit); + + match self.inner.best_containing(block, limit) { Ok(Some(hash)) => { let header = self.inner.header(&BlockId::Hash(hash)).ok()? .expect("Header known to exist after `best_containing` call; qed"); - Some((hash, header.number().as_())) + Some((hash, header.number().clone())) } + // Ok(None) can be returned when `block` is after `limit`. That might cause issues. + // might be better to return the header itself in this (rare) case. Ok(None) => None, Err(e) => { debug!(target: "afg", "Encountered error finding best chain containing {:?}: {:?}", block, e); @@ -435,22 +541,65 @@ impl, B, E, N, RA> grandpa::Chain for Envi } } -impl, N, RA> voter::Environment for Environment where +/// A new authority set along with the canonical block it changed at. +#[derive(Debug)] +struct NewAuthoritySet { + canon_number: N, + canon_hash: H, + set_id: u64, + authorities: Vec<(AuthorityId, u64)>, +} + +/// Signals either an early exit of a voter or an error. +#[derive(Debug)] +enum ExitOrError { + /// An error occurred. + Error(Error), + /// Early exit of the voter: the new set ID and the new authorities along with respective weights. + AuthoritiesChanged(NewAuthoritySet), +} + +impl From for ExitOrError { + fn from(e: Error) -> Self { + ExitOrError::Error(e) + } +} + +impl From for ExitOrError { + fn from(e: ClientError) -> Self { + ExitOrError::Error(Error::Client(e)) + } +} + +impl From for ExitOrError { + fn from(e: grandpa::Error) -> Self { + ExitOrError::Error(Error::from(e)) + } +} + +impl, N, RA> voter::Environment> for Environment where Block: 'static, B: Backend + 'static, E: CallExecutor + 'static + Send + Sync, N: Network + 'static, N::In: 'static, RA: 'static + Send + Sync, - NumberFor: As, + NumberFor: BlockNumberOps, { type Timer = Box>; type Id = AuthorityId; type Signature = ed25519::Signature; - type In = Box, Error = Self::Error>>; - type Out = Box, SinkError = Self::Error>>; - type Error = Error; - + type In = Box, Self::Signature, Self::Id>, + Error = Self::Error, + >>; + type Out = Box>, + SinkError = Self::Error, + >>; + type Error = ExitOrError>; + + #[allow(unreachable_code)] fn round_data( &self, round: u64 @@ -464,14 +613,17 @@ impl, N, RA> voter::Environment for // TODO: dispatch this with `mpsc::spawn`. let incoming = checked_message_stream::( - self.network.messages_for(round), - self.config.voters.clone(), + round, + self.set_id, + self.network.messages_for(round, self.set_id), + self.voters.clone(), ); let (out_rx, outgoing) = outgoing_messages::( - self.config.local_key.clone(), - self.config.voters.clone(), round, + self.set_id, + self.config.local_key.clone(), + self.voters.clone(), self.network.clone(), ); @@ -484,51 +636,121 @@ impl, N, RA> voter::Environment for ); // join incoming network messages with locally originating ones. - let incoming = Box::new(incoming.select(out_rx)); + let incoming = Box::new(out_rx.select(incoming).map_err(Into::into)); // schedule network message cleanup when sink drops. let outgoing = Box::new(ClearOnDrop { round, + set_id: self.set_id, network: self.network.clone(), - inner: outgoing, + inner: outgoing.sink_map_err(Into::into), }); voter::RoundData { - prevote_timer: Box::new(prevote_timer.map_err(Error::Timer)), - precommit_timer: Box::new(precommit_timer.map_err(Error::Timer)), - voters: self.voters.clone(), + prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())), + precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())), + voters: (&*self.voters).clone(), incoming, outgoing, } } - fn completed(&self, round: u64, state: RoundState) -> Result<(), Self::Error> { + fn completed(&self, round: u64, state: RoundState>) -> Result<(), Self::Error> { + debug!( + target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}", + self.config.name(), + round, + self.set_id, + state.estimate.as_ref().map(|e| e.1), + state.finalized.as_ref().map(|e| e.1), + ); + let encoded_state = (round, state).encode(); if let Err(e) = self.inner.backend() .insert_aux(&[(LAST_COMPLETED_KEY, &encoded_state[..])], &[]) { warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e); - Err(Error::CouldNotCompleteRound(e)) + Err(Error::Client(e).into()) } else { Ok(()) } } - fn finalize_block(&self, hash: Block::Hash, number: u32) -> Result<(), Self::Error> { - // TODO: don't unconditionally notify. + fn finalize_block(&self, hash: Block::Hash, number: NumberFor) -> Result<(), Self::Error> { + // ideally some handle to a synchronization oracle would be used + // to avoid unconditionally notifying. if let Err(e) = self.inner.finalize_block(BlockId::Hash(hash), true) { warn!(target: "afg", "Error applying finality to block {:?}: {:?}", (hash, number), e); + + // we return without error because not being able to finalize (temporarily) is + // non-fatal. + return Ok(()); } - // we return without error in all cases because not being able to finalize is - // non-fatal. - Ok(()) + // lock must be held through writing to DB to avoid race + let mut authority_set = self.authority_set.inner().write(); + let client = &self.inner; + let status = authority_set.apply_changes(number, |canon_number| { + client.block_hash_from_id(&BlockId::number(canon_number)) + .map(|h| h.expect("given number always less than newly-finalized number; \ + thus there is a block with that number finalized already; qed")) + })?; + + if status.changed { + // write new authority set state to disk. + let encoded_set = authority_set.encode(); + + let write_result = if let Some((ref canon_hash, ref canon_number)) = status.new_set_block { + // we also overwrite the "last completed round" entry with a blank slate + // because from the perspective of the finality gadget, the chain has + // reset. + let round_state = RoundState::genesis((*canon_hash, *canon_number)); + let last_completed: LastCompleted<_, _> = (0, round_state); + let encoded = last_completed.encode(); + + client.backend().insert_aux( + &[ + (AUTHORITY_SET_KEY, &encoded_set[..]), + (LAST_COMPLETED_KEY, &encoded[..]), + ], + &[] + ) + } else { + client.backend().insert_aux(&[(AUTHORITY_SET_KEY, &encoded_set[..])], &[]) + }; + + if let Err(e) = write_result { + warn!(target: "finality", "Failed to write updated authority set to disk. Bailing."); + warn!(target: "finality", "Node is in a potentially inconsistent state."); + + return Err(e.into()); + } + } + + if let Some((canon_hash, canon_number)) = status.new_set_block { + // the authority set has changed. + let (new_id, set_ref) = authority_set.current(); + + if set_ref.len() > 16 { + info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len()); + } else { + info!("Applying GRANDPA set change to new set {:?}", set_ref); + } + Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet { + canon_hash, + canon_number, + set_id: new_id, + authorities: set_ref.to_vec(), + })) + } else { + Ok(()) + } } fn prevote_equivocation( &self, _round: u64, - equivocation: ::grandpa::Equivocation, Self::Signature> + equivocation: ::grandpa::Equivocation, Self::Signature> ) { warn!(target: "afg", "Detected prevote equivocation in the finality worker: {:?}", equivocation); // nothing yet; this could craft misbehavior reports of some kind. @@ -537,232 +759,209 @@ impl, N, RA> voter::Environment for fn precommit_equivocation( &self, _round: u64, - equivocation: Equivocation, Self::Signature> + equivocation: Equivocation, Self::Signature> ) { warn!(target: "afg", "Detected precommit equivocation in the finality worker: {:?}", equivocation); // nothing yet } } -/// Run a GRANDPA voter as a task. The returned future should be executed in a tokio runtime. -pub fn run_grandpa, N, RA: Send + Sync + 'static>( - config: Config, + + +/// A block-import handler for GRANDPA. +/// +/// This scans each imported block for signals of changing authority set. +/// When using GRANDPA, the block import worker should be using this block import +/// object. +pub struct GrandpaBlockImport, RA> { + inner: Arc>, + authority_set: SharedAuthoritySet>, +} + +impl, RA> BlockImport for GrandpaBlockImport where + B: Backend + 'static, + E: CallExecutor + 'static + Clone + Send + Sync, + DigestFor: Encode, + RA: GrandpaApi + TaggedTransactionQueue, +{ + type Error = ClientError; + + fn import_block(&self, mut block: ImportBlock, new_authorities: Option>) + -> Result + { + use authorities::PendingChange; + + let maybe_change = self.inner.runtime_api().grandpa_pending_change( + &BlockId::hash(*block.header.parent_hash()), + &block.header.digest().clone(), + )?; + + // when we update the authorities, we need to hold the lock + // until the block is written to prevent a race if we need to restore + // the old authority set on error. + let just_in_case = maybe_change.map(|change| { + let hash = block.header.hash(); + let number = block.header.number().clone(); + + let mut authorities = self.authority_set.inner().write(); + let old_set = authorities.clone(); + authorities.add_pending_change(PendingChange { + next_authorities: change.next_authorities, + finalization_depth: change.delay, + canon_height: number, + canon_hash: hash, + }); + + block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode()))); + (old_set, authorities) + }); + + let result = self.inner.import_block(block, new_authorities); + if let Err(ref e) = result { + if let Some((old_set, mut authorities)) = just_in_case { + debug!(target: "afg", "Restoring old set after block import error: {:?}", e); + *authorities = old_set; + } + } + + result + } +} + +/// Half of a link between a block-import worker and a the background voter. +// This should remain non-clone. +pub struct LinkHalf, RA> { client: Arc>, - voters: HashMap, + authority_set: SharedAuthoritySet>, +} + +/// Make block importer and link half necessary to tie the background voter +/// to it. +pub fn block_import, RA>(client: Arc>) + -> Result<(GrandpaBlockImport, LinkHalf), ClientError> + where + B: Backend + 'static, + E: CallExecutor + 'static + Clone + Send + Sync, + RA: GrandpaApi, +{ + use runtime_primitives::traits::Zero; + let authority_set = match client.backend().get_aux(AUTHORITY_SET_KEY)? { + None => { + info!(target: "afg", "Loading GRANDPA authorities \ + from genesis on what appears to be first startup."); + + // no authority set on disk: fetch authorities from genesis state. + // if genesis state is not available, we may be a light client, but these + // are unsupported for following GRANDPA directly. + let genesis_authorities = client.runtime_api() + .grandpa_authorities(&BlockId::number(Zero::zero()))?; + + let authority_set = SharedAuthoritySet::genesis(genesis_authorities); + let encoded = authority_set.inner().read().encode(); + client.backend().insert_aux(&[(AUTHORITY_SET_KEY, &encoded[..])], &[])?; + + authority_set + } + Some(raw) => ::authorities::AuthoritySet::decode(&mut &raw[..]) + .ok_or_else(|| ::client::error::ErrorKind::Backend( + format!("GRANDPA authority set kept in invalid format") + ))? + .into(), + }; + + Ok(( + GrandpaBlockImport { inner: client.clone(), authority_set: authority_set.clone() }, + LinkHalf { client, authority_set }, + )) +} + +/// Run a GRANDPA voter as a task. Provide configuration and a link to a +/// block import worker that has already been instantiated with `block_import`. +pub fn run_grandpa, N, RA>( + config: Config, + link: LinkHalf, network: N, -) -> Result,client::error::Error> where +) -> ::client::error::Result> where Block::Hash: Ord, B: Backend + 'static, - E: CallExecutor + 'static, + E: CallExecutor + Send + Sync + 'static, N: Network + 'static, N::In: 'static, - NumberFor: As, + NumberFor: BlockNumberOps, + DigestFor: Encode, + RA: Send + Sync + 'static, { + use futures::future::{self, Loop as FutureLoop}; + use runtime_primitives::traits::Zero; + + let LinkHalf { client, authority_set } = link; let chain_info = client.info()?; let genesis_hash = chain_info.chain.genesis_hash; - let last_finalized = ( - chain_info.chain.finalized_hash, - chain_info.chain.finalized_number.as_() - ); let (last_round_number, last_state) = match client.backend().get_aux(LAST_COMPLETED_KEY)? { - None => (0, RoundState::genesis((genesis_hash, 0))), - Some(raw) => <(u64, RoundState)>::decode(&mut &raw[..]) + None => (0, RoundState::genesis((genesis_hash, >::zero()))), + Some(raw) => LastCompleted::decode(&mut &raw[..]) .ok_or_else(|| ::client::error::ErrorKind::Backend( format!("Last GRANDPA round state kept in invalid format") ))? }; - let environment = Arc::new(Environment { - inner: client, - config, - voters, - network, + let voters = authority_set.inner().read().current().1.iter() + .cloned() + .collect(); + + let initial_environment = Arc::new(Environment { + inner: client.clone(), + config: config.clone(), + voters: Arc::new(voters), + network: network.clone(), + set_id: authority_set.set_id(), + authority_set: authority_set.clone(), }); - let voter = voter::Voter::new( - environment, - last_round_number, - last_state, - last_finalized, - ); + let work = future::loop_fn((initial_environment, last_round_number, last_state), move |params| { + let (env, last_round_number, last_state) = params; + debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); - Ok(voter.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e))) -} + let chain_info = match client.info() { + Ok(i) => i, + Err(e) => return future::Either::B(future::err(Error::Client(e))), + }; -#[cfg(test)] -mod tests { - use super::*; - use network::test::*; - use parking_lot::Mutex; - use tokio::runtime::current_thread; - use keyring::Keyring; - use client::BlockchainEvents; - - #[derive(Clone)] - struct TestGrandpaNetwork { - inner: Arc>, - peer_id: usize, - } + let last_finalized = ( + chain_info.chain.finalized_hash, + chain_info.chain.finalized_number, + ); - impl TestGrandpaNetwork { - fn new(inner: Arc>, peer_id: usize,) -> Self { - TestGrandpaNetwork { - inner, - peer_id, + let voter = voter::Voter::new(env, last_round_number, last_state, last_finalized); + let client = client.clone(); + let config = config.clone(); + let network = network.clone(); + let authority_set = authority_set.clone(); + future::Either::A(voter.then(move |res| match res { + // voters don't conclude naturally; this could reasonably be an error. + Ok(()) => Ok(FutureLoop::Break(())), + Err(ExitOrError::Error(e)) => Err(e), + Err(ExitOrError::AuthoritiesChanged(new)) => { + let env = Arc::new(Environment { + inner: client, + config, + voters: Arc::new(new.authorities.into_iter().collect()), + set_id: new.set_id, + network, + authority_set, + }); + + // start the new authority set using the block where the + // set changed (not where the signal happened!) as the base. + Ok(FutureLoop::Continue(( + env, + 0, // always start at round 0 when changing sets. + RoundState::genesis((new.canon_hash, new.canon_number)), + ))) } - } - } - - fn round_to_topic(round: u64) -> Hash { - let mut hash = Hash::default(); - round.using_encoded(|s| { - let raw = hash.as_mut(); - raw[..8].copy_from_slice(s); - }); - hash - } - - impl Network for TestGrandpaNetwork { - type In = Box,Error=()>>; - - fn messages_for(&self, round: u64) -> Self::In { - let messages = self.inner.lock().peer(self.peer_id) - .with_spec(|spec, _| spec.gossip.messages_for(round_to_topic(round))); - - let messages = messages.map_err( - move |_| panic!("Messages for round {} dropped too early", round) - ); - - Box::new(messages) - } - - fn send_message(&self, round: u64, message: Vec) { - let mut inner = self.inner.lock(); - inner.peer(self.peer_id).gossip_message(round_to_topic(round), message); - inner.route(); - } - - fn drop_messages(&self, round: u64) { - let topic = round_to_topic(round); - self.inner.lock().peer(self.peer_id) - .with_spec(|spec, _| spec.gossip.collect_garbage(|t| t == &topic)); - } - } - - const TEST_GOSSIP_DURATION: Duration = Duration::from_millis(500); - const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); - - #[test] - fn finalize_20_unanimous_3_peers() { - let mut net = TestNet::new(3); - net.peer(0).push_blocks(20, false); - net.sync(); - - let net = Arc::new(Mutex::new(net)); - let peers = &[ - (0, Keyring::Alice), - (1, Keyring::Bob), - (2, Keyring::Charlie), - ]; - - let voters: Vec<_> = peers.iter() - .map(|&(_, ref key)| AuthorityId(key.to_raw_public())) - .collect(); - - let mut finality_notifications = Vec::new(); - - let mut runtime = current_thread::Runtime::new().unwrap(); - for (peer_id, key) in peers { - let client = net.lock().peer(*peer_id).client().clone(); - finality_notifications.push( - client.finality_notification_stream() - .take_while(|n| Ok(n.header.number() < &20)) - .for_each(move |_| Ok(())) - ); - let voter = run_grandpa( - Config { - gossip_duration: TEST_GOSSIP_DURATION, - voters: voters.clone(), - local_key: Some(Arc::new(key.clone().into())), - }, - client, - voters.iter().map(|&id| (id, 1)).collect(), - TestGrandpaNetwork::new(net.clone(), *peer_id), - ).expect("all in order with client and network"); - - runtime.spawn(voter); - } - - // wait for all finalized on each. - let wait_for = ::futures::future::join_all(finality_notifications) - .map(|_| ()) - .map_err(|_| ()); - - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) - .map(|_| ()) - .map_err(|_| ()); - - runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); - } - - #[test] - fn observer_can_finalize() { - let mut net = TestNet::new(4); - net.peer(0).push_blocks(20, false); - net.sync(); - - let net = Arc::new(Mutex::new(net)); - let peers = &[ - (0, Keyring::Alice), - (1, Keyring::Bob), - (2, Keyring::Charlie), - ]; - - let voters: HashMap<_, _> = peers.iter() - .map(|&(_, ref key)| (AuthorityId(key.to_raw_public()), 1)) - .collect(); - - let mut finality_notifications = Vec::new(); - - let mut runtime = current_thread::Runtime::new().unwrap(); - let all_peers = peers.iter() - .cloned() - .map(|(id, key)| (id, Some(Arc::new(key.into())))) - .chain(::std::iter::once((3, None))); - - for (peer_id, local_key) in all_peers { - let client = net.lock().peer(peer_id).client().clone(); - finality_notifications.push( - client.finality_notification_stream() - .take_while(|n| Ok(n.header.number() < &20)) - .for_each(move |_| Ok(())) - ); - let voter = run_grandpa( - Config { - gossip_duration: TEST_GOSSIP_DURATION, - voters: voters.keys().cloned().collect(), - local_key, - }, - client, - voters.clone(), - TestGrandpaNetwork::new(net.clone(), peer_id), - ).expect("all in order with client and network"); - - runtime.spawn(voter); - } - - // wait for all finalized on each. - let wait_for = ::futures::future::join_all(finality_notifications) - .map(|_| ()) - .map_err(|_| ()); - - let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) - .map(|_| ()) - .map_err(|_| ()); + })) + }); - runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); - } + Ok(work.map_err(|e| warn!("GRANDPA Voter failed: {:?}", e))) } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs new file mode 100644 index 000000000000..1d8c7db7c540 --- /dev/null +++ b/core/finality-grandpa/src/tests.rs @@ -0,0 +1,460 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Tests and test helpers for GRANDPA. + +use super::*; +use network::test::{Block, Hash, TestNetFactory, Peer, PeersClient}; +use network::import_queue::{PassThroughVerifier}; +use network::config::{ProtocolConfig, Roles}; +use parking_lot::Mutex; +use tokio::runtime::current_thread; +use keyring::Keyring; +use client::BlockchainEvents; +use test_client::{self, runtime::BlockNumber}; +use codec::Decode; +use consensus_common::BlockOrigin; +use std::collections::HashSet; + +use authorities::AuthoritySet; + +type PeerData = Mutex>>; +type GrandpaPeer = Peer; + +struct GrandpaTestNet { + peers: Vec>, + test_config: TestApi, + started: bool +} + +impl GrandpaTestNet { + fn new(test_config: TestApi, n_peers: usize) -> Self { + let mut net = GrandpaTestNet { + peers: Vec::with_capacity(n_peers), + started: false, + test_config, + }; + let config = Self::default_config(); + + for _ in 0..n_peers { + net.add_peer(&config); + } + + net + } +} + +impl TestNetFactory for GrandpaTestNet { + type Verifier = PassThroughVerifier; + type PeerData = PeerData; + + /// Create new test network with peers and given config. + fn from_config(_config: &ProtocolConfig) -> Self { + GrandpaTestNet { + peers: Vec::new(), + test_config: Default::default(), + started: false + } + } + + fn default_config() -> ProtocolConfig { + // the authority role ensures gossip hits all nodes here. + ProtocolConfig { + roles: Roles::AUTHORITY, + } + } + + fn make_verifier(&self, _client: Arc, _cfg: &ProtocolConfig) + -> Arc + { + Arc::new(PassThroughVerifier(false)) // use non-instant finality. + } + + fn make_block_import(&self, client: Arc) + -> (Arc + Send + Sync>, PeerData) + { + let (import, link) = block_import(client, self.test_config.clone()).expect("Could not create block import for fresh peer."); + (Arc::new(import), Mutex::new(Some(link))) + } + + fn peer(&self, i: usize) -> &GrandpaPeer { + &self.peers[i] + } + + fn peers(&self) -> &Vec> { + &self.peers + } + + fn mut_peers>)>(&mut self, closure: F) { + closure(&mut self.peers); + } + + fn started(&self) -> bool { + self.started + } + + fn set_started(&mut self, new: bool) { + self.started = new; + } +} + +#[derive(Clone)] +struct MessageRouting { + inner: Arc>, + peer_id: usize, +} + +impl MessageRouting { + fn new(inner: Arc>, peer_id: usize,) -> Self { + MessageRouting { + inner, + peer_id, + } + } +} + +fn make_topic(round: u64, set_id: u64) -> Hash { + let mut hash = Hash::default(); + round.using_encoded(|s| { + let raw = hash.as_mut(); + raw[..8].copy_from_slice(s); + }); + set_id.using_encoded(|s| { + let raw = hash.as_mut(); + raw[8..16].copy_from_slice(s); + }); + hash +} + +impl Network for MessageRouting { + type In = Box,Error=()>>; + + fn messages_for(&self, round: u64, set_id: u64) -> Self::In { + let messages = self.inner.lock().peer(self.peer_id) + .with_spec(|spec, _| spec.gossip.messages_for(make_topic(round, set_id))); + + let messages = messages.map_err( + move |_| panic!("Messages for round {} dropped too early", round) + ); + + Box::new(messages) + } + + fn send_message(&self, round: u64, set_id: u64, message: Vec) { + let mut inner = self.inner.lock(); + inner.peer(self.peer_id).gossip_message(make_topic(round, set_id), message); + inner.route_until_complete(); + } + + fn drop_messages(&self, round: u64, set_id: u64) { + let topic = make_topic(round, set_id); + self.inner.lock().peer(self.peer_id) + .with_spec(|spec, _| spec.gossip.collect_garbage(|t| t == &topic)); + } +} + +#[derive(Default, Clone)] +struct TestApi { + genesis_authorities: Vec<(AuthorityId, u64)>, + scheduled_changes: Arc>>>, +} + +impl TestApi { + fn new(genesis_authorities: Vec<(AuthorityId, u64)>) -> Self { + TestApi { + genesis_authorities, + scheduled_changes: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl ApiClient for TestApi { + fn genesis_authorities(&self) -> Result, ClientError> { + Ok(self.genesis_authorities.clone()) + } + + fn scheduled_change(&self, header: &::Header) + -> Result>>, ClientError> + { + // we take only scheduled changes at given block number where there are no + // extrinsics. + Ok(self.scheduled_changes.lock().get(&header.hash()).map(|c| c.clone())) + } +} + +const TEST_GOSSIP_DURATION: Duration = Duration::from_millis(500); +const TEST_ROUTING_INTERVAL: Duration = Duration::from_millis(50); + +fn make_ids(keys: &[Keyring]) -> Vec<(AuthorityId, u64)> { + keys.iter() + .map(|key| AuthorityId(key.to_raw_public())) + .map(|id| (id, 1)) + .collect() +} + +#[test] +fn finalize_3_voters_no_observers() { + let peers = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let voters = make_ids(peers); + + let mut net = GrandpaTestNet::new(TestApi::new(voters), 3); + net.peer(0).push_blocks(20, false); + net.sync(); + + for i in 0..3 { + assert_eq!(net.peer(i).client().info().unwrap().chain.best_number, 20, + "Peer #{} failed to sync", i); + } + + let net = Arc::new(Mutex::new(net)); + + let mut finality_notifications = Vec::new(); + let mut runtime = current_thread::Runtime::new().unwrap(); + + for (peer_id, key) in peers.iter().enumerate() { + let (client, link) = { + let mut net = net.lock(); + // temporary needed for some reason + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &20)) + .for_each(|_| Ok(())) + ); + let voter = run_grandpa( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + local_key: Some(Arc::new(key.clone().into())), + name: Some(format!("peer#{}", peer_id)), + }, + link, + MessageRouting::new(net.clone(), peer_id), + ).expect("all in order with client and network"); + + runtime.spawn(voter); + } + + // wait for all finalized on each. + let wait_for = ::futures::future::join_all(finality_notifications) + .map(|_| ()) + .map_err(|_| ()); + + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .map(|_| ()) + .map_err(|_| ()); + + runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); +} + +#[test] +fn finalize_3_voters_1_observer() { + let peers = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let voters = make_ids(peers); + + let mut net = GrandpaTestNet::new(TestApi::new(voters), 4); + net.peer(0).push_blocks(20, false); + net.sync(); + + let net = Arc::new(Mutex::new(net)); + let mut finality_notifications = Vec::new(); + + let mut runtime = current_thread::Runtime::new().unwrap(); + let all_peers = peers.iter() + .cloned() + .map(|key| Some(Arc::new(key.into()))) + .chain(::std::iter::once(None)); + + for (peer_id, local_key) in all_peers.enumerate() { + let (client, link) = { + let mut net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &20)) + .for_each(move |_| Ok(())) + ); + let voter = run_grandpa( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + local_key, + name: Some(format!("peer#{}", peer_id)), + }, + link, + MessageRouting::new(net.clone(), peer_id), + ).expect("all in order with client and network"); + + runtime.spawn(voter); + } + + // wait for all finalized on each. + let wait_for = ::futures::future::join_all(finality_notifications) + .map(|_| ()) + .map_err(|_| ()); + + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .map(|_| ()) + .map_err(|_| ()); + + runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); +} + +#[test] +fn transition_3_voters_twice_1_observer() { + let peers_a = &[ + Keyring::Alice, + Keyring::Bob, + Keyring::Charlie, + ]; + + let peers_b = &[ + Keyring::Dave, + Keyring::Eve, + Keyring::Ferdie, + ]; + + let peers_c = &[ + Keyring::Alice, + Keyring::Eve, + Keyring::Two, + ]; + + let observer = &[Keyring::One]; + + let genesis_voters = make_ids(peers_a); + + let api = TestApi::new(genesis_voters); + let transitions = api.scheduled_changes.clone(); + let add_transition = move |hash, change| { + transitions.lock().insert(hash, change); + }; + + let mut net = GrandpaTestNet::new(api, 9); + + // first 20 blocks: transition at 15, applied at 20. + { + net.peer(0).push_blocks(14, false); + net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + add_transition(block.header.hash(), ScheduledChange { + next_authorities: make_ids(peers_b), + delay: 4, + }); + + block + }); + net.peer(0).push_blocks(5, false); + } + + // at block 21 we do another transition, but this time instant. + // add more until we have 30. + { + net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + add_transition(block.header.hash(), ScheduledChange { + next_authorities: make_ids(peers_c), + delay: 0, + }); + + block + }); + + net.peer(0).push_blocks(9, false); + } + net.sync(); + + for (i, peer) in net.peers().iter().enumerate() { + assert_eq!(peer.client().info().unwrap().chain.best_number, 30, + "Peer #{} failed to sync", i); + + let set_raw = peer.client().backend().get_aux(::AUTHORITY_SET_KEY).unwrap().unwrap(); + let set = AuthoritySet::::decode(&mut &set_raw[..]).unwrap(); + + assert_eq!(set.current(), (0, make_ids(peers_a).as_slice())); + assert_eq!(set.pending_changes().len(), 2); + } + + let net = Arc::new(Mutex::new(net)); + let mut finality_notifications = Vec::new(); + + let mut runtime = current_thread::Runtime::new().unwrap(); + let all_peers = peers_a.iter() + .chain(peers_b) + .chain(peers_c) + .chain(observer) + .cloned() + .collect::>() // deduplicate + .into_iter() + .map(|key| Some(Arc::new(key.into()))) + .enumerate(); + + for (peer_id, local_key) in all_peers { + let (client, link) = { + let mut net = net.lock(); + let link = net.peers[peer_id].data.lock().take().expect("link initialized at startup; qed"); + ( + net.peers[peer_id].client().clone(), + link, + ) + }; + finality_notifications.push( + client.finality_notification_stream() + .take_while(|n| Ok(n.header.number() < &30)) + .for_each(move |_| Ok(())) + .map(move |()| { + let set_raw = client.backend().get_aux(::AUTHORITY_SET_KEY).unwrap().unwrap(); + let set = AuthoritySet::::decode(&mut &set_raw[..]).unwrap(); + + assert_eq!(set.current(), (2, make_ids(peers_c).as_slice())); + assert!(set.pending_changes().is_empty()); + }) + ); + let voter = run_grandpa( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + local_key, + name: Some(format!("peer#{}", peer_id)), + }, + link, + MessageRouting::new(net.clone(), peer_id), + ).expect("all in order with client and network"); + + runtime.spawn(voter); + } + + // wait for all finalized on each. + let wait_for = ::futures::future::join_all(finality_notifications) + .map(|_| ()) + .map_err(|_| ()); + + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { net.lock().route_until_complete(); Ok(()) }) + .map(|_| ()) + .map_err(|_| ()); + + runtime.block_on(wait_for.select(drive_to_completion).map_err(|_| ())).unwrap(); +} diff --git a/core/network/src/import_queue.rs b/core/network/src/import_queue.rs index eb6cf24c9027..cb7bf86e697c 100644 --- a/core/network/src/import_queue.rs +++ b/core/network/src/import_queue.rs @@ -34,14 +34,16 @@ use primitives::AuthorityId; use runtime_primitives::traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero}; pub use blocks::BlockData; -use chain::Client; +use client::error::Error as ClientError; use error::{ErrorKind, Error}; use protocol::Context; use service::ExecuteInContext; use sync::ChainSync; -pub use consensus::{ImportBlock, ImportResult, BlockOrigin}; +pub use consensus::{ImportBlock, BlockImport, ImportResult, BlockOrigin}; +/// Shared block import struct used by the queue. +pub type SharedBlockImport = Arc + Send + Sync>; #[cfg(any(test, feature = "test-helpers"))] use std::cell::RefCell; @@ -66,14 +68,9 @@ pub trait ImportQueue: Send + Sync { /// /// This is called automatically by the network service when synchronization /// begins. - fn start( - &self, - _sync: Weak>>, - _service: Weak, - _chain: Weak> - ) -> Result<(), Error> where + fn start(&self, _link: L) -> Result<(), Error> where Self: Sized, - E: 'static + ExecuteInContext, + L: 'static + Link, { Ok(()) } @@ -103,6 +100,7 @@ pub struct BasicQueue> { handle: Mutex>>, data: Arc>, verifier: Arc, + block_import: SharedBlockImport, } /// Locks order: queue, queue_blocks, best_importing_number @@ -116,11 +114,12 @@ struct AsyncImportQueueData { impl> BasicQueue { /// Instantiate a new basic queue, with given verifier. - pub fn new(verifier: Arc) -> Self { + pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { Self { handle: Mutex::new(None), data: Arc::new(AsyncImportQueueData::new()), verifier, + block_import, } } } @@ -138,18 +137,17 @@ impl AsyncImportQueueData { } impl> ImportQueue for BasicQueue { - fn start>( + fn start>( &self, - sync: Weak>>, - service: Weak, - chain: Weak> + link: L, ) -> Result<(), Error> { debug_assert!(self.handle.lock().is_none()); let qdata = self.data.clone(); let verifier = self.verifier.clone(); + let block_import = self.block_import.clone(); *self.handle.lock() = Some(::std::thread::Builder::new().name("ImportQueue".into()).spawn(move || { - import_thread(sync, service, chain, qdata, verifier) + import_thread(block_import, link, qdata, verifier) }).map_err(|err| Error::from(ErrorKind::Io(err)))?); Ok(()) } @@ -215,10 +213,9 @@ impl> Drop for BasicQueue { } /// Blocks import thread. -fn import_thread, V: Verifier>( - sync: Weak>>, - service: Weak, - chain: Weak>, +fn import_thread, V: Verifier>( + block_import: SharedBlockImport, + link: L, qdata: Arc>, verifier: Arc ) { @@ -243,91 +240,87 @@ fn import_thread, V: Verifier>( } }; - match (sync.upgrade(), service.upgrade(), chain.upgrade()) { - (Some(sync), Some(service), Some(chain)) => { - let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); - if !import_many_blocks( - &mut SyncLink{chain: &sync, client: &*chain, context: &*service}, - Some(&*qdata), - new_blocks, - verifier.clone(), - ) { - break; - } - - let mut queue_blocks = qdata.queue_blocks.write(); - for blocks_hash in blocks_hashes { - queue_blocks.remove(&blocks_hash); - } - }, - _ => break, + let blocks_hashes: Vec = new_blocks.1.iter().map(|b| b.block.hash.clone()).collect(); + if !import_many_blocks( + &*block_import, + &link, + Some(&*qdata), + new_blocks, + verifier.clone(), + ) { + break; + } + + let mut queue_blocks = qdata.queue_blocks.write(); + for blocks_hash in blocks_hashes { + queue_blocks.remove(&blocks_hash); } } trace!(target: "sync", "Stopping import thread"); } -/// ChainSync link trait. -trait SyncLinkApi { - /// Get chain reference. - fn chain(&self) -> &Client; + +/// Hooks that the verification queue can use to influence the synchronization +/// algorithm. +pub trait Link: Send { /// Block imported. - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor); + fn block_imported(&self, _hash: &B::Hash, _number: NumberFor) { } /// Maintain sync. - fn maintain_sync(&mut self); + fn maintain_sync(&self) { } /// Disconnect from peer. - fn useless_peer(&mut self, who: NodeIndex, reason: &str); + fn useless_peer(&self, _who: NodeIndex, _reason: &str) { } /// Disconnect from peer and restart sync. - fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str); + fn note_useless_and_restart_sync(&self, _who: NodeIndex, _reason: &str) { } /// Restart sync. - fn restart(&mut self); + fn restart(&self) { } } +/// A link implementation that does nothing. +pub struct NoopLink; -/// Link with the ChainSync service. -struct SyncLink<'a, B: 'a + BlockT, E: 'a + ExecuteInContext> { - pub chain: &'a RwLock>, - pub client: &'a Client, - pub context: &'a E, +impl Link for NoopLink { } + +/// A link implementation that connects to the network. +pub struct NetworkLink> { + /// The chain-sync handle + pub(crate) sync: Weak>>, + /// Network context. + pub(crate) context: Weak, } -impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext> SyncLink<'a, B, E> { +impl> NetworkLink { /// Execute closure with locked ChainSync. - fn with_sync, &mut Context)>(&mut self, closure: F) { - let service = self.context; - let sync = self.chain; - service.execute_in_context(move |protocol| { - let mut sync = sync.write(); - closure(&mut *sync, protocol) - }); + fn with_sync, &mut Context)>(&self, closure: F) { + if let (Some(sync), Some(service)) = (self.sync.upgrade(), self.context.upgrade()) { + service.execute_in_context(move |protocol| { + let mut sync = sync.write(); + closure(&mut *sync, protocol) + }); + } } } -impl<'a, B: 'static + BlockT, E: 'a + ExecuteInContext> SyncLinkApi for SyncLink<'a, B, E> { - - fn chain(&self) -> &Client { - self.client - } - - fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { +impl> Link for NetworkLink { + fn block_imported(&self, hash: &B::Hash, number: NumberFor) { self.with_sync(|sync, _| sync.block_imported(&hash, number)) } - fn maintain_sync(&mut self) { + fn maintain_sync(&self) { self.with_sync(|sync, protocol| sync.maintain_sync(protocol)) } - fn useless_peer(&mut self, who: NodeIndex, reason: &str) { + fn useless_peer(&self, who: NodeIndex, reason: &str) { self.with_sync(|_, protocol| protocol.report_peer(who, Severity::Useless(reason))) } - fn note_useless_and_restart_sync(&mut self, who: NodeIndex, reason: &str) { + fn note_useless_and_restart_sync(&self, who: NodeIndex, reason: &str) { self.with_sync(|sync, protocol| { protocol.report_peer(who, Severity::Useless(reason)); // is this actually malign or just useless? sync.restart(protocol); }) } - fn restart(&mut self) { + fn restart(&self) { self.with_sync(|sync, protocol| sync.restart(protocol)) } } @@ -360,7 +353,8 @@ enum BlockImportError { /// Import a bunch of blocks. fn import_many_blocks<'a, B: BlockT, V: Verifier>( - link: &mut SyncLinkApi, + import_handle: &BlockImport, + link: &Link, qdata: Option<&AsyncImportQueueData>, blocks: (BlockOrigin, Vec>), verifier: Arc @@ -383,7 +377,7 @@ fn import_many_blocks<'a, B: BlockT, V: Verifier>( // Blocks in the response/drain should be in ascending order. for block in blocks { let import_result = import_single_block( - link.chain(), + import_handle, blocks_origin.clone(), block, verifier.clone(), @@ -407,7 +401,7 @@ fn import_many_blocks<'a, B: BlockT, V: Verifier>( /// Single block import function. fn import_single_block>( - chain: &Client, + import_handle: &BlockImport, block_origin: BlockOrigin, block: BlockData, verifier: Arc @@ -449,7 +443,7 @@ fn import_single_block>( BlockImportError::VerificationFailed(peer, msg) })?; - match chain.import(import_block, new_authorities) { + match import_handle.import_block(import_block, new_authorities) { Ok(ImportResult::AlreadyInChain) => { trace!(target: "sync", "Block already in chain {}: {:?}", number, hash); Ok(BlockImportResult::ImportedKnown(hash, number)) @@ -478,8 +472,8 @@ fn import_single_block>( } /// Process single block import result. -fn process_import_result<'a, B: BlockT>( - link: &mut SyncLinkApi, +fn process_import_result( + link: &Link, result: Result::Header as HeaderT>::Number>, BlockImportError> ) -> usize { @@ -576,40 +570,61 @@ impl Verifier for PassThroughVerifier { } } -#[cfg(any(test, feature = "test-helpers"))] /// Blocks import queue that is importing blocks in the same thread. /// The boolean value indicates whether blocks should be imported without instant finality. -pub struct SyncImportQueue>(Arc, ImportCB); #[cfg(any(test, feature = "test-helpers"))] -impl> SyncImportQueue { - /// Create a new SyncImportQueue wrapping the given Verifier - pub fn new(verifier: Arc) -> Self { - SyncImportQueue(verifier, ImportCB::new()) +pub struct SyncImportQueue> { + verifier: Arc, + link: ImportCB, + block_import: SharedBlockImport, +} + +#[cfg(any(test, feature = "test-helpers"))] +impl> SyncImportQueue { + /// Create a new SyncImportQueue wrapping the given Verifier and block import + /// handle. + pub fn new(verifier: Arc, block_import: SharedBlockImport) -> Self { + let queue = SyncImportQueue { + verifier, + link: ImportCB::new(), + block_import, + }; + + let v = queue.verifier.clone(); + let import_handle = queue.block_import.clone(); + queue.link.set(Box::new(move |origin, new_blocks| { + let verifier = v.clone(); + import_many_blocks( + &*import_handle, + &NoopLink, + None, + (origin, new_blocks), + verifier, + ) + })); + + queue } } #[cfg(any(test, feature = "test-helpers"))] impl> ImportQueue for SyncImportQueue { - fn start>( + fn start>( &self, - sync: Weak>>, - service: Weak, - chain: Weak> + link: L, ) -> Result<(), Error> { - let v = self.0.clone(); - self.1.set(Box::new(move | origin, new_blocks | { + let v = self.verifier.clone(); + let import_handle = self.block_import.clone(); + self.link.set(Box::new(move |origin, new_blocks| { let verifier = v.clone(); - match (sync.upgrade(), service.upgrade(), chain.upgrade()) { - (Some(sync), Some(service), Some(chain)) => - import_many_blocks( - &mut SyncLink{chain: &sync, client: &*chain, context: &*service}, - None, - (origin, new_blocks), - verifier, - ), - _ => false - } + import_many_blocks( + &*import_handle, + &link, + None, + (origin, new_blocks), + verifier, + ) })); Ok(()) } @@ -629,7 +644,7 @@ impl> ImportQueue for SyncImpor } fn import_blocks(&self, origin: BlockOrigin, blocks: Vec>) { - self.1.call(origin, blocks); + self.link.call(origin, blocks); } } @@ -639,42 +654,49 @@ pub mod tests { use message; use test_client::{self, TestClient}; use test_client::runtime::{Block, Hash}; - use on_demand::tests::DummyExecutor; use runtime_primitives::generic::BlockId; + use std::cell::Cell; use super::*; - struct TestLink { - chain: Arc>, - imported: usize, - maintains: usize, - disconnects: usize, - restarts: usize, + imported: Cell, + maintains: Cell, + disconnects: Cell, + restarts: Cell, } impl TestLink { fn new() -> TestLink { TestLink { - chain: Arc::new(test_client::new()), - imported: 0, - maintains: 0, - disconnects: 0, - restarts: 0, + imported: Cell::new(0), + maintains: Cell::new(0), + disconnects: Cell::new(0), + restarts: Cell::new(0), } } fn total(&self) -> usize { - self.imported + self.maintains + self.disconnects + self.restarts + self.imported.get() + self.maintains.get() + self.disconnects.get() + self.restarts.get() } } - impl SyncLinkApi for TestLink { - fn chain(&self) -> &Client { &*self.chain } - fn block_imported(&mut self, _hash: &Hash, _number: NumberFor) { self.imported += 1; } - fn maintain_sync(&mut self) { self.maintains += 1; } - fn useless_peer(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; } - fn note_useless_and_restart_sync(&mut self, _: NodeIndex, _: &str) { self.disconnects += 1; self.restarts += 1; } - fn restart(&mut self) { self.restarts += 1; } + impl Link for TestLink { + fn block_imported(&self, _hash: &Hash, _number: NumberFor) { + self.imported.set(self.imported.get() + 1); + } + fn maintain_sync(&self) { + self.maintains.set(self.maintains.get() + 1); + } + fn useless_peer(&self, _: NodeIndex, _: &str) { + self.disconnects.set(self.disconnects.get() + 1); + } + fn note_useless_and_restart_sync(&self, id: NodeIndex, r: &str) { + self.useless_peer(id, r); + self.restart(); + } + fn restart(&self) { + self.restarts.set(self.restarts.get() + 1); + } } fn prepare_good_block() -> (client::Client, Hash, u64, BlockData) { @@ -735,39 +757,39 @@ pub mod tests { #[test] fn process_import_result_works() { - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedKnown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - assert_eq!(link.imported, 1); + assert_eq!(link.imported.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Ok(BlockImportResult::ImportedUnknown(Default::default(), 0))), 1); assert_eq!(link.total(), 1); - assert_eq!(link.imported, 1); + assert_eq!(link.imported.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::IncompleteHeader(Some(0)))), 0); assert_eq!(link.total(), 1); - assert_eq!(link.disconnects, 1); + assert_eq!(link.disconnects.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::IncompleteJustification(Some(0)))), 0); assert_eq!(link.total(), 1); - assert_eq!(link.disconnects, 1); + assert_eq!(link.disconnects.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Err(BlockImportError::UnknownParent)), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::UnknownParent)), 0); assert_eq!(link.total(), 1); - assert_eq!(link.restarts, 1); + assert_eq!(link.restarts.get(), 1); - let mut link = TestLink::new(); - assert_eq!(process_import_result::(&mut link, Err(BlockImportError::Error)), 0); + let link = TestLink::new(); + assert_eq!(process_import_result::(&link, Err(BlockImportError::Error)), 0); assert_eq!(link.total(), 1); - assert_eq!(link.restarts, 1); + assert_eq!(link.restarts.get(), 1); } #[test] @@ -776,7 +798,9 @@ pub mod tests { let qdata = AsyncImportQueueData::new(); let verifier = Arc::new(PassThroughVerifier(true)); qdata.is_stopping.store(true, Ordering::SeqCst); + let client = test_client::new(); assert!(!import_many_blocks( + &client, &mut TestLink::new(), Some(&qdata), (BlockOrigin::File, vec![block.clone(), block]), @@ -789,10 +813,8 @@ pub mod tests { // Perform this test multiple times since it exhibits non-deterministic behavior. for _ in 0..100 { let verifier = Arc::new(PassThroughVerifier(true)); - let queue = BasicQueue::new(verifier); - let service = Arc::new(DummyExecutor); - let chain = Arc::new(test_client::new()); - queue.start(Weak::new(), Arc::downgrade(&service), Arc::downgrade(&chain) as Weak>).unwrap(); + let queue = BasicQueue::new(verifier, Arc::new(test_client::new())); + queue.start(TestLink::new()).unwrap(); drop(queue); } } diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 92c9beea9466..0b3d0f3f3207 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -84,7 +84,6 @@ impl, H: ExHashT> Service Result>, Error> { - let chain = params.chain.clone(); let import_queue = Arc::new(import_queue); let handler = Arc::new(Protocol::new( params.config, @@ -98,20 +97,22 @@ impl, H: ExHashT> Service; -pub struct Peer> { +pub struct Peer, D> { client: Arc, pub sync: Arc>, pub queue: Arc>>, import_queue: Arc>, executor: Arc, + /// Some custom data set up at initialization time. + pub data: D, } -impl> Peer { +impl, D> Peer { fn new( client: Arc, sync: Arc>, queue: Arc>>, import_queue: Arc>, + data: D, ) -> Self { let executor = Arc::new(DummyContextExecutor(sync.clone(), queue.clone())); - Peer { client, sync, queue, import_queue, executor} + Peer { client, sync, queue, import_queue, executor, data } } /// Called after blockchain has been populated to updated current state. fn start(&self) { // Update the sync state to the latest chain state. let info = self.client.info().expect("In-mem client does not fail"); let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); - self.import_queue.start( - Arc::downgrade(&self.sync.sync()), - Arc::downgrade(&self.executor), - Arc::downgrade(&self.sync.context_data().chain)).expect("Test ImportQueue always starts"); + let network_link = ::import_queue::NetworkLink { + sync: Arc::downgrade(self.sync.sync()), + context: Arc::downgrade(&self.executor), + }; + + self.import_queue.start(network_link).expect("Test ImportQueue always starts"); self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), info.chain.best_hash, &header); } @@ -187,6 +193,11 @@ impl> Peer { io.to_disconnect.clone() } + fn with_io<'a, F, U>(&'a self, f: F) -> U where F: FnOnce(&mut TestIo<'a>) -> U { + let mut io = TestIo::new(&self.queue, None); + f(&mut io) + } + /// Produce the next pending message to send to another peer. fn pending_message(&self) -> Option { self.flush(); @@ -229,25 +240,39 @@ impl> Peer { /// Add blocks to the peer -- edit the block before adding pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, mut edit_block: F) - where F: FnMut(&mut BlockBuilder) + where F: FnMut(BlockBuilder) -> Block { - for _ in 0 .. count { - let mut builder = self.client.new_block().unwrap(); - edit_block(&mut builder); - let block = builder.bake().unwrap(); + use blocks::BlockData; + + for _ in 0..count { + let builder = self.client.new_block().unwrap(); + let block = edit_block(builder); let hash = block.header.hash(); trace!("Generating {}, (#{}, parent={})", hash, block.header.number, block.header.parent_hash); let header = block.header.clone(); - self.client.justify_and_import(origin, block).unwrap(); - self.sync.on_block_imported(&mut TestIo::new(&self.queue, None), hash, &header); + + // NOTE: if we use a non-synchronous queue in the test-net in the future, + // this may not work. + self.import_queue.import_blocks(origin, vec![BlockData { + origin: None, + block: ::message::BlockData:: { + hash, + header: Some(header), + body: Some(block.extrinsics), + receipt: None, + message_queue: None, + justification: Some(Vec::new()), + }, + }]); } + } /// Push blocks to the peer (simplified: with or without a TX) pub fn push_blocks(&self, count: usize, with_tx: bool) { let mut nonce = 0; if with_tx { - self.generate_blocks(count, BlockOrigin::File, |builder| { + self.generate_blocks(count, BlockOrigin::File, |mut builder| { let transfer = Transfer { from: Keyring::Alice.to_raw_public().into(), to: Keyring::Alice.to_raw_public().into(), @@ -257,9 +282,10 @@ impl> Peer { let signature = Keyring::from_raw_public(transfer.from.to_fixed_bytes()).unwrap().sign(&transfer.encode()).into(); builder.push(Extrinsic { transfer, signature }).unwrap(); nonce = nonce + 1; + builder.bake().unwrap() }); } else { - self.generate_blocks(count, BlockOrigin::File, |_| ()); + self.generate_blocks(count, BlockOrigin::File, |builder| builder.bake().unwrap()); } } @@ -292,6 +318,7 @@ impl TransactionPool for EmptyTransactionPool { pub trait TestNetFactory: Sized { type Verifier: 'static + Verifier; + type PeerData: Default; /// These two need to be implemented! fn from_config(config: &ProtocolConfig) -> Self; @@ -299,13 +326,20 @@ pub trait TestNetFactory: Sized { /// Get reference to peer. - fn peer(&self, i: usize) -> &Peer; - fn peers(&self) -> &Vec>>; - fn mut_peers>>)>(&mut self, closure: F ); + fn peer(&self, i: usize) -> &Peer; + fn peers(&self) -> &Vec>>; + fn mut_peers>>)>(&mut self, closure: F); fn started(&self) -> bool; fn set_started(&mut self, now: bool); + /// Get custom block import handle for fresh client, along with peer data. + fn make_block_import(&self, client: Arc) + -> (Arc + Send + Sync>, Self::PeerData) + { + (client, Default::default()) + } + fn default_config() -> ProtocolConfig { ProtocolConfig::default() } @@ -326,7 +360,9 @@ pub trait TestNetFactory: Sized { let client = Arc::new(test_client::new()); let tx_pool = Arc::new(EmptyTransactionPool); let verifier = self.make_verifier(client.clone(), config); - let import_queue = Arc::new(SyncImportQueue::new(verifier)); + let (block_import, data) = self.make_block_import(client.clone()); + + let import_queue = Arc::new(SyncImportQueue::new(verifier, block_import)); let specialization = DummySpecialization { gossip: ConsensusGossip::new(), }; @@ -343,7 +379,8 @@ pub trait TestNetFactory: Sized { client, Arc::new(sync), Arc::new(RwLock::new(VecDeque::new())), - import_queue + import_queue, + data, )); self.mut_peers(|peers| { @@ -453,12 +490,13 @@ pub trait TestNetFactory: Sized { } pub struct TestNet { - peers: Vec>>, + peers: Vec>>, started: bool } impl TestNetFactory for TestNet { type Verifier = PassThroughVerifier; + type PeerData = (); /// Create new test network with peers and given config. fn from_config(_config: &ProtocolConfig) -> Self { @@ -474,15 +512,15 @@ impl TestNetFactory for TestNet { Arc::new(PassThroughVerifier(false)) } - fn peer(&self, i: usize) -> &Peer { + fn peer(&self, i: usize) -> &Peer { &self.peers[i] } - fn peers(&self) -> &Vec>> { + fn peers(&self) -> &Vec>> { &self.peers } - fn mut_peers>>)>(&mut self, closure: F ) { + fn mut_peers>>)>(&mut self, closure: F ) { closure(&mut self.peers); } diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index 479b96ecc013..22dedf7b3ed6 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -94,7 +94,10 @@ fn own_blocks_are_announced() { ::env_logger::init().ok(); let mut net = TestNet::new(3); net.sync(); // connect'em - net.peer(0).generate_blocks(1, BlockOrigin::Own, |_| ()); + net.peer(0).generate_blocks(1, BlockOrigin::Own, |builder| builder.bake().unwrap()); + + let header = net.peer(0).client().header(&BlockId::Number(1)).unwrap().unwrap(); + net.peer(0).with_io(|io| net.peer(0).sync.on_block_imported(io, header.hash(), &header)); net.sync(); assert_eq!(net.peer(0).client.backend().blockchain().info().unwrap().best_number, 1); assert_eq!(net.peer(1).client.backend().blockchain().info().unwrap().best_number, 1); diff --git a/core/service/src/chain_ops.rs b/core/service/src/chain_ops.rs index 221b05dd7744..4f2f10a39cb3 100644 --- a/core/service/src/chain_ops.rs +++ b/core/service/src/chain_ops.rs @@ -21,7 +21,7 @@ use futures::Future; use runtime_primitives::generic::{SignedBlock, BlockId}; use runtime_primitives::traits::{As, Block, Header}; -use network::import_queue::{ImportQueue, BlockData}; +use network::import_queue::{ImportQueue, Link, BlockData}; use network::message; use consensus_common::BlockOrigin; @@ -90,8 +90,12 @@ pub fn export_blocks(config: FactoryFullConfiguration, exit: E, mut pub fn import_blocks(config: FactoryFullConfiguration, exit: E, mut input: R) -> error::Result<()> where F: ServiceFactory, E: Future + Send + 'static, R: Read, { + struct DummyLink; + impl Link for DummyLink { } + let client = new_client::(&config)?; let queue = components::FullComponents::::build_import_queue(&config, client.clone())?; + queue.start(DummyLink)?; let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { @@ -101,7 +105,7 @@ pub fn import_blocks(config: FactoryFullConfiguration, exit: E, mut let count: u32 = Decode::decode(&mut input).ok_or("Error reading file")?; info!("Importing {} blocks", count); - let mut block_count = 0; + let mut block_count = 0; for b in 0 .. count { if exit_recv.try_recv().is_ok() { break; diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index f64882bbe346..9b0283795fb8 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -472,8 +472,8 @@ impl network::TransactionPool, ComponentBlock< /// // The first one is for the initializing the full import queue and the second for the /// // light import queue. /// ImportQueue = BasicQueue -/// { |_, _| Ok(BasicQueue::new(Arc::new(NoneVerifier {}))) } -/// { |_, _| Ok(BasicQueue::new(Arc::new(NoneVerifier {}))) }, +/// { |_, client| Ok(BasicQueue::new(Arc::new(NoneVerifier {}, client))) } +/// { |_, client| Ok(BasicQueue::new(Arc::new(NoneVerifier {}, client))) }, /// } /// } /// ``` diff --git a/core/test-runtime/src/lib.rs b/core/test-runtime/src/lib.rs index 9fbdb1163878..686bb14e579e 100644 --- a/core/test-runtime/src/lib.rs +++ b/core/test-runtime/src/lib.rs @@ -33,7 +33,6 @@ extern crate sr_io as runtime_io; #[macro_use] extern crate sr_version as runtime_version; - #[cfg(test)] #[macro_use] extern crate hex_literal; diff --git a/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm b/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm index 76ec888e142a..75fc636a14c2 100644 Binary files a/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm and b/core/test-runtime/wasm/target/wasm32-unknown-unknown/release/substrate_test_runtime.compact.wasm differ diff --git a/node/runtime/Cargo.toml b/node/runtime/Cargo.toml index 1f6ccad0c31f..586592789366 100644 --- a/node/runtime/Cargo.toml +++ b/node/runtime/Cargo.toml @@ -14,7 +14,8 @@ parity-codec-derive = "2.1" sr-std = { path = "../../core/sr-std" } srml-support = { path = "../../srml/support" } substrate-primitives = { path = "../../core/primitives" } -substrate-client = { path = "../../core/client", optional = true } +substrate-fg-primitives = { path = "../../core/finality-grandpa/primitives" } +substrate-client = { path = "../../core/client" } substrate-keyring = { path = "../../core/keyring" } srml-balances = { path = "../../srml/balances" } srml-consensus = { path = "../../srml/consensus" } @@ -57,5 +58,6 @@ std = [ "serde_derive", "serde/std", "safe-mix/std", - "substrate-client", + "substrate-client/std", + "substrate-fg-primitives/std", ] diff --git a/node/runtime/src/lib.rs b/node/runtime/src/lib.rs index 84ffbb4ab082..dca769f4d45e 100644 --- a/node/runtime/src/lib.rs +++ b/node/runtime/src/lib.rs @@ -56,6 +56,7 @@ extern crate srml_upgrade_key as upgrade_key; #[macro_use] extern crate sr_version as version; extern crate node_primitives; +extern crate substrate_fg_primitives; #[cfg(feature = "std")] use codec::{Encode, Decode}; @@ -72,7 +73,7 @@ use client::runtime_api::ApiExt; use runtime_primitives::ApplyResult; use runtime_primitives::transaction_validity::TransactionValidity; use runtime_primitives::generic; -use runtime_primitives::traits::{Convert, BlakeTwo256, Block as BlockT}; +use runtime_primitives::traits::{Convert, BlakeTwo256, Block as BlockT, DigestFor, NumberFor}; #[cfg(feature = "std")] use runtime_primitives::traits::ApiRef; #[cfg(feature = "std")] @@ -84,6 +85,7 @@ use council::seats as council_seats; #[cfg(any(feature = "std", test))] use version::NativeVersion; use substrate_primitives::OpaqueMetadata; +use substrate_fg_primitives::{runtime::GrandpaApi, ScheduledChange}; #[cfg(any(feature = "std", test))] pub use runtime_primitives::BuildStorage; @@ -395,6 +397,19 @@ impl client::runtime_api::Metadata for ClientWithApi { } } +#[cfg(feature = "std")] +impl substrate_fg_primitives::GrandpaApi for ClientWithApi { + fn grandpa_pending_change(&self, at: &GBlockId, digest: &DigestFor) + -> Result>>, client::error::Error> { + self.call_api_at(at, "grandpa_pending_change", digest) + } + + fn grandpa_authorities(&self, at: &GBlockId) + -> Result, client::error::Error> { + self.call_api_at(at, "grandpa_authorities", &()) + } +} + impl_runtime_apis! { impl Core for Runtime { fn version() -> RuntimeVersion { @@ -447,4 +462,16 @@ impl_runtime_apis! { Executive::validate_transaction(tx) } } + + + impl GrandpaApi for ClientWithApi { + fn grandpa_pending_change(digest: DigestFor) + -> Option>> { + unimplemented!("Robert, where is the impl?") + } + + fn grandpa_authorities() -> Vec<(SessionKey, u64)> { + unimplemented!("Robert, where is the impl?") + } + } } diff --git a/node/runtime/wasm/Cargo.lock b/node/runtime/wasm/Cargo.lock index bb067bc40021..4eeae1fd9da8 100644 --- a/node/runtime/wasm/Cargo.lock +++ b/node/runtime/wasm/Cargo.lock @@ -526,6 +526,7 @@ dependencies = [ "srml-treasury 0.1.0", "srml-upgrade-key 0.1.0", "substrate-client 0.1.0", + "substrate-fg-primitives 0.1.0", "substrate-primitives 0.1.0", ] @@ -1272,6 +1273,18 @@ dependencies = [ "wasmi 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "substrate-fg-primitives" +version = "0.1.0" +dependencies = [ + "parity-codec 2.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-primitives 0.1.0", + "sr-std 0.1.0", + "substrate-client 0.1.0", + "substrate-primitives 0.1.0", +] + [[package]] name = "substrate-keyring" version = "0.1.0" diff --git a/node/runtime/wasm/Cargo.toml b/node/runtime/wasm/Cargo.toml index 9e74ff9d5d12..6e9cfd091079 100644 --- a/node/runtime/wasm/Cargo.toml +++ b/node/runtime/wasm/Cargo.toml @@ -12,6 +12,7 @@ safe-mix = { version = "1.0", default-features = false } parity-codec-derive = { version = "2.1" } parity-codec = { version = "2.1", default-features = false } substrate-primitives = { path = "../../../core/primitives", default-features = false } +substrate-fg-primitives = { path = "../../../core/finality-grandpa/primitives", default-features = false } substrate-client = { path = "../../../core/client", default-features = false } sr-std = { path = "../../../core/sr-std", default-features = false } srml-support = { path = "../../../srml/support", default-features = false } @@ -38,6 +39,7 @@ std = [ "parity-codec/std", "substrate-primitives/std", "substrate-client/std", + "substrate-fg-primitives/std", "sr-std/std", "srml-support/std", "srml-balances/std", diff --git a/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm b/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm index 766de4bda344..8a89da66f166 100644 Binary files a/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm and b/node/runtime/wasm/target/wasm32-unknown-unknown/release/node_runtime.compact.wasm differ