diff --git a/Cargo.lock b/Cargo.lock index 4d12fdad7590c..58d657332c491 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -843,6 +843,14 @@ name = "foreign-types-shared" version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "fork-tree" +version = "0.1.0" +dependencies = [ + "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", + "parity-codec-derive 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "fs-swap" version = "0.2.4" @@ -3851,6 +3859,7 @@ version = "0.1.0" dependencies = [ "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "finality-grandpa 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fork-tree 0.1.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.0.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3925,6 +3934,7 @@ dependencies = [ "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fork-tree 0.1.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "linked-hash-map 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/Cargo.toml b/Cargo.toml index 98e3a439d918d..e6e6921e15fd9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ members = [ "core/transaction-pool", "core/transaction-pool/graph", "core/inherents", + "core/util/fork-tree", "srml/support", "srml/support/procedural", "srml/support/procedural/tools", diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 6f22aeaa5c62d..d94f950287ee5 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +fork-tree = { path = "../../core/util/fork-tree" } futures = "0.1" log = "0.4" parking_lot = "0.7.1" diff --git a/core/finality-grandpa/src/authorities.rs b/core/finality-grandpa/src/authorities.rs index 71793c45aa97b..d8849acc36ce0 100644 --- a/core/finality-grandpa/src/authorities.rs +++ b/core/finality-grandpa/src/authorities.rs @@ -16,6 +16,7 @@ //! Utilities for dealing with authorities, authority sets, and handoffs. +use fork_tree::ForkTree; use parking_lot::RwLock; use substrate_primitives::Ed25519AuthorityId; use grandpa::VoterSet; @@ -38,7 +39,10 @@ impl Clone for SharedAuthoritySet { } } -impl SharedAuthoritySet { +impl SharedAuthoritySet +where H: PartialEq, + N: Ord, +{ /// The genesis authority set. pub(crate) fn genesis(initial: Vec<(Ed25519AuthorityId, u64)>) -> Self { SharedAuthoritySet { @@ -53,9 +57,8 @@ impl SharedAuthoritySet { } impl SharedAuthoritySet -where - N: Add + Ord + Clone + Debug, - H: Debug +where N: Add + Ord + Clone + Debug, + H: Clone + Debug { /// Get the earliest limit-block number, if any. pub(crate) fn current_limit(&self) -> Option { @@ -80,6 +83,7 @@ impl From> for SharedAuthoritySet { } /// Status of the set after changes were applied. +#[derive(Debug)] pub(crate) struct Status { /// Whether internal changes were made. pub(crate) changed: bool, @@ -93,16 +97,19 @@ pub(crate) struct Status { pub(crate) struct AuthoritySet { current_authorities: Vec<(Ed25519AuthorityId, u64)>, set_id: u64, - pending_changes: Vec>, + pending_changes: ForkTree>, } -impl AuthoritySet { +impl AuthoritySet +where H: PartialEq, + N: Ord, +{ /// Get a genesis set with given authorities. pub(crate) fn genesis(initial: Vec<(Ed25519AuthorityId, u64)>) -> Self { AuthoritySet { current_authorities: initial, set_id: 0, - pending_changes: Vec::new(), + pending_changes: ForkTree::new(), } } @@ -115,138 +122,131 @@ impl AuthoritySet { impl AuthoritySet where N: Add + Ord + Clone + Debug, - H: Debug + H: Clone + Debug { - /// Note an upcoming pending transition. This makes sure that there isn't - /// already any pending change for the same chain. Multiple pending changes - /// are allowed but they must be signalled in different forks. The closure - /// should return an error if the pending change block is equal to or a - /// descendent of the given block. - pub(crate) fn add_pending_change( + /// Note an upcoming pending transition. Multiple pending changes on the + /// same branch can be added as long as they don't overlap. This method + /// assumes that changes on the same branch will be added in-order. The + /// given function `is_descendent_of` should return `true` if the second + /// hash (target) is a descendent of the first hash (base). + pub(crate) fn add_pending_change( &mut self, pending: PendingChange, - is_equal_or_descendent_of: F, - ) -> Result<(), E> where - F: Fn(&H) -> Result<(), E>, + is_descendent_of: &F, + ) -> Result<(), fork_tree::Error> where + F: Fn(&H, &H) -> Result, + E: std::error::Error, { - for change in self.pending_changes.iter() { - is_equal_or_descendent_of(&change.canon_hash)?; - } - - // ordered first by effective number and then by signal-block number. - let key = (pending.effective_number(), pending.canon_height.clone()); - let idx = self.pending_changes - .binary_search_by_key(&key, |change| ( - change.effective_number(), - change.canon_height.clone(), - )) - .unwrap_or_else(|i| i); - - debug!(target: "afg", "Inserting potential set change at block {:?}.", - (&pending.canon_height, &pending.canon_hash)); - - self.pending_changes.insert(idx, pending); - - debug!(target: "afg", "There are now {} pending changes.", self.pending_changes.len()); + let hash = pending.canon_hash.clone(); + let number = pending.canon_height.clone(); + + debug!(target: "afg", "Inserting potential set change signalled at block {:?} \ + (delayed by {:?} blocks).", + (&number, &hash), pending.finalization_depth); + + self.pending_changes.import( + hash.clone(), + number.clone(), + pending, + is_descendent_of, + )?; + + debug!(target: "afg", "There are now {} alternatives for the next pending change (roots), \ + and a total of {} pending changes (across all forks).", + self.pending_changes.roots().count(), + self.pending_changes.iter().count(), + ); Ok(()) } - /// Inspect pending changes. - pub(crate) fn pending_changes(&self) -> &[PendingChange] { - &self.pending_changes + /// Inspect pending changes. Pending changes in the tree are traversed in pre-order. + pub(crate) fn pending_changes(&self) -> impl Iterator> { + self.pending_changes.iter().map(|(_, _, c)| c) } - /// Get the earliest limit-block number, if any. + /// Get the earliest limit-block number, if any. If there are pending + /// changes across different forks, this method will return the earliest + /// effective number (across the different branches). pub(crate) fn current_limit(&self) -> Option { - self.pending_changes.get(0).map(|change| change.effective_number().clone()) + self.pending_changes.roots() + .min_by_key(|&(_, _, c)| c.effective_number()) + .map(|(_, _, c)| c.effective_number()) } - /// Apply or prune any pending transitions. Provide a closure that can be used to check for the - /// finalized block with given number. + /// Apply or prune any pending transitions. This method ensures that if + /// there are multiple changes in the same branch, finalizing this block + /// won't finalize past multiple transitions (i.e. transitions must be + /// finalized in-order). The given function `is_descendent_of` should return + /// `true` if the second hash (target) is a descendent of the first hash + /// (base). /// - /// When the set has changed, the return value will be `Ok(Some((H, N)))` which is the canonical - /// block where the set last changed. - pub(crate) fn apply_changes(&mut self, just_finalized: N, mut canonical: F) - -> Result, E> - where F: FnMut(N) -> Result, E> + /// When the set has changed, the return value will be `Ok(Some((H, N)))` + /// which is the canonical block where the set last changed (i.e. the given + /// hash and number). + pub(crate) fn apply_changes( + &mut self, + finalized_hash: H, + finalized_number: N, + is_descendent_of: &F, + ) -> Result, fork_tree::Error> + where F: Fn(&H, &H) -> Result, + E: std::error::Error, { let mut status = Status { changed: false, new_set_block: None, }; - loop { - let remove_up_to = match self.pending_changes.first() { - None => break, - Some(change) => { - let effective_number = change.effective_number(); - if effective_number > just_finalized { break } - - // check if the block that signalled the change is canonical in - // our chain. - let canonical_hash = canonical(change.canon_height.clone())?; - let effective_hash = canonical(effective_number.clone())?; - - debug!(target: "afg", "Evaluating potential set change at block {:?}. Our canonical hash is {:?}", - (&change.canon_height, &change.canon_hash), canonical_hash); - - match (canonical_hash, effective_hash) { - (Some(canonical_hash), Some(effective_hash)) => { - if canonical_hash == change.canon_hash { - // apply this change: make the set canonical - info!(target: "finality", "Applying authority set change scheduled at block #{:?}", - change.canon_height); - - self.current_authorities = change.next_authorities.clone(); - self.set_id += 1; - - status.new_set_block = Some(( - effective_hash, - effective_number.clone(), - )); - - // discard all signalled changes since they're - // necessarily from other forks - self.pending_changes.len() - } else { - 1 // prune out this entry; it's no longer relevant. - } - }, - _ => 1, // prune out this entry; it's no longer relevant. - } - } - }; - let remove_up_to = ::std::cmp::min(remove_up_to, self.pending_changes.len()); - self.pending_changes.drain(..remove_up_to); - status.changed = true; // always changed because we strip at least the first change. + match self.pending_changes.finalize_with_descendent_if( + &finalized_hash, + finalized_number.clone(), + is_descendent_of, + |change| change.effective_number() <= finalized_number + )? { + fork_tree::FinalizationResult::Changed(change) => { + status.changed = true; + + if let Some(change) = change { + info!(target: "finality", "Applying authority set change scheduled at block #{:?}", + change.canon_height); + + self.current_authorities = change.next_authorities; + self.set_id += 1; + + status.new_set_block = Some(( + finalized_hash, + finalized_number, + )); + } + }, + fork_tree::FinalizationResult::Unchanged => {}, } Ok(status) } /// Check whether the given finalized block number enacts any authority set - /// change (without triggering it). Provide a closure that can be used to - /// check for the canonical block with a given number. - pub fn enacts_change(&self, just_finalized: N, mut canonical: F) - -> Result - where F: FnMut(N) -> Result, E> + /// change (without triggering it), ensuring that if there are multiple + /// changes in the same branch, finalizing this block won't finalize past + /// multiple transitions (i.e. transitions must be finalized in-order). The + /// given function `is_descendent_of` should return `true` if the second + /// hash (target) is a descendent of the first hash (base). + pub fn enacts_change( + &self, + finalized_hash: H, + finalized_number: N, + is_descendent_of: &F, + ) -> Result> + where F: Fn(&H, &H) -> Result, + E: std::error::Error, { - for change in self.pending_changes.iter() { - if change.effective_number() > just_finalized { break }; - - if change.effective_number() == just_finalized { - // check if the block that signalled the change is canonical in - // our chain. - match canonical(change.canon_height.clone())? { - Some(ref canonical_hash) if *canonical_hash == change.canon_hash => - return Ok(true), - _ => (), - } - } - } - - Ok(false) + self.pending_changes.finalizes_any_with_descendent_if( + &finalized_hash, + finalized_number.clone(), + is_descendent_of, + |change| change.effective_number() == finalized_number + ) } } @@ -278,16 +278,24 @@ impl + Clone> PendingChange { mod tests { use super::*; - fn ignore_existing_changes(_a: &A) -> Result<(), crate::Error> { - Ok(()) + fn static_is_descendent_of(value: bool) + -> impl Fn(&A, &A) -> Result + { + move |_, _| Ok(value) + } + + fn is_descendent_of(f: F) -> impl Fn(&A, &A) -> Result + where F: Fn(&A, &A) -> bool + { + move |base, hash| Ok(f(base, hash)) } #[test] - fn changes_sorted_in_correct_order() { + fn changes_iterated_in_pre_order() { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: Vec::new(), + pending_changes: ForkTree::new(), }; let change_a = PendingChange { @@ -300,7 +308,7 @@ mod tests { let change_b = PendingChange { next_authorities: Vec::new(), finalization_depth: 0, - canon_height: 16, + canon_height: 5, canon_hash: "hash_b", }; @@ -311,11 +319,18 @@ mod tests { canon_hash: "hash_c", }; - authorities.add_pending_change(change_a.clone(), ignore_existing_changes).unwrap(); - authorities.add_pending_change(change_b.clone(), ignore_existing_changes).unwrap(); - authorities.add_pending_change(change_c.clone(), ignore_existing_changes).unwrap(); - - assert_eq!(authorities.pending_changes, vec![change_a, change_c, change_b]); + authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_b.clone(), &static_is_descendent_of(false)).unwrap(); + authorities.add_pending_change(change_c.clone(), &is_descendent_of(|base, hash| match (*base, *hash) { + ("hash_a", "hash_c") => true, + ("hash_b", "hash_c") => false, + _ => unreachable!(), + })).unwrap(); + + assert_eq!( + authorities.pending_changes().collect::>(), + vec![&change_b, &change_a, &change_c], + ); } #[test] @@ -323,12 +338,13 @@ mod tests { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: Vec::new(), + pending_changes: ForkTree::new(), }; let set_a = vec![([1; 32].into(), 5)]; let set_b = vec![([2; 32].into(), 5)]; + // two competing changes at the same height on different forks let change_a = PendingChange { next_authorities: set_a.clone(), finalization_depth: 10, @@ -343,35 +359,54 @@ mod tests { canon_hash: "hash_b", }; - authorities.add_pending_change(change_a.clone(), ignore_existing_changes).unwrap(); - authorities.add_pending_change(change_b.clone(), ignore_existing_changes).unwrap(); + authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_b.clone(), &static_is_descendent_of(true)).unwrap(); - authorities.apply_changes(10, |_| Err(())).unwrap(); - assert!(authorities.current_authorities.is_empty()); + assert_eq!( + authorities.pending_changes().collect::>(), + vec![&change_b, &change_a], + ); + + // finalizing "hash_c" won't enact the change signalled at "hash_a" but it will prune out "hash_b" + let status = authorities.apply_changes("hash_c", 11, &is_descendent_of(|base, hash| match (*base, *hash) { + ("hash_a", "hash_c") => true, + ("hash_b", "hash_c") => false, + _ => unreachable!(), + })).unwrap(); + + assert!(status.changed); + assert_eq!(status.new_set_block, None); + assert_eq!( + authorities.pending_changes().collect::>(), + vec![&change_a], + ); - authorities.apply_changes(15, |n| match n { - 5 => Ok(Some("hash_a")), - 15 => Ok(Some("hash_15_canon")), - _ => Err(()), - }).unwrap(); + // finalizing "hash_d" will enact the change signalled at "hash_a" + let status = authorities.apply_changes("hash_d", 15, &is_descendent_of(|base, hash| match (*base, *hash) { + ("hash_a", "hash_d") => true, + _ => unreachable!(), + })).unwrap(); + + assert!(status.changed); + assert_eq!(status.new_set_block, Some(("hash_d", 15))); assert_eq!(authorities.current_authorities, set_a); assert_eq!(authorities.set_id, 1); - assert!(authorities.pending_changes.is_empty()); + assert_eq!(authorities.pending_changes().count(), 0); } #[test] - fn disallow_multiple_changes_on_same_fork() { + fn disallow_multiple_changes_being_finalized_at_once() { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: Vec::new(), + pending_changes: ForkTree::new(), }; let set_a = vec![([1; 32].into(), 5)]; - let set_b = vec![([2; 32].into(), 5)]; - let set_c = vec![([3; 32].into(), 5)]; + let set_c = vec![([2; 32].into(), 5)]; + // two competing changes at the same height on different forks let change_a = PendingChange { next_authorities: set_a.clone(), finalization_depth: 10, @@ -379,65 +414,48 @@ mod tests { canon_hash: "hash_a", }; - let change_b = PendingChange { - next_authorities: set_b.clone(), - finalization_depth: 10, - canon_height: 16, - canon_hash: "hash_b", - }; - let change_c = PendingChange { next_authorities: set_c.clone(), finalization_depth: 10, - canon_height: 16, + canon_height: 30, canon_hash: "hash_c", }; - let is_equal_or_descendent_of = |base, block| -> Result<(), ()> { - match (base, block) { - ("hash_a", "hash_b") => return Err(()), - ("hash_a", "hash_c") => return Ok(()), - ("hash_c", "hash_b") => return Ok(()), - _ => unreachable!(), - } - }; + authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(true)).unwrap(); + authorities.add_pending_change(change_c.clone(), &static_is_descendent_of(true)).unwrap(); - authorities.add_pending_change( - change_a.clone(), - |base| is_equal_or_descendent_of(base, change_a.canon_hash), - ).unwrap(); - - // change b is on the same chain has the unfinalized change a so it should error - assert!( - authorities.add_pending_change( - change_b.clone(), - |base| is_equal_or_descendent_of(base, change_b.canon_hash), - ).is_err() - ); + let is_descendent_of = is_descendent_of(|base, hash| match (*base, *hash) { + ("hash_a", "hash_b") => true, + ("hash_a", "hash_c") => true, + ("hash_a", "hash_d") => true, - // change c is accepted because it's on a different fork - authorities.add_pending_change( - change_c.clone(), - |base| is_equal_or_descendent_of(base, change_c.canon_hash) - ).unwrap(); + ("hash_c", "hash_b") => false, + ("hash_c", "hash_d") => true, - authorities.apply_changes(15, |n| match n { - 5 => Ok(Some("hash_a")), - 15 => Ok(Some("hash_a15")), - _ => Err(()), - }).unwrap(); + ("hash_b", "hash_c") => true, + _ => unreachable!(), + }); + + // trying to finalize past `change_c` without finalizing `change_a` first + match authorities.apply_changes("hash_d", 40, &is_descendent_of) { + Err(fork_tree::Error::UnfinalizedAncestor) => {}, + _ => unreachable!(), + } + + let status = authorities.apply_changes("hash_b", 15, &is_descendent_of).unwrap(); + assert!(status.changed); + assert_eq!(status.new_set_block, Some(("hash_b", 15))); assert_eq!(authorities.current_authorities, set_a); + assert_eq!(authorities.set_id, 1); - // pending change c has been removed since it was on a different fork - // and can no longer be enacted - assert!(authorities.pending_changes.is_empty()); + // after finalizing `change_a` it should be possible to finalize `change_c` + let status = authorities.apply_changes("hash_d", 40, &is_descendent_of).unwrap(); + assert!(status.changed); + assert_eq!(status.new_set_block, Some(("hash_d", 40))); - // pending change b can now be added - authorities.add_pending_change( - change_b.clone(), - |base| is_equal_or_descendent_of(base, change_b.canon_hash), - ).unwrap(); + assert_eq!(authorities.current_authorities, set_c); + assert_eq!(authorities.set_id, 2); } #[test] @@ -445,7 +463,7 @@ mod tests { let mut authorities = AuthoritySet { current_authorities: Vec::new(), set_id: 0, - pending_changes: Vec::new(), + pending_changes: ForkTree::new(), }; let set_a = vec![([1; 32].into(), 5)]; @@ -457,19 +475,21 @@ mod tests { canon_hash: "hash_a", }; - authorities.add_pending_change(change_a.clone(), |_| Err(())).unwrap(); + authorities.add_pending_change(change_a.clone(), &static_is_descendent_of(false)).unwrap(); - let canonical = |n| match n { - 5 => Ok(Some("hash_a")), - 15 => Ok(Some("hash_a15")), - _ => Err(()), - }; + let is_descendent_of = is_descendent_of(|base, hash| match (*base, *hash) { + ("hash_a", "hash_b") => true, + ("hash_a", "hash_c") => false, + _ => unreachable!(), + }); + + // "hash_c" won't finalize the existing change since it isn't a descendent + assert!(!authorities.enacts_change("hash_c", 15, &is_descendent_of).unwrap()); - // there's an effective change triggered at block 15 - assert!(authorities.enacts_change(15, canonical).unwrap()); + // "hash_b" at depth 14 won't work either + assert!(!authorities.enacts_change("hash_b", 14, &is_descendent_of).unwrap()); - // block 16 is already past the change, we assume 15 will be finalized - // first and enact the change - assert!(!authorities.enacts_change(16, canonical).unwrap()); + // but it should work at depth 15 (change height + depth) + assert!(authorities.enacts_change("hash_b", 15, &is_descendent_of).unwrap()); } } diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index eab2311be6246..d210f0fa8b01b 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -368,6 +368,7 @@ impl From> for JustificationOrCommit< /// Finalize the given block and apply any authority set changes. If an /// authority set change is enacted then a justification is created (if not /// given) and stored with the block when finalizing it. +/// This method assumes that the block being finalized has already been imported. pub(crate) fn finalize_block, E, RA>( client: &Client, authority_set: &SharedAuthoritySet>, @@ -391,9 +392,11 @@ pub(crate) fn finalize_block, E, RA>( let mut old_last_completed = None; let mut consensus_changes = consensus_changes.lock(); - let status = authority_set.apply_changes(number, |canon_number| { - canonical_at_height(client, (hash, number), canon_number) - })?; + let status = authority_set.apply_changes( + hash, + number, + &is_descendent_of(client, None), + ).map_err(|e| Error::Safety(e.to_string()))?; if status.changed { // write new authority set state to disk. @@ -599,3 +602,41 @@ pub(crate) fn canonical_at_height, RA>( Ok(Some(current.hash())) } + +/// Returns a function for checking block ancestry, the returned function will +/// return `true` if the given hash (second parameter) is a descendent of the +/// base (first parameter). If the `current` parameter is defined, it should +/// represent the current block `hash` and its `parent hash`, if given the +/// function that's returned will assume that `hash` isn't part of the local DB +/// yet, and all searches in the DB will instead reference the parent. +pub fn is_descendent_of<'a, B, E, Block: BlockT, RA>( + client: &'a Client, + current: Option<(&'a H256, &'a H256)>, +) -> impl Fn(&H256, &H256) -> Result + 'a +where B: Backend, + E: CallExecutor + Send + Sync, +{ + move |base, hash| { + if base == hash { return Ok(false); } + + let mut hash = hash; + if let Some((current_hash, current_parent_hash)) = current { + if base == current_hash { return Ok(false); } + if hash == current_hash { + if base == current_parent_hash { + return Ok(true); + } else { + hash = current_parent_hash; + } + } + } + + let tree_route = client::blockchain::tree_route( + client.backend().blockchain(), + BlockId::Hash(*hash), + BlockId::Hash(*base), + )?; + + Ok(tree_route.common_block().hash == *base) + } +} diff --git a/core/finality-grandpa/src/import.rs b/core/finality-grandpa/src/import.rs index 003e27033a762..11306f72195c5 100644 --- a/core/finality-grandpa/src/import.rs +++ b/core/finality-grandpa/src/import.rs @@ -38,7 +38,7 @@ use substrate_primitives::{H256, Ed25519AuthorityId, Blake2Hasher}; use crate::{AUTHORITY_SET_KEY, Error}; use crate::authorities::SharedAuthoritySet; use crate::consensus_changes::SharedConsensusChanges; -use crate::environment::{canonical_at_height, finalize_block, ExitOrError, NewAuthoritySet}; +use crate::environment::{finalize_block, is_descendent_of, ExitOrError, NewAuthoritySet}; use crate::justification::GrandpaJustification; /// A block-import handler for GRANDPA. @@ -78,7 +78,8 @@ impl, RA, PRA> JustificationImport }; // request justifications for all pending changes for which change blocks have already been imported - for pending_change in self.authority_set.inner().read().pending_changes() { + let authorities = self.authority_set.inner().read(); + for pending_change in authorities.pending_changes() { if pending_change.effective_number() > chain_info.finalized_number && pending_change.effective_number() <= chain_info.best_number { @@ -128,7 +129,8 @@ impl, RA, PRA> BlockImport use client::blockchain::HeaderBackend; let hash = block.post_header().hash(); - let number = block.header.number().clone(); + let parent_hash = *block.header.parent_hash(); + let number = *block.header.number(); // early exit if block already in chain, otherwise the check for // authority changes will error when trying to re-import a change block @@ -139,7 +141,7 @@ impl, RA, PRA> BlockImport } let maybe_change = self.api.runtime_api().grandpa_pending_change( - &BlockId::hash(*block.header.parent_hash()), + &BlockId::hash(parent_hash), &block.header.digest().clone(), ); @@ -151,39 +153,11 @@ impl, RA, PRA> BlockImport // when we update the authorities, we need to hold the lock // until the block is written to prevent a race if we need to restore // the old authority set on error. + let is_descendent_of = is_descendent_of(&self.inner, Some((&hash, &parent_hash))); let just_in_case = if let Some(change) = maybe_change { - let parent_hash = *block.header.parent_hash(); - let mut authorities = self.authority_set.inner().write(); let old_set = authorities.clone(); - let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ConsensusError> { - let error = || { - debug!(target: "afg", "rejecting change: {} is in the same chain as {}", hash, base); - Err(ConsensusErrorKind::ClientImport("Incorrect base hash".to_string()).into()) - }; - - if *base == hash { return error(); } - if *base == parent_hash { return error(); } - - let tree_route = blockchain::tree_route( - self.inner.backend().blockchain(), - BlockId::Hash(parent_hash), - BlockId::Hash(*base), - ); - - let tree_route = match tree_route { - Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()), - Ok(route) => route, - }; - - if tree_route.common_block().hash == *base { - return error(); - } - - Ok(()) - }; - authorities.add_pending_change( PendingChange { next_authorities: change.next_authorities, @@ -191,8 +165,8 @@ impl, RA, PRA> BlockImport canon_height: number, canon_hash: hash, }, - is_equal_or_descendent_of, - )?; + &is_descendent_of, + ).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode()))); Some((old_set, authorities)) @@ -226,9 +200,11 @@ impl, RA, PRA> BlockImport } }; - let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| { - canonical_at_height(&self.inner, (hash, number), canon_number) - }).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; + let enacts_change = self.authority_set.inner().read().enacts_change( + hash, + number, + &is_descendent_of, + ).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?; if !enacts_change && !enacts_consensus_change { return Ok(import_result); @@ -340,6 +316,7 @@ impl, RA, PRA> Error::Network(error) => ConsensusErrorKind::ClientImport(error), Error::Blockchain(error) => ConsensusErrorKind::ClientImport(error), Error::Client(error) => ConsensusErrorKind::ClientImport(error.to_string()), + Error::Safety(error) => ConsensusErrorKind::ClientImport(error), Error::Timer(error) => ConsensusErrorKind::ClientImport(error.to_string()), }.into()); }, diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index 45e363bc033b7..5400ae0dd017d 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -45,12 +45,12 @@ //! logic is complex to compute because it requires looking arbitrarily far //! back in the chain. //! -//! Instead, we keep track of a list of all signals we've seen so far, -//! sorted ascending by the block number they would be applied at. We never vote -//! on chains with number higher than the earliest handoff block number -//! (this is num(signal) + N). When finalizing a block, we either apply or prune -//! any signaled changes based on whether the signaling block is included in the -//! newly-finalized chain. +//! Instead, we keep track of a list of all signals we've seen so far (across +//! all forks), sorted ascending by the block number they would be applied at. +//! We never vote on chains with number higher than the earliest handoff block +//! number (this is num(signal) + N). When finalizing a block, we either apply +//! or prune any signaled changes based on whether the signaling block is +//! included in the newly-finalized chain. use futures::prelude::*; use log::{debug, info, warn}; @@ -169,6 +169,8 @@ pub enum Error { Blockchain(String), /// Could not complete a round on disk. Client(ClientError), + /// An invariant has been violated (e.g. not finalizing pending change blocks in-order) + Safety(String), /// A timer failed to fire. Timer(::tokio::timer::Error), } diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 8e10d9c9a9826..ac64b7d24899d 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -416,7 +416,12 @@ fn run_to_completion(blocks: u64, net: Arc>, peers: &[Keyr .map_err(|_| ()); let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) - .for_each(move |_| { net.lock().route_fast(); Ok(()) }) + .for_each(move |_| { + net.lock().send_import_notifications(); + net.lock().send_finality_notifications(); + net.lock().route_fast(); + Ok(()) + }) .map(|_| ()) .map_err(|_| ()); @@ -550,7 +555,7 @@ fn transition_3_voters_twice_1_observer() { let set = AuthoritySet::::decode(&mut &set_raw[..]).unwrap(); assert_eq!(set.current(), (0, make_ids(peers_a).as_slice())); - assert_eq!(set.pending_changes().len(), 0); + assert_eq!(set.pending_changes().count(), 0); } { @@ -636,7 +641,7 @@ fn transition_3_voters_twice_1_observer() { let set = AuthoritySet::::decode(&mut &set_raw[..]).unwrap(); assert_eq!(set.current(), (2, make_ids(peers_c).as_slice())); - assert!(set.pending_changes().is_empty()); + assert_eq!(set.pending_changes().count(), 0); }) ); let voter = run_grandpa( @@ -771,12 +776,71 @@ fn sync_justifications_on_change_blocks() { } // the last peer should get the justification by syncing from other peers - assert!(net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none()); while net.lock().peer(3).client().justification(&BlockId::Number(21)).unwrap().is_none() { net.lock().route_fast(); } } +#[test] +fn finalizes_multiple_pending_changes_in_order() { + env_logger::init(); + + let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; + let peers_b = &[Keyring::Dave, Keyring::Eve, Keyring::Ferdie]; + let peers_c = &[Keyring::Dave, Keyring::Alice, Keyring::Bob]; + + let all_peers = &[ + Keyring::Alice, Keyring::Bob, Keyring::Charlie, + Keyring::Dave, Keyring::Eve, Keyring::Ferdie, + ]; + let genesis_voters = make_ids(peers_a); + + // 6 peers, 3 of them are authorities and participate in grandpa from genesis + let api = TestApi::new(genesis_voters); + let transitions = api.scheduled_changes.clone(); + let mut net = GrandpaTestNet::new(api, 6); + + // add 20 blocks + net.peer(0).push_blocks(20, false); + + // at block 21 we do add a transition which is instant + net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + transitions.lock().insert(*block.header.parent_hash(), ScheduledChange { + next_authorities: make_ids(peers_b), + delay: 0, + }); + block + }); + + // add more blocks on top of it (until we have 25) + net.peer(0).push_blocks(4, false); + + // at block 26 we add another which is enacted at block 30 + net.peer(0).generate_blocks(1, BlockOrigin::File, |builder| { + let block = builder.bake().unwrap(); + transitions.lock().insert(*block.header.parent_hash(), ScheduledChange { + next_authorities: make_ids(peers_c), + delay: 4, + }); + block + }); + + // add more blocks on top of it (until we have 30) + net.peer(0).push_blocks(4, false); + + net.sync(); + + // all peers imported both change blocks + for i in 0..6 { + assert_eq!(net.peer(i).client().info().unwrap().chain.best_number, 30, + "Peer #{} failed to sync", i); + } + + let net = Arc::new(Mutex::new(net)); + run_to_completion(30, net.clone(), all_peers); +} + #[test] fn doesnt_vote_on_the_tip_of_the_chain() { let peers_a = &[Keyring::Alice, Keyring::Bob, Keyring::Charlie]; diff --git a/core/network/Cargo.toml b/core/network/Cargo.toml index 108e0c8b86828..c760af92dbdc8 100644 --- a/core/network/Cargo.toml +++ b/core/network/Cargo.toml @@ -19,6 +19,7 @@ linked-hash-map = "0.5" lru-cache = "0.1.1" rustc-hex = "2.0" rand = "0.6" +fork-tree = { path = "../../core/util/fork-tree" } primitives = { package = "substrate-primitives", path = "../../core/primitives" } consensus = { package = "substrate-consensus-common", path = "../../core/consensus/common" } client = { package = "substrate-client", path = "../../core/client" } diff --git a/core/network/src/chain.rs b/core/network/src/chain.rs index 40edbfbadb9fe..51057df4dc9b5 100644 --- a/core/network/src/chain.rs +++ b/core/network/src/chain.rs @@ -68,6 +68,9 @@ pub trait Client: Send + Sync { max: Block::Hash, key: &StorageKey ) -> Result, Error>; + + /// Returns `true` if the given `block` is a descendent of `base`. + fn is_descendent_of(&self, base: &Block::Hash, block: &Block::Hash) -> Result; } impl Client for SubstrateClient where @@ -129,4 +132,18 @@ impl Client for SubstrateClient where ) -> Result, Error> { (self as &SubstrateClient).key_changes_proof(first, last, min, max, key) } + + fn is_descendent_of(&self, base: &Block::Hash, block: &Block::Hash) -> Result { + if base == block { + return Ok(false); + } + + let tree_route = ::client::blockchain::tree_route( + self.backend().blockchain(), + BlockId::Hash(*block), + BlockId::Hash(*base), + )?; + + Ok(tree_route.common_block().hash == *base) + } } diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index c7e5dd9d2068c..2da465cce2d76 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -955,7 +955,11 @@ impl, H: ExHashT> Protocol { } fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.sync.block_finalized(&hash, *header.number()); + self.sync.on_block_finalized( + &hash, + *header.number(), + &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), + ); } fn on_remote_call_request( diff --git a/core/network/src/sync.rs b/core/network/src/sync.rs index 213b1b65867b3..536cf304e7964 100644 --- a/core/network/src/sync.rs +++ b/core/network/src/sync.rs @@ -14,10 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::time::{Duration, Instant}; -use log::{trace, debug}; +use log::{debug, trace, warn}; use crate::protocol::Context; +use fork_tree::ForkTree; use network_libp2p::{Severity, NodeIndex}; use client::{BlockStatus, ClientInfo}; use consensus::BlockOrigin; @@ -65,9 +66,13 @@ enum PeerSyncState { /// Pending justification request for the given block (hash and number). type PendingJustification = (::Hash, NumberFor); -/// Manages pending block justification requests. +/// Manages pending block justification requests. Multiple justifications may be +/// requested for competing forks, or for the same branch at different +/// (increasing) heights. This structure will guarantee that justifications are +/// fetched in-order, and that obsolete changes are pruned (when finalizing a +/// competing fork). struct PendingJustifications { - justifications: HashSet>, + justifications: ForkTree, ()>, pending_requests: VecDeque>, peer_requests: HashMap>, previous_requests: HashMap, Vec<(NodeIndex, Instant)>>, @@ -76,7 +81,7 @@ struct PendingJustifications { impl PendingJustifications { fn new() -> PendingJustifications { PendingJustifications { - justifications: HashSet::new(), + justifications: ForkTree::new(), pending_requests: VecDeque::new(), peer_requests: HashMap::new(), previous_requests: HashMap::new(), @@ -183,11 +188,26 @@ impl PendingJustifications { } /// Queue a justification request (without dispatching it). - fn queue_request(&mut self, justification: &PendingJustification) { - if !self.justifications.insert(*justification) { - return; - } - self.pending_requests.push_back(*justification); + fn queue_request( + &mut self, + justification: &PendingJustification, + is_descendent_of: F, + ) where F: Fn(&B::Hash, &B::Hash) -> Result { + match self.justifications.import(justification.0.clone(), justification.1.clone(), (), &is_descendent_of) { + Ok(true) => { + // this is a new root so we add it to the current `pending_requests` + self.pending_requests.push_back((justification.0, justification.1)); + }, + Err(err) => { + warn!(target: "sync", "Failed to insert requested justification {:?} {:?} into tree: {:?}", + justification.0, + justification.1, + err, + ); + return; + }, + _ => {}, + }; } /// Retry any pending request if a peer disconnected. @@ -202,8 +222,20 @@ impl PendingJustifications { fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { let request = (hash, number); if success { - self.justifications.remove(&request); - self.previous_requests.remove(&request); + if self.justifications.finalize_root(&request.0).is_none() { + warn!(target: "sync", "Imported justification for {:?} {:?} which isn't a root in the tree: {:?}", + request.0, + request.1, + self.justifications.roots().collect::>(), + ); + return; + }; + + self.previous_requests.clear(); + self.peer_requests.clear(); + self.pending_requests = + self.justifications.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect(); + return; } self.pending_requests.push_front(request); @@ -226,6 +258,7 @@ impl PendingJustifications { import_queue.import_justification(who.clone(), request.0, request.1, justification); return } + self.previous_requests .entry(request) .or_insert(Vec::new()) @@ -236,11 +269,25 @@ impl PendingJustifications { /// Removes any pending justification requests for blocks lower than the /// given best finalized. - fn collect_garbage(&mut self, best_finalized: NumberFor) { - self.justifications.retain(|(_, n)| *n > best_finalized); - self.pending_requests.retain(|(_, n)| *n > best_finalized); - self.peer_requests.retain(|_, (_, n)| *n > best_finalized); - self.previous_requests.retain(|(_, n), _| *n > best_finalized); + fn on_block_finalized( + &mut self, + best_finalized_hash: &B::Hash, + best_finalized_number: NumberFor, + is_descendent_of: F, + ) -> Result<(), fork_tree::Error> + where F: Fn(&B::Hash, &B::Hash) -> Result + { + use std::collections::HashSet; + + self.justifications.finalize(best_finalized_hash, best_finalized_number, &is_descendent_of)?; + + let roots = self.justifications.roots().collect::>(); + + self.pending_requests.retain(|(h, n)| roots.contains(&(h, n, &()))); + self.peer_requests.retain(|_, (h, n)| roots.contains(&(h, n, &()))); + self.previous_requests.retain(|(h, n), _| roots.contains(&(h, n, &()))); + + Ok(()) } } @@ -579,7 +626,11 @@ impl ChainSync { /// /// Queues a new justification request and tries to dispatch all pending requests. pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut Context) { - self.justifications.queue_request(&(*hash, number)); + self.justifications.queue_request( + &(*hash, number), + |base, block| protocol.client().is_descendent_of(base, block), + ); + self.justifications.dispatch(&mut self.peers, protocol); } @@ -598,8 +649,14 @@ impl ChainSync { } /// Notify about finalization of the given block. - pub fn block_finalized(&mut self, _hash: &B::Hash, number: NumberFor) { - self.justifications.collect_garbage(number); + pub fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor, protocol: &mut Context) { + if let Err(err) = self.justifications.on_block_finalized( + hash, + number, + |base, block| protocol.client().is_descendent_of(base, block), + ) { + warn!(target: "sync", "Error cleaning up pending justification requests: {:?}", err); + }; } fn block_queued(&mut self, hash: &B::Hash, number: NumberFor) { diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index ade77ee51d03d..71cead8da9bfd 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -124,6 +124,8 @@ pub struct Peer { pub import_queue: Box>, network_sender: NetworkChan, pub data: D, + best_hash: Mutex>, + finalized_hash: Mutex>, } impl Peer { @@ -143,6 +145,8 @@ impl Peer { network_sender, network_port, data, + best_hash: Mutex::new(None), + finalized_hash: Mutex::new(None), } } /// Called after blockchain has been populated to updated current state. @@ -222,19 +226,39 @@ impl Peer { /// Send block import notifications. fn send_import_notifications(&self) { let info = self.client.info().expect("In-mem client does not fail"); + + let mut best_hash = self.best_hash.lock(); + match *best_hash { + None => {}, + Some(hash) if hash != info.chain.best_hash => {}, + _ => return, + } + let header = self.client.header(&BlockId::Hash(info.chain.best_hash)).unwrap().unwrap(); let _ = self .protocol_sender .send(ProtocolMsg::BlockImported(info.chain.best_hash, header)); + + *best_hash = Some(info.chain.best_hash); } /// Send block finalization notifications. pub fn send_finality_notifications(&self) { let info = self.client.info().expect("In-mem client does not fail"); + + let mut finalized_hash = self.finalized_hash.lock(); + match *finalized_hash { + None => {}, + Some(hash) if hash != info.chain.finalized_hash => {}, + _ => return, + } + let header = self.client.header(&BlockId::Hash(info.chain.finalized_hash)).unwrap().unwrap(); let _ = self .protocol_sender .send(ProtocolMsg::BlockFinalized(info.chain.finalized_hash, header.clone())); + + *finalized_hash = Some(info.chain.finalized_hash); } /// Restart sync for a peer. @@ -296,7 +320,7 @@ impl Peer { } /// Add blocks to the peer -- edit the block before adding - pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, edit_block: F) + pub fn generate_blocks(&self, count: usize, origin: BlockOrigin, edit_block: F) -> H256 where F: FnMut(BlockBuilder) -> Block { let best_hash = self.client.info().unwrap().chain.best_hash; @@ -305,11 +329,12 @@ impl Peer { /// Add blocks to the peer -- edit the block before adding. The chain will /// start at the given block iD. - pub fn generate_blocks_at(&self, mut at: BlockId, count: usize, origin: BlockOrigin, mut edit_block: F) + pub fn generate_blocks_at(&self, at: BlockId, count: usize, origin: BlockOrigin, mut edit_block: F) -> H256 where F: FnMut(BlockBuilder) -> Block { + let mut at = self.client.header(&at).unwrap().unwrap().hash(); for _ in 0..count { - let builder = self.client.new_block_at(&at).unwrap(); + let builder = self.client.new_block_at(&BlockId::Hash(at)).unwrap(); let block = edit_block(builder); let hash = block.header.hash(); trace!( @@ -319,7 +344,7 @@ impl Peer { block.header.parent_hash ); let header = block.header.clone(); - at = BlockId::Hash(hash); + at = hash; self.import_queue.import_blocks( origin, @@ -336,17 +361,18 @@ impl Peer { thread::sleep(Duration::from_millis(20)); } } + at } /// Push blocks to the peer (simplified: with or without a TX) - pub fn push_blocks(&self, count: usize, with_tx: bool) { + pub fn push_blocks(&self, count: usize, with_tx: bool) -> H256 { let best_hash = self.client.info().unwrap().chain.best_hash; - self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx); + self.push_blocks_at(BlockId::Hash(best_hash), count, with_tx) } /// Push blocks to the peer (simplified: with or without a TX) starting from /// given hash. - pub fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) { + pub fn push_blocks_at(&self, at: BlockId, count: usize, with_tx: bool) -> H256 { let mut nonce = 0; if with_tx { self.generate_blocks_at(at, count, BlockOrigin::File, |mut builder| { @@ -360,17 +386,17 @@ impl Peer { builder.push(Extrinsic::Transfer(transfer, signature)).unwrap(); nonce = nonce + 1; builder.bake().unwrap() - }); + }) } else { - self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap()); + self.generate_blocks_at(at, count, BlockOrigin::File, |builder| builder.bake().unwrap()) } } - pub fn push_authorities_change_block(&self, new_authorities: Vec) { + pub fn push_authorities_change_block(&self, new_authorities: Vec) -> H256 { self.generate_blocks(1, BlockOrigin::File, |mut builder| { builder.push(Extrinsic::AuthoritiesChange(new_authorities.clone())).unwrap(); builder.bake().unwrap() - }); + }) } /// Get a reference to the client. @@ -571,7 +597,7 @@ pub trait TestNetFactory: Sized { let mut done = 0; loop { - if done > 10 { break; } + if done > 3 { break; } if self.done() { done += 1; } else { diff --git a/core/network/src/test/sync.rs b/core/network/src/test/sync.rs index 64d74b002fe51..ecea0494fcd40 100644 --- a/core/network/src/test/sync.rs +++ b/core/network/src/test/sync.rs @@ -80,11 +80,46 @@ fn sync_justifications() { assert_eq!(net.peer(0).client().justification(&BlockId::Number(10)).unwrap(), None); assert_eq!(net.peer(1).client().justification(&BlockId::Number(10)).unwrap(), None); - // we finalize block #10 for peer 0 with a justification + // we finalize block #10, #15 and #20 for peer 0 with a justification net.peer(0).client().finalize_block(BlockId::Number(10), Some(Vec::new()), true).unwrap(); + net.peer(0).client().finalize_block(BlockId::Number(15), Some(Vec::new()), true).unwrap(); + net.peer(0).client().finalize_block(BlockId::Number(20), Some(Vec::new()), true).unwrap(); - let header = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap(); - net.peer(1).request_justification(&header.hash().into(), 10); + let h1 = net.peer(1).client().header(&BlockId::Number(10)).unwrap().unwrap(); + let h2 = net.peer(1).client().header(&BlockId::Number(15)).unwrap().unwrap(); + let h3 = net.peer(1).client().header(&BlockId::Number(20)).unwrap().unwrap(); + + // peer 1 should get the justifications from the network + net.peer(1).request_justification(&h1.hash().into(), 10); + net.peer(1).request_justification(&h2.hash().into(), 15); + net.peer(1).request_justification(&h3.hash().into(), 20); + + net.sync(); + + for height in (10..21).step_by(5) { + assert_eq!(net.peer(0).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new())); + assert_eq!(net.peer(1).client().justification(&BlockId::Number(height)).unwrap(), Some(Vec::new())); + } +} + +#[test] +fn sync_justifications_across_forks() { + let _ = ::env_logger::try_init(); + let mut net = JustificationTestNet::new(3); + // we push 5 blocks + net.peer(0).push_blocks(5, false); + // and then two forks 5 and 6 blocks long + let f1_best = net.peer(0).push_blocks_at(BlockId::Number(5), 5, false); + let f2_best = net.peer(0).push_blocks_at(BlockId::Number(5), 6, false); + + // peer 1 will only see the longer fork. but we'll request justifications + // for both and finalize the small fork instead. + net.sync(); + + net.peer(0).client().finalize_block(BlockId::Hash(f1_best), Some(Vec::new()), true).unwrap(); + + net.peer(1).request_justification(&f1_best, 10); + net.peer(1).request_justification(&f2_best, 11); net.sync(); diff --git a/core/util/fork-tree/Cargo.toml b/core/util/fork-tree/Cargo.toml new file mode 100644 index 0000000000000..8be64bedab735 --- /dev/null +++ b/core/util/fork-tree/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "fork-tree" +version = "0.1.0" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +parity-codec = "3.0" +parity-codec-derive = "3.0" diff --git a/core/util/fork-tree/src/lib.rs b/core/util/fork-tree/src/lib.rs new file mode 100644 index 0000000000000..2fac80db79955 --- /dev/null +++ b/core/util/fork-tree/src/lib.rs @@ -0,0 +1,784 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Utility library for managing tree-like ordered data with logic for pruning +//! the tree while finalizing nodes. + +#![warn(missing_docs)] + +use std::fmt; +use parity_codec_derive::{Decode, Encode}; + +/// Error occured when interating with the tree. +#[derive(Clone, Debug, PartialEq)] +pub enum Error { + /// Adding duplicate node to tree. + Duplicate, + /// Finalizing descendent of tree node without finalizing ancestor(s). + UnfinalizedAncestor, + /// Imported or finalized node that is an ancestor of previously finalized node. + Revert, + /// Error throw by client when checking for node ancestry. + Client(E), +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use std::error::Error; + write!(f, "{}", self.description()) + } +} + +impl std::error::Error for Error { + fn description(&self) -> &str { + match *self { + Error::Duplicate => "Hash already exists in Tree", + Error::UnfinalizedAncestor => "Finalized descendent of Tree node without finalizing its ancestor(s) first", + Error::Revert => "Tried to import or finalize node that is an ancestor of a previously finalized node", + Error::Client(ref err) => err.description(), + } + } + + fn cause(&self) -> Option<&std::error::Error> { + None + } +} + +impl From for Error { + fn from(err: E) -> Error { + Error::Client(err) + } +} + +/// Result of finalizing a node (that could be a part of the tree or not). +#[derive(Debug, PartialEq)] +pub enum FinalizationResult { + /// The tree has changed, optionally return the value associated with the finalized node. + Changed(Option), + /// The tree has not changed. + Unchanged, +} + +/// A tree data structure that stores several nodes across multiple branches. +/// Top-level branches are called roots. The tree has functionality for +/// finalizing nodes, which means that that node is traversed, and all competing +/// branches are pruned. It also guarantees that nodes in the tree are finalized +/// in order. Each node is uniquely identified by its hash but can be ordered by +/// its number. In order to build the tree an external function must be provided +/// when interacting with the tree to establish a node's ancestry. +#[derive(Clone, Debug, Decode, Encode)] +pub struct ForkTree { + roots: Vec>, + best_finalized_number: Option, +} + +impl ForkTree where + H: PartialEq, + N: Ord, +{ + /// Create a new empty tree. + pub fn new() -> ForkTree { + ForkTree { + roots: Vec::new(), + best_finalized_number: None, + } + } + + /// Import a new node into the tree. The given function `is_descendent_of` + /// should return `true` if the second hash (target) is a descendent of the + /// first hash (base). This method assumes that nodes in the same branch are + /// imported in order. + pub fn import( + &mut self, + mut hash: H, + mut number: N, + mut data: V, + is_descendent_of: &F, + ) -> Result> + where E: std::error::Error, + F: Fn(&H, &H) -> Result, + { + if let Some(ref best_finalized_number) = self.best_finalized_number { + if number <= *best_finalized_number { + return Err(Error::Revert); + } + } + + for root in self.roots.iter_mut() { + if root.hash == hash { + return Err(Error::Duplicate); + } + + match root.import(hash, number, data, is_descendent_of)? { + Some((h, n, d)) => { + hash = h; + number = n; + data = d; + }, + None => return Ok(false), + } + } + + self.roots.push(Node { + data, + hash: hash, + number: number, + children: Vec::new(), + }); + + Ok(true) + } + + /// Iterates over the existing roots in the tree. + pub fn roots(&self) -> impl Iterator { + self.roots.iter().map(|node| (&node.hash, &node.number, &node.data)) + } + + fn node_iter(&self) -> impl Iterator> { + ForkTreeIterator { stack: self.roots.iter().collect() } + } + + /// Iterates the nodes in the tree in pre-order. + pub fn iter(&self) -> impl Iterator { + self.node_iter().map(|node| (&node.hash, &node.number, &node.data)) + } + + /// Finalize a root in the tree and return it, return `None` in case no root + /// with the given hash exists. All other roots are pruned, and the children + /// of the finalized node become the new roots. + pub fn finalize_root(&mut self, hash: &H) -> Option { + if let Some(position) = self.roots.iter().position(|node| node.hash == *hash) { + let node = self.roots.swap_remove(position); + self.roots = node.children; + self.best_finalized_number = Some(node.number); + return Some(node.data); + } + + None + } + + /// Finalize a node in the tree. This method will make sure that the node + /// being finalized is either an existing root (an return its data), or a + /// node from a competing branch (not in the tree), tree pruning is done + /// accordingly. The given function `is_descendent_of` should return `true` + /// if the second hash (target) is a descendent of the first hash (base). + pub fn finalize( + &mut self, + hash: &H, + number: N, + is_descendent_of: &F, + ) -> Result, Error> + where E: std::error::Error, + F: Fn(&H, &H) -> Result + { + if let Some(ref best_finalized_number) = self.best_finalized_number { + if number <= *best_finalized_number { + return Err(Error::Revert); + } + } + + // check if one of the current roots is being finalized + if let Some(root) = self.finalize_root(hash) { + return Ok(FinalizationResult::Changed(Some(root))); + } + + // make sure we're not finalizing a descendent of any root + for root in self.roots.iter() { + if number > root.number && is_descendent_of(&root.hash, hash)? { + return Err(Error::UnfinalizedAncestor); + } + } + + // we finalized a block earlier than any existing root (or possibly + // another fork not part of the tree). make sure to only keep roots that + // are part of the finalized branch + let mut changed = false; + self.roots.retain(|root| { + let retain = root.number > number && is_descendent_of(hash, &root.hash).unwrap_or(false); + + if !retain { + changed = true; + } + + retain + }); + + self.best_finalized_number = Some(number); + + if changed { + Ok(FinalizationResult::Changed(None)) + } else { + Ok(FinalizationResult::Unchanged) + } + } + + /// Checks if any node in the tree is finalized by either finalizing the + /// node itself or a child node that's not in the tree, guaranteeing that + /// the node being finalized isn't a descendent of any of the node's + /// children. The given `predicate` is checked on the prospective finalized + /// root and must pass for finalization to occur. The given function + /// `is_descendent_of` should return `true` if the second hash (target) is a + /// descendent of the first hash (base). + pub fn finalizes_any_with_descendent_if( + &self, + hash: &H, + number: N, + is_descendent_of: &F, + predicate: P, + ) -> Result> + where E: std::error::Error, + F: Fn(&H, &H) -> Result, + P: Fn(&V) -> bool, + { + if let Some(ref best_finalized_number) = self.best_finalized_number { + if number <= *best_finalized_number { + return Err(Error::Revert); + } + } + + // check if the given hash is equal or a descendent of any node in the + // tree, if we find a valid node that passes the predicate then we must + // ensure that we're not finalizing past any of its child nodes. + for node in self.node_iter() { + if node.hash == *hash || is_descendent_of(&node.hash, hash)? { + if predicate(&node.data) { + for node in node.children.iter() { + if node.number <= number && is_descendent_of(&node.hash, &hash)? { + return Err(Error::UnfinalizedAncestor); + } + } + + return Ok(true); + } + } + } + + Ok(false) + } + + /// Finalize a root in the tree by either finalizing the node itself or a + /// child node that's not in the tree, guaranteeing that the node being + /// finalized isn't a descendent of any of the root's children. The given + /// `predicate` is checked on the prospective finalized root and must pass for + /// finalization to occur. The given function `is_descendent_of` should + /// return `true` if the second hash (target) is a descendent of the first + /// hash (base). + pub fn finalize_with_descendent_if( + &mut self, + hash: &H, + number: N, + is_descendent_of: &F, + predicate: P, + ) -> Result, Error> + where E: std::error::Error, + F: Fn(&H, &H) -> Result, + P: Fn(&V) -> bool, + { + if let Some(ref best_finalized_number) = self.best_finalized_number { + if number <= *best_finalized_number { + return Err(Error::Revert); + } + } + + // check if the given hash is equal or a a descendent of any root, if we + // find a valid root that passes the predicate then we must ensure that + // we're not finalizing past any children node. + let mut position = None; + for (i, root) in self.roots.iter().enumerate() { + if root.hash == *hash || is_descendent_of(&root.hash, hash)? { + if predicate(&root.data) { + for node in root.children.iter() { + if node.number <= number && is_descendent_of(&node.hash, &hash)? { + return Err(Error::UnfinalizedAncestor); + } + } + + position = Some(i); + break; + } + } + } + + let node_data = position.map(|i| { + let node = self.roots.swap_remove(i); + self.roots = node.children; + self.best_finalized_number = Some(node.number); + node.data + }); + + // if the block being finalized is earlier than a given root, then it + // must be its ancestor, otherwise we can prune the root. if there's a + // root at the same height then the hashes must match. otherwise the + // node being finalized is higher than the root so it must be its + // descendent (in this case the node wasn't finalized earlier presumably + // because the predicate didn't pass). + let mut changed = false; + self.roots.retain(|root| { + let retain = + root.number > number && is_descendent_of(hash, &root.hash).unwrap_or(false) || + root.number == number && root.hash == *hash || + is_descendent_of(&root.hash, hash).unwrap_or(false); + + if !retain { + changed = true; + } + + retain + }); + + self.best_finalized_number = Some(number); + + match (node_data, changed) { + (Some(data), _) => Ok(FinalizationResult::Changed(Some(data))), + (None, true) => Ok(FinalizationResult::Changed(None)), + (None, false) => Ok(FinalizationResult::Unchanged), + } + } +} + +#[derive(Clone, Debug, Decode, Encode)] +#[cfg_attr(test, derive(PartialEq))] +struct Node { + hash: H, + number: N, + data: V, + children: Vec>, +} + +impl Node { + fn import( + &mut self, + mut hash: H, + mut number: N, + mut data: V, + is_descendent_of: &F, + ) -> Result, Error> + where E: fmt::Debug, + F: Fn(&H, &H) -> Result, + { + if self.hash == hash { + return Err(Error::Duplicate); + }; + + if number <= self.number { return Ok(Some((hash, number, data))); } + + for node in self.children.iter_mut() { + match node.import(hash, number, data, is_descendent_of)? { + Some((h, n, d)) => { + hash = h; + number = n; + data = d; + }, + None => return Ok(None), + } + } + + if is_descendent_of(&self.hash, &hash)? { + self.children.push(Node { + data, + hash: hash, + number: number, + children: Vec::new(), + }); + + Ok(None) + } else { + Ok(Some((hash, number, data))) + } + } +} + +struct ForkTreeIterator<'a, H, N, V> { + stack: Vec<&'a Node>, +} + +impl<'a, H, N, V> Iterator for ForkTreeIterator<'a, H, N, V> { + type Item = &'a Node; + + fn next(&mut self) -> Option { + self.stack.pop().map(|node| { + self.stack.extend(node.children.iter()); + node + }) + } +} + +#[cfg(test)] +mod test { + use super::{FinalizationResult, ForkTree, Error}; + + #[derive(Debug, PartialEq)] + struct TestError; + + impl std::fmt::Display for TestError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "TestError") + } + } + + impl std::error::Error for TestError {} + + fn test_fork_tree<'a>() -> (ForkTree<&'a str, u64, ()>, impl Fn(&&str, &&str) -> Result) { + let mut tree = ForkTree::new(); + + // + // - B - C - D - E + // / + // / - G + // / / + // A - F - H - I + // \ + // — J - K + // + let is_descendent_of = |base: &&str, block: &&str| -> Result { + let letters = vec!["B", "C", "D", "E", "F", "G", "H", "I", "J", "K"]; + match (*base, *block) { + ("A", b) => Ok(letters.into_iter().any(|n| n == b)), + ("B", b) => Ok(b == "C" || b == "D" || b == "E"), + ("C", b) => Ok(b == "D" || b == "E"), + ("D", b) => Ok(b == "E"), + ("E", _) => Ok(false), + ("F", b) => Ok(b == "G" || b == "H" || b == "I"), + ("G", _) => Ok(false), + ("H", b) => Ok(b == "I"), + ("I", _) => Ok(false), + ("J", b) => Ok(b == "K"), + ("K", _) => Ok(false), + ("0", _) => Ok(true), + _ => Ok(false), + } + }; + + tree.import("A", 1, (), &is_descendent_of).unwrap(); + + tree.import("B", 2, (), &is_descendent_of).unwrap(); + tree.import("C", 3, (), &is_descendent_of).unwrap(); + tree.import("D", 4, (), &is_descendent_of).unwrap(); + tree.import("E", 5, (), &is_descendent_of).unwrap(); + + tree.import("F", 2, (), &is_descendent_of).unwrap(); + tree.import("G", 3, (), &is_descendent_of).unwrap(); + + tree.import("H", 3, (), &is_descendent_of).unwrap(); + tree.import("I", 4, (), &is_descendent_of).unwrap(); + + tree.import("J", 2, (), &is_descendent_of).unwrap(); + tree.import("K", 3, (), &is_descendent_of).unwrap(); + + (tree, is_descendent_of) + } + + #[test] + fn import_doesnt_revert() { + let (mut tree, is_descendent_of) = test_fork_tree(); + + tree.finalize_root(&"A"); + + assert_eq!( + tree.best_finalized_number, + Some(1), + ); + + assert_eq!( + tree.import("A", 1, (), &is_descendent_of), + Err(Error::Revert), + ); + } + + #[test] + fn import_doesnt_add_duplicates() { + let (mut tree, is_descendent_of) = test_fork_tree(); + + assert_eq!( + tree.import("A", 1, (), &is_descendent_of), + Err(Error::Duplicate), + ); + + assert_eq!( + tree.import("I", 4, (), &is_descendent_of), + Err(Error::Duplicate), + ); + + assert_eq!( + tree.import("G", 3, (), &is_descendent_of), + Err(Error::Duplicate), + ); + + assert_eq!( + tree.import("K", 3, (), &is_descendent_of), + Err(Error::Duplicate), + ); + } + + #[test] + fn finalize_root_works() { + let finalize_a = || { + let (mut tree, ..) = test_fork_tree(); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("A", 1)], + ); + + // finalizing "A" opens up three possible forks + tree.finalize_root(&"A"); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("B", 2), ("F", 2), ("J", 2)], + ); + + tree + }; + + { + let mut tree = finalize_a(); + + // finalizing "B" will progress on its fork and remove any other competing forks + tree.finalize_root(&"B"); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("C", 3)], + ); + + // all the other forks have been pruned + assert!(tree.roots.len() == 1); + } + + { + let mut tree = finalize_a(); + + // finalizing "J" will progress on its fork and remove any other competing forks + tree.finalize_root(&"J"); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("K", 3)], + ); + + // all the other forks have been pruned + assert!(tree.roots.len() == 1); + } + } + + #[test] + fn finalize_works() { + let (mut tree, is_descendent_of) = test_fork_tree(); + + let original_roots = tree.roots.clone(); + + // finalizing a block prior to any in the node doesn't change the tree + assert_eq!( + tree.finalize(&"0", 0, &is_descendent_of), + Ok(FinalizationResult::Unchanged), + ); + + assert_eq!(tree.roots, original_roots); + + // finalizing "A" opens up three possible forks + assert_eq!( + tree.finalize(&"A", 1, &is_descendent_of), + Ok(FinalizationResult::Changed(Some(()))), + ); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("B", 2), ("F", 2), ("J", 2)], + ); + + // finalizing anything lower than what we observed will fail + assert_eq!( + tree.best_finalized_number, + Some(1), + ); + + assert_eq!( + tree.finalize(&"Z", 1, &is_descendent_of), + Err(Error::Revert), + ); + + // trying to finalize a node without finalizing its ancestors first will fail + assert_eq!( + tree.finalize(&"H", 3, &is_descendent_of), + Err(Error::UnfinalizedAncestor), + ); + + // after finalizing "F" we can finalize "H" + assert_eq!( + tree.finalize(&"F", 2, &is_descendent_of), + Ok(FinalizationResult::Changed(Some(()))), + ); + + assert_eq!( + tree.finalize(&"H", 3, &is_descendent_of), + Ok(FinalizationResult::Changed(Some(()))), + ); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("I", 4)], + ); + + // finalizing a node from another fork that isn't part of the tree clears the tree + assert_eq!( + tree.finalize(&"Z", 5, &is_descendent_of), + Ok(FinalizationResult::Changed(None)), + ); + + assert!(tree.roots.is_empty()); + } + + #[test] + fn finalize_with_descendent_works() { + #[derive(Debug, PartialEq)] + struct Change { effective: u64 }; + + let (mut tree, is_descendent_of) = { + let mut tree = ForkTree::new(); + + let is_descendent_of = |base: &&str, block: &&str| -> Result { + + // + // A0 #1 - (B #2) - (C #5) - D #10 - E #15 - (F #100) + // \ + // - (G #100) + // + // A1 #1 + // + // Nodes B, C, F and G are not part of the tree. + match (*base, *block) { + ("A0", b) => Ok(b == "B" || b == "C" || b == "D" || b == "G"), + ("A1", _) => Ok(false), + ("C", b) => Ok(b == "D"), + ("D", b) => Ok(b == "E" || b == "F" || b == "G"), + ("E", b) => Ok(b == "F"), + _ => Ok(false), + } + }; + + tree.import("A0", 1, Change { effective: 5 }, &is_descendent_of).unwrap(); + tree.import("A1", 1, Change { effective: 5 }, &is_descendent_of).unwrap(); + tree.import("D", 10, Change { effective: 10 }, &is_descendent_of).unwrap(); + tree.import("E", 15, Change { effective: 50 }, &is_descendent_of).unwrap(); + + (tree, is_descendent_of) + }; + + assert_eq!( + tree.finalizes_any_with_descendent_if( + &"B", + 2, + &is_descendent_of, + |c| c.effective <= 2, + ), + Ok(false), + ); + + // finalizing "B" doesn't finalize "A0" since the predicate doesn't pass, + // although it will clear out "A1" from the tree + assert_eq!( + tree.finalize_with_descendent_if( + &"B", + 2, + &is_descendent_of, + |c| c.effective <= 2, + ), + Ok(FinalizationResult::Changed(None)), + ); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("A0", 1)], + ); + + // finalizing "C" will finalize the node "A0" and prune it out of the tree + assert_eq!( + tree.finalizes_any_with_descendent_if( + &"C", + 5, + &is_descendent_of, + |c| c.effective <= 5, + ), + Ok(true), + ); + + assert_eq!( + tree.finalize_with_descendent_if( + &"C", + 5, + &is_descendent_of, + |c| c.effective <= 5, + ), + Ok(FinalizationResult::Changed(Some(Change { effective: 5 }))), + ); + + assert_eq!( + tree.roots().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![("D", 10)], + ); + + // finalizing "F" will fail since it would finalize past "E" without finalizing "D" first + assert_eq!( + tree.finalizes_any_with_descendent_if( + &"F", + 100, + &is_descendent_of, + |c| c.effective <= 100, + ), + Err(Error::UnfinalizedAncestor), + ); + + // it will work with "G" though since it is not in the same branch as "E" + assert_eq!( + tree.finalizes_any_with_descendent_if( + &"G", + 100, + &is_descendent_of, + |c| c.effective <= 100, + ), + Ok(true), + ); + + assert_eq!( + tree.finalize_with_descendent_if( + &"G", + 100, + &is_descendent_of, + |c| c.effective <= 100, + ), + Ok(FinalizationResult::Changed(Some(Change { effective: 10 }))), + ); + + // "E" will be pruned out + assert_eq!(tree.roots().count(), 0); + } + + #[test] + fn iter_iterates_in_preorder() { + let (tree, ..) = test_fork_tree(); + assert_eq!( + tree.iter().map(|(h, n, _)| (h.clone(), n.clone())).collect::>(), + vec![ + ("A", 1), + ("J", 2), ("K", 3), + ("F", 2), ("H", 3), ("I", 4), + ("G", 3), + ("B", 2), ("C", 3), ("D", 4), ("E", 5), + ], + ); + } +}