diff --git a/core/finality-grandpa/src/consensus_changes.rs b/core/finality-grandpa/src/consensus_changes.rs
new file mode 100644
index 0000000000000..c3f3c274e4683
--- /dev/null
+++ b/core/finality-grandpa/src/consensus_changes.rs
@@ -0,0 +1,69 @@
+// Copyright 2018-2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use std::sync::Arc;
+
+/// Consensus-related data changes tracker.
+#[derive(Clone, Debug, Encode, Decode)]
+pub(crate) struct ConsensusChanges {
+ pending_changes: Vec<(N, H)>,
+}
+
+impl ConsensusChanges {
+ /// Create empty consensus changes.
+ pub(crate) fn empty() -> Self {
+ ConsensusChanges { pending_changes: Vec::new(), }
+ }
+
+ /// Note unfinalized change of consensus-related data.
+ pub(crate) fn note_change(&mut self, at: (N, H)) {
+ let idx = self.pending_changes
+ .binary_search_by_key(&at.0, |change| change.0)
+ .unwrap_or_else(|i| i);
+ self.pending_changes.insert(idx, at);
+ }
+
+ /// Finalize all pending consensus changes that are finalized by given block.
+ /// Returns true if there any changes were finalized.
+ pub(crate) fn finalize ::client::error::Result>>(
+ &mut self,
+ block: (N, H),
+ canonical_at_height: F,
+ ) -> ::client::error::Result<(bool, bool)> {
+ let (split_idx, has_finalized_changes) = self.pending_changes.iter()
+ .enumerate()
+ .take_while(|(_, &(at_height, _))| at_height <= block.0)
+ .fold((None, Ok(false)), |(_, has_finalized_changes), (idx, ref at)|
+ (
+ Some(idx),
+ has_finalized_changes
+ .and_then(|has_finalized_changes| if has_finalized_changes {
+ Ok(has_finalized_changes)
+ } else {
+ canonical_at_height(at.0).map(|can_hash| Some(at.1) == can_hash)
+ }),
+ ));
+
+ let altered_changes = split_idx.is_some();
+ if let Some(split_idx) = split_idx {
+ self.pending_changes = self.pending_changes.split_off(split_idx + 1);
+ }
+ has_finalized_changes.map(|has_finalized_changes| (altered_changes, has_finalized_changes))
+ }
+}
+
+/// Thread-safe consensus changes tracker reference.
+pub(crate) type SharedConsensusChanges = Arc>>;
diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs
new file mode 100644
index 0000000000000..7c52da3e3445b
--- /dev/null
+++ b/core/finality-grandpa/src/environment.rs
@@ -0,0 +1,600 @@
+// Copyright 2018-2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use std::fmt;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use codec::Encode;
+use futures::prelude::*;
+use tokio::timer::Delay;
+
+use client::{
+ backend::Backend, BlockchainEvents, CallExecutor, Client, error::Error as ClientError
+};
+use grandpa::{
+ BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState, voter, VoterSet,
+};
+use runtime_primitives::generic::BlockId;
+use runtime_primitives::traits::{
+ As, Block as BlockT, Header as HeaderT, NumberFor, One, Zero,
+};
+use substrate_primitives::{Blake2Hasher, ed25519,Ed25519AuthorityId, H256};
+
+use crate::{
+ AUTHORITY_SET_KEY, CONSENSUS_CHANGES_KEY, LAST_COMPLETED_KEY,
+ Commit, Config, Error, Network, Precommit, Prevote, LastCompleted,
+};
+use authorities::{AuthoritySet, SharedAuthoritySet};
+use consensus_changes::SharedConsensusChanges;
+use justification::GrandpaJustification;
+use until_imported::UntilVoteTargetImported;
+
+/// The environment we run GRANDPA in.
+pub(crate) struct Environment, RA> {
+ pub(crate) inner: Arc>,
+ pub(crate) voters: Arc>,
+ pub(crate) config: Config,
+ pub(crate) authority_set: SharedAuthoritySet>,
+ pub(crate) consensus_changes: SharedConsensusChanges>,
+ pub(crate) network: N,
+ pub(crate) set_id: u64,
+}
+
+impl, B, E, N, RA> grandpa::Chain> for Environment where
+ Block: 'static,
+ B: Backend + 'static,
+ E: CallExecutor + 'static,
+ N: Network + 'static,
+ N::In: 'static,
+ NumberFor: BlockNumberOps,
+{
+ fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result, GrandpaError> {
+ if base == block { return Err(GrandpaError::NotDescendent) }
+
+ let tree_route_res = ::client::blockchain::tree_route(
+ self.inner.backend().blockchain(),
+ BlockId::Hash(block),
+ BlockId::Hash(base),
+ );
+
+ let tree_route = match tree_route_res {
+ Ok(tree_route) => tree_route,
+ Err(e) => {
+ debug!(target: "afg", "Encountered error computing ancestry between block {:?} and base {:?}: {:?}",
+ block, base, e);
+
+ return Err(GrandpaError::NotDescendent);
+ }
+ };
+
+ if tree_route.common_block().hash != base {
+ return Err(GrandpaError::NotDescendent);
+ }
+
+ // skip one because our ancestry is meant to start from the parent of `block`,
+ // and `tree_route` includes it.
+ Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
+ }
+
+ fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor)> {
+ // NOTE: when we finalize an authority set change through the sync protocol the voter is
+ // signalled asynchronously. therefore the voter could still vote in the next round
+ // before activating the new set. the `authority_set` is updated immediately thus we
+ // restrict the voter based on that.
+ if self.set_id != self.authority_set.inner().read().current().0 {
+ return None;
+ }
+
+ // we refuse to vote beyond the current limit number where transitions are scheduled to
+ // occur.
+ // once blocks are finalized that make that transition irrelevant or activate it,
+ // we will proceed onwards. most of the time there will be no pending transition.
+ let limit = self.authority_set.current_limit();
+ debug!(target: "afg", "Finding best chain containing block {:?} with number limit {:?}", block, limit);
+
+ match self.inner.best_containing(block, None) {
+ Ok(Some(mut best_hash)) => {
+ let base_header = self.inner.header(&BlockId::Hash(block)).ok()?
+ .expect("Header known to exist after `best_containing` call; qed");
+
+ if let Some(limit) = limit {
+ // this is a rare case which might cause issues,
+ // might be better to return the header itself.
+ if *base_header.number() > limit {
+ debug!(target: "afg", "Encountered error finding best chain containing {:?} with limit {:?}: target block is after limit",
+ block,
+ limit,
+ );
+ return None;
+ }
+ }
+
+ let mut best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
+ .expect("Header known to exist after `best_containing` call; qed");
+
+ // we target a vote towards 3/4 of the unfinalized chain (rounding up)
+ let target = {
+ let two = NumberFor::::one() + One::one();
+ let three = two + One::one();
+ let four = three + One::one();
+
+ let diff = *best_header.number() - *base_header.number();
+ let diff = ((diff * three) + two) / four;
+
+ *base_header.number() + diff
+ };
+
+ // unless our vote is currently being limited due to a pending change
+ let target = limit.map(|limit| limit.min(target)).unwrap_or(target);
+
+ // walk backwards until we find the target block
+ loop {
+ if *best_header.number() < target { unreachable!(); }
+ if *best_header.number() == target {
+ return Some((best_hash, *best_header.number()));
+ }
+
+ best_hash = *best_header.parent_hash();
+ best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
+ .expect("Header known to exist after `best_containing` call; qed");
+ }
+ },
+ Ok(None) => {
+ debug!(target: "afg", "Encountered error finding best chain containing {:?}: couldn't find target block", block);
+ None
+ }
+ Err(e) => {
+ debug!(target: "afg", "Encountered error finding best chain containing {:?}: {:?}", block, e);
+ None
+ }
+ }
+ }
+}
+
+/// A new authority set along with the canonical block it changed at.
+#[derive(Debug)]
+pub(crate) struct NewAuthoritySet {
+ pub(crate) canon_number: N,
+ pub(crate) canon_hash: H,
+ pub(crate) set_id: u64,
+ pub(crate) authorities: Vec<(Ed25519AuthorityId, u64)>,
+}
+
+/// Signals either an early exit of a voter or an error.
+#[derive(Debug)]
+pub(crate) enum ExitOrError {
+ /// An error occurred.
+ Error(Error),
+ /// Early exit of the voter: the new set ID and the new authorities along with respective weights.
+ AuthoritiesChanged(NewAuthoritySet),
+}
+
+impl From for ExitOrError {
+ fn from(e: Error) -> Self {
+ ExitOrError::Error(e)
+ }
+}
+
+impl From for ExitOrError {
+ fn from(e: ClientError) -> Self {
+ ExitOrError::Error(Error::Client(e))
+ }
+}
+
+impl From for ExitOrError {
+ fn from(e: grandpa::Error) -> Self {
+ ExitOrError::Error(Error::from(e))
+ }
+}
+
+impl ::std::error::Error for ExitOrError { }
+
+impl fmt::Display for ExitOrError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ ExitOrError::Error(ref e) => write!(f, "{:?}", e),
+ ExitOrError::AuthoritiesChanged(_) => write!(f, "restarting voter on new authorities"),
+ }
+ }
+}
+
+
+impl, N, RA> voter::Environment> for Environment where
+ Block: 'static,
+ B: Backend + 'static,
+ E: CallExecutor + 'static + Send + Sync,
+ N: Network + 'static + Send,
+ N::In: 'static + Send,
+ RA: 'static + Send + Sync,
+ NumberFor: BlockNumberOps,
+{
+ type Timer = Box + Send>;
+ type Id = Ed25519AuthorityId;
+ type Signature = ed25519::Signature;
+
+ // regular round message streams
+ type In = Box, Self::Signature, Self::Id>,
+ Error = Self::Error,
+ > + Send>;
+ type Out = Box>,
+ SinkError = Self::Error,
+ > + Send>;
+
+ type Error = ExitOrError>;
+
+ fn round_data(
+ &self,
+ round: u64
+ ) -> voter::RoundData {
+ let now = Instant::now();
+ let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
+ let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
+
+ let incoming = ::communication::checked_message_stream::(
+ round,
+ self.set_id,
+ self.network.messages_for(round, self.set_id),
+ self.voters.clone(),
+ );
+
+ let local_key = self.config.local_key.as_ref()
+ .filter(|pair| self.voters.contains_key(&pair.public().into()));
+
+ let (out_rx, outgoing) = ::communication::outgoing_messages::(
+ round,
+ self.set_id,
+ local_key.cloned(),
+ self.voters.clone(),
+ self.network.clone(),
+ );
+
+ // schedule incoming messages from the network to be held until
+ // corresponding blocks are imported.
+ let incoming = UntilVoteTargetImported::new(
+ self.inner.import_notification_stream(),
+ self.inner.clone(),
+ incoming,
+ );
+
+ // join incoming network messages with locally originating ones.
+ let incoming = Box::new(out_rx.select(incoming).map_err(Into::into));
+
+ // schedule network message cleanup when sink drops.
+ let outgoing = Box::new(outgoing.sink_map_err(Into::into));
+
+ voter::RoundData {
+ prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
+ precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
+ incoming,
+ outgoing,
+ }
+ }
+
+ fn completed(&self, round: u64, state: RoundState>) -> Result<(), Self::Error> {
+ debug!(
+ target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
+ self.config.name(),
+ round,
+ self.set_id,
+ state.estimate.as_ref().map(|e| e.1),
+ state.finalized.as_ref().map(|e| e.1),
+ );
+
+ let encoded_state = (round, state).encode();
+ let res = Backend::insert_aux(&**self.inner.backend(), &[(LAST_COMPLETED_KEY, &encoded_state[..])], &[]);
+ if let Err(e) = res {
+ warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e);
+ Err(Error::Client(e).into())
+ } else {
+ Ok(())
+ }
+ }
+
+ fn finalize_block(&self, hash: Block::Hash, number: NumberFor, round: u64, commit: Commit) -> Result<(), Self::Error> {
+ finalize_block(
+ &*self.inner,
+ &self.authority_set,
+ &self.consensus_changes,
+ Some(As::sa(self.config.justification_period)),
+ hash,
+ number,
+ (round, commit).into(),
+ )
+ }
+
+ fn round_commit_timer(&self) -> Self::Timer {
+ use rand::{thread_rng, Rng};
+
+ //random between 0-1 seconds.
+ let delay: u64 = thread_rng().gen_range(0, 1000);
+ Box::new(Delay::new(
+ Instant::now() + Duration::from_millis(delay)
+ ).map_err(|e| Error::Timer(e).into()))
+ }
+
+ fn prevote_equivocation(
+ &self,
+ _round: u64,
+ equivocation: ::grandpa::Equivocation, Self::Signature>
+ ) {
+ warn!(target: "afg", "Detected prevote equivocation in the finality worker: {:?}", equivocation);
+ // nothing yet; this could craft misbehavior reports of some kind.
+ }
+
+ fn precommit_equivocation(
+ &self,
+ _round: u64,
+ equivocation: Equivocation, Self::Signature>
+ ) {
+ warn!(target: "afg", "Detected precommit equivocation in the finality worker: {:?}", equivocation);
+ // nothing yet
+ }
+}
+
+pub(crate) enum JustificationOrCommit {
+ Justification(GrandpaJustification),
+ Commit((u64, Commit)),
+}
+
+impl From<(u64, Commit)> for JustificationOrCommit {
+ fn from(commit: (u64, Commit)) -> JustificationOrCommit {
+ JustificationOrCommit::Commit(commit)
+ }
+}
+
+impl From> for JustificationOrCommit {
+ fn from(justification: GrandpaJustification) -> JustificationOrCommit {
+ JustificationOrCommit::Justification(justification)
+ }
+}
+
+/// Finalize the given block and apply any authority set changes. If an
+/// authority set change is enacted then a justification is created (if not
+/// given) and stored with the block when finalizing it.
+pub(crate) fn finalize_block, E, RA>(
+ client: &Client,
+ authority_set: &SharedAuthoritySet>,
+ consensus_changes: &SharedConsensusChanges>,
+ justification_period: Option>,
+ hash: Block::Hash,
+ number: NumberFor,
+ justification_or_commit: JustificationOrCommit,
+) -> Result<(), ExitOrError>> where
+ B: Backend,
+ E: CallExecutor + Send + Sync,
+ RA: Send + Sync,
+{
+ // lock must be held through writing to DB to avoid race
+ let mut authority_set = authority_set.inner().write();
+
+ // FIXME #1483: clone only when changed
+ let old_authority_set = authority_set.clone();
+ // needed in case there is an authority set change, used for reverting in
+ // case of error
+ let mut old_last_completed = None;
+
+ let mut consensus_changes = consensus_changes.lock();
+ let status = authority_set.apply_changes(number, |canon_number| {
+ canonical_at_height(client, (hash, number), canon_number)
+ })?;
+
+ if status.changed {
+ // write new authority set state to disk.
+ let encoded_set = authority_set.encode();
+
+ let write_result = if let Some((ref canon_hash, ref canon_number)) = status.new_set_block {
+ // we also overwrite the "last completed round" entry with a blank slate
+ // because from the perspective of the finality gadget, the chain has
+ // reset.
+ let round_state = RoundState::genesis((*canon_hash, *canon_number));
+ let last_completed: LastCompleted<_, _> = (0, round_state);
+ let encoded = last_completed.encode();
+
+ old_last_completed = Backend::get_aux(&**client.backend(), LAST_COMPLETED_KEY)?;
+
+ Backend::insert_aux(
+ &**client.backend(),
+ &[
+ (AUTHORITY_SET_KEY, &encoded_set[..]),
+ (LAST_COMPLETED_KEY, &encoded[..]),
+ ],
+ &[]
+ )
+ } else {
+ Backend::insert_aux(&**client.backend(), &[(AUTHORITY_SET_KEY, &encoded_set[..])], &[])
+ };
+
+ if let Err(e) = write_result {
+ warn!(target: "finality", "Failed to write updated authority set to disk. Bailing.");
+ warn!(target: "finality", "Node is in a potentially inconsistent state.");
+
+ return Err(e.into());
+ }
+ }
+
+ // check if this is this is the first finalization of some consensus changes
+ let (alters_consensus_changes, finalizes_consensus_changes) = consensus_changes
+ .finalize((number, hash), |at_height| canonical_at_height(client, (hash, number), at_height))?;
+
+ // holds the old consensus changes in case it is changed below, needed for
+ // reverting in case of failure
+ let mut old_consensus_changes = None;
+ if alters_consensus_changes {
+ old_consensus_changes = Some(consensus_changes.clone());
+
+ let encoded = consensus_changes.encode();
+ let write_result = Backend::insert_aux(&**client.backend(), &[(CONSENSUS_CHANGES_KEY, &encoded[..])], &[]);
+ if let Err(e) = write_result {
+ warn!(target: "finality", "Failed to write updated consensus changes to disk. Bailing.");
+ warn!(target: "finality", "Node is in a potentially inconsistent state.");
+
+ return Err(e.into());
+ }
+ }
+
+ let aux = |authority_set: &AuthoritySet>| {
+ // NOTE: this code assumes that honest voters will never vote past a
+ // transition block, thus we don't have to worry about the case where
+ // we have a transition with `effective_block = N`, but we finalize
+ // `N+1`. this assumption is required to make sure we store
+ // justifications for transition blocks which will be requested by
+ // syncing clients.
+ let justification = match justification_or_commit {
+ JustificationOrCommit::Justification(justification) => Some(justification.encode()),
+ JustificationOrCommit::Commit((round_number, commit)) => {
+ let mut justification_required =
+ // justification is always required when block that enacts new authorities
+ // set is finalized
+ status.new_set_block.is_some() ||
+ // justification is required when consensus changes are finalized
+ finalizes_consensus_changes;
+
+ // justification is required every N blocks to be able to prove blocks
+ // finalization to remote nodes
+ if !justification_required {
+ if let Some(justification_period) = justification_period {
+ let last_finalized_number = client.info()?.chain.finalized_number;
+ justification_required =
+ (!last_finalized_number.is_zero() || number - last_finalized_number == justification_period) &&
+ (last_finalized_number / justification_period != number / justification_period);
+ }
+ }
+
+ if justification_required {
+ let justification = GrandpaJustification::from_commit(
+ client,
+ round_number,
+ commit,
+ )?;
+
+ Some(justification.encode())
+ } else {
+ None
+ }
+ },
+ };
+
+ debug!(target: "afg", "Finalizing blocks up to ({:?}, {})", number, hash);
+
+ // ideally some handle to a synchronization oracle would be used
+ // to avoid unconditionally notifying.
+ client.finalize_block(BlockId::Hash(hash), justification, true).map_err(|e| {
+ warn!(target: "finality", "Error applying finality to block {:?}: {:?}", (hash, number), e);
+ e
+ })?;
+
+ if let Some((canon_hash, canon_number)) = status.new_set_block {
+ // the authority set has changed.
+ let (new_id, set_ref) = authority_set.current();
+
+ if set_ref.len() > 16 {
+ info!("Applying GRANDPA set change to new set with {} authorities", set_ref.len());
+ } else {
+ info!("Applying GRANDPA set change to new set {:?}", set_ref);
+ }
+
+ Err(ExitOrError::AuthoritiesChanged(NewAuthoritySet {
+ canon_hash,
+ canon_number,
+ set_id: new_id,
+ authorities: set_ref.to_vec(),
+ }))
+ } else {
+ Ok(())
+ }
+ };
+
+ match aux(&authority_set) {
+ Err(ExitOrError::Error(err)) => {
+ debug!(target: "afg", "Reverting authority set and/or consensus changes after block finalization error: {:?}", err);
+
+ let mut revert_aux = Vec::new();
+
+ if status.changed {
+ revert_aux.push((AUTHORITY_SET_KEY, old_authority_set.encode()));
+ if let Some(old_last_completed) = old_last_completed {
+ revert_aux.push((LAST_COMPLETED_KEY, old_last_completed));
+ }
+
+ *authority_set = old_authority_set.clone();
+ }
+
+ if let Some(old_consensus_changes) = old_consensus_changes {
+ revert_aux.push((CONSENSUS_CHANGES_KEY, old_consensus_changes.encode()));
+
+ *consensus_changes = old_consensus_changes;
+ }
+
+ let write_result = Backend::insert_aux(
+ &**client.backend(),
+ revert_aux.iter().map(|(k, v)| (*k, &**v)).collect::>().iter(),
+ &[],
+ );
+
+ if let Err(e) = write_result {
+ warn!(target: "finality", "Failed to revert consensus changes to disk. Bailing.");
+ warn!(target: "finality", "Node is in a potentially inconsistent state.");
+
+ return Err(e.into());
+ }
+
+ Err(ExitOrError::Error(err))
+ },
+ res => res,
+ }
+}
+
+/// Using the given base get the block at the given height on this chain. The
+/// target block must be an ancestor of base, therefore `height <= base.height`.
+pub(crate) fn canonical_at_height, RA>(
+ client: &Client,
+ base: (Block::Hash, NumberFor),
+ height: NumberFor,
+) -> Result, ClientError> where
+ B: Backend,
+ E: CallExecutor + Send + Sync,
+{
+ use runtime_primitives::traits::{One, Zero};
+
+ if height > base.1 {
+ return Ok(None);
+ }
+
+ if height == base.1 {
+ return Ok(Some(base.0));
+ }
+
+ let mut current = match client.header(&BlockId::Hash(base.0))? {
+ Some(header) => header,
+ _ => return Ok(None),
+ };
+
+ let mut steps = base.1 - height;
+
+ while steps > NumberFor::::zero() {
+ current = match client.header(&BlockId::Hash(*current.parent_hash()))? {
+ Some(header) => header,
+ _ => return Ok(None),
+ };
+
+ steps -= NumberFor::::one();
+ }
+
+ Ok(Some(current.hash()))
+}
diff --git a/core/finality-grandpa/src/finality_proof.rs b/core/finality-grandpa/src/finality_proof.rs
index 3ba9973c0a8b7..6335d9bac3e3a 100644
--- a/core/finality-grandpa/src/finality_proof.rs
+++ b/core/finality-grandpa/src/finality_proof.rs
@@ -44,7 +44,7 @@ use runtime_primitives::traits::{
};
use substrate_primitives::{Ed25519AuthorityId, H256};
-use GrandpaJustification;
+use justification::GrandpaJustification;
/// Prepare proof-of-finality for the given block.
///
diff --git a/core/finality-grandpa/src/import.rs b/core/finality-grandpa/src/import.rs
new file mode 100644
index 0000000000000..aad4041185238
--- /dev/null
+++ b/core/finality-grandpa/src/import.rs
@@ -0,0 +1,335 @@
+// Copyright 2018-2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use std::sync::Arc;
+
+use codec::Encode;
+use futures::sync::mpsc;
+
+use client::{CallExecutor, Client};
+use client::backend::Backend;
+use consensus_common::{
+ BlockImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind,
+ ImportBlock, ImportResult, JustificationImport,
+};
+use fg_primitives::GrandpaApi;
+use runtime_primitives::Justification;
+use runtime_primitives::generic::BlockId;
+use runtime_primitives::traits::{
+ Block as BlockT, DigestFor, DigestItemFor, DigestItem,
+ Header as HeaderT, NumberFor, ProvideRuntimeApi,
+};
+use substrate_primitives::{H256, Ed25519AuthorityId, Blake2Hasher};
+
+use crate::{AUTHORITY_SET_KEY, Error};
+use authorities::SharedAuthoritySet;
+use consensus_changes::SharedConsensusChanges;
+use environment::{canonical_at_height, finalize_block, ExitOrError, NewAuthoritySet};
+use justification::GrandpaJustification;
+
+/// A block-import handler for GRANDPA.
+///
+/// This scans each imported block for signals of changing authority set.
+/// If the block being imported enacts an authority set change then:
+/// - If the current authority set is still live: we import the block
+/// - Otherwise, the block must include a valid justification.
+///
+/// When using GRANDPA, the block import worker should be using this block import
+/// object.
+pub struct GrandpaBlockImport, RA, PRA> {
+ inner: Arc>,
+ authority_set: SharedAuthoritySet>,
+ authority_set_change: mpsc::UnboundedSender>>,
+ consensus_changes: SharedConsensusChanges>,
+ api: Arc,
+}
+
+impl, RA, PRA> JustificationImport
+ for GrandpaBlockImport where
+ NumberFor: grandpa::BlockNumberOps,
+ B: Backend + 'static,
+ E: CallExecutor + 'static + Clone + Send + Sync,
+ DigestFor: Encode,
+ DigestItemFor: DigestItem,
+ RA: Send + Sync,
+ PRA: ProvideRuntimeApi,
+ PRA::Api: GrandpaApi,
+{
+ type Error = ConsensusError;
+
+ fn on_start(&self, link: &::consensus_common::import_queue::Link) {
+ let chain_info = match self.inner.info() {
+ Ok(info) => info.chain,
+ _ => return,
+ };
+
+ // request justifications for all pending changes for which change blocks have already been imported
+ for pending_change in self.authority_set.inner().read().pending_changes() {
+ if pending_change.effective_number() > chain_info.finalized_number &&
+ pending_change.effective_number() <= chain_info.best_number
+ {
+ let effective_block_hash = self.inner.best_containing(
+ pending_change.canon_hash,
+ Some(pending_change.effective_number()),
+ );
+
+ if let Ok(Some(hash)) = effective_block_hash {
+ if let Ok(Some(header)) = self.inner.header(&BlockId::Hash(hash)) {
+ if *header.number() == pending_change.effective_number() {
+ link.request_justification(&header.hash(), *header.number());
+ }
+ }
+ }
+ }
+ }
+ }
+
+ fn import_justification(
+ &self,
+ hash: Block::Hash,
+ number: NumberFor,
+ justification: Justification,
+ ) -> Result<(), Self::Error> {
+ self.import_justification(hash, number, justification, false)
+ }
+}
+
+impl, RA, PRA> BlockImport
+ for GrandpaBlockImport where
+ NumberFor: grandpa::BlockNumberOps,
+ B: Backend + 'static,
+ E: CallExecutor + 'static + Clone + Send + Sync,
+ DigestFor: Encode,
+ DigestItemFor: DigestItem,
+ RA: Send + Sync,
+ PRA: ProvideRuntimeApi,
+ PRA::Api: GrandpaApi,
+{
+ type Error = ConsensusError;
+
+ fn import_block(&self, mut block: ImportBlock, new_authorities: Option>)
+ -> Result
+ {
+ use authorities::PendingChange;
+
+ let hash = block.post_header().hash();
+ let number = block.header.number().clone();
+
+ let maybe_change = self.api.runtime_api().grandpa_pending_change(
+ &BlockId::hash(*block.header.parent_hash()),
+ &block.header.digest().clone(),
+ );
+
+ let maybe_change = match maybe_change {
+ Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
+ Ok(maybe) => maybe,
+ };
+
+ // when we update the authorities, we need to hold the lock
+ // until the block is written to prevent a race if we need to restore
+ // the old authority set on error.
+ let just_in_case = if let Some(change) = maybe_change {
+ let parent_hash = *block.header.parent_hash();
+
+ let mut authorities = self.authority_set.inner().write();
+ let old_set = authorities.clone();
+
+ let is_equal_or_descendent_of = |base: &Block::Hash| -> Result<(), ConsensusError> {
+ let error = || {
+ debug!(target: "afg", "rejecting change: {} is in the same chain as {}", hash, base);
+ Err(ConsensusErrorKind::ClientImport("Incorrect base hash".to_string()).into())
+ };
+
+ if *base == hash { return error(); }
+ if *base == parent_hash { return error(); }
+
+ let tree_route = ::client::blockchain::tree_route(
+ self.inner.backend().blockchain(),
+ BlockId::Hash(parent_hash),
+ BlockId::Hash(*base),
+ );
+
+ let tree_route = match tree_route {
+ Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
+ Ok(route) => route,
+ };
+
+ if tree_route.common_block().hash == *base {
+ return error();
+ }
+
+ Ok(())
+ };
+
+ authorities.add_pending_change(
+ PendingChange {
+ next_authorities: change.next_authorities,
+ finalization_depth: change.delay,
+ canon_height: number,
+ canon_hash: hash,
+ },
+ is_equal_or_descendent_of,
+ )?;
+
+ block.auxiliary.push((AUTHORITY_SET_KEY.to_vec(), Some(authorities.encode())));
+ Some((old_set, authorities))
+ } else {
+ None
+ };
+
+ // we don't want to finalize on `inner.import_block`
+ let justification = block.justification.take();
+ let enacts_consensus_change = new_authorities.is_some();
+ let import_result = self.inner.import_block(block, new_authorities);
+
+ let import_result = {
+ // we scope this so that `just_in_case` is dropped eagerly and releases the authorities lock
+ let revert_authorities = || if let Some((old_set, mut authorities)) = just_in_case {
+ *authorities = old_set;
+ };
+
+ match import_result {
+ Ok(ImportResult::Queued) => ImportResult::Queued,
+ Ok(r) => {
+ debug!(target: "afg", "Restoring old authority set after block import result: {:?}", r);
+ revert_authorities();
+ return Ok(r);
+ },
+ Err(e) => {
+ debug!(target: "afg", "Restoring old authority set after block import error: {:?}", e);
+ revert_authorities();
+ return Err(ConsensusErrorKind::ClientImport(e.to_string()).into());
+ },
+ }
+ };
+
+ let enacts_change = self.authority_set.inner().read().enacts_change(number, |canon_number| {
+ canonical_at_height(&self.inner, (hash, number), canon_number)
+ }).map_err(|e| ConsensusError::from(ConsensusErrorKind::ClientImport(e.to_string())))?;
+
+ if !enacts_change && !enacts_consensus_change {
+ return Ok(import_result);
+ }
+
+ match justification {
+ Some(justification) => {
+ self.import_justification(hash, number, justification, enacts_change)?;
+ },
+ None => {
+ if enacts_change {
+ trace!(
+ target: "finality",
+ "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
+ number,
+ );
+ }
+
+ // we have imported block with consensus data changes, but without justification
+ // => remember to create justification when next block will be finalized
+ if enacts_consensus_change {
+ self.consensus_changes.lock().note_change((number, hash));
+ }
+
+ return Ok(ImportResult::NeedsJustification);
+ }
+ }
+
+ Ok(import_result)
+ }
+}
+
+impl, RA, PRA> GrandpaBlockImport {
+ pub(crate) fn new(
+ inner: Arc>,
+ authority_set: SharedAuthoritySet>,
+ authority_set_change: mpsc::UnboundedSender>>,
+ consensus_changes: SharedConsensusChanges>,
+ api: Arc,
+ ) -> GrandpaBlockImport {
+ GrandpaBlockImport {
+ inner,
+ authority_set,
+ authority_set_change,
+ consensus_changes,
+ api,
+ }
+ }
+}
+
+impl, RA, PRA>
+ GrandpaBlockImport where
+ NumberFor: grandpa::BlockNumberOps,
+ B: Backend + 'static,
+ E: CallExecutor + 'static + Clone + Send + Sync,
+ RA: Send + Sync,
+{
+
+ /// Import a block justification and finalize the block.
+ ///
+ /// If `enacts_change` is set to true, then finalizing this block *must*
+ /// enact an authority set change, the function will panic otherwise.
+ fn import_justification(
+ &self,
+ hash: Block::Hash,
+ number: NumberFor,
+ justification: Justification,
+ enacts_change: bool,
+ ) -> Result<(), ConsensusError> {
+ let justification = GrandpaJustification::decode_and_verify(
+ justification,
+ self.authority_set.set_id(),
+ &self.authority_set.current_authorities(),
+ );
+
+ let justification = match justification {
+ Err(e) => return Err(ConsensusErrorKind::ClientImport(e.to_string()).into()),
+ Ok(justification) => justification,
+ };
+
+ let result = finalize_block(
+ &*self.inner,
+ &self.authority_set,
+ &self.consensus_changes,
+ None,
+ hash,
+ number,
+ justification.into(),
+ );
+
+ match result {
+ Err(ExitOrError::AuthoritiesChanged(new)) => {
+ info!(target: "finality", "Imported justification for block #{} that enacts authority set change, signalling voter.", number);
+ if let Err(e) = self.authority_set_change.unbounded_send(new) {
+ return Err(ConsensusErrorKind::ClientImport(e.to_string()).into());
+ }
+ },
+ Err(ExitOrError::Error(e)) => {
+ return Err(match e {
+ Error::Grandpa(error) => ConsensusErrorKind::ClientImport(error.to_string()),
+ Error::Network(error) => ConsensusErrorKind::ClientImport(error),
+ Error::Blockchain(error) => ConsensusErrorKind::ClientImport(error),
+ Error::Client(error) => ConsensusErrorKind::ClientImport(error.to_string()),
+ Error::Timer(error) => ConsensusErrorKind::ClientImport(error.to_string()),
+ }.into());
+ },
+ Ok(_) => {
+ assert!(!enacts_change, "returns Ok when no authority set change should be enacted; qed;");
+ },
+ }
+
+ Ok(())
+ }
+}
diff --git a/core/finality-grandpa/src/justification.rs b/core/finality-grandpa/src/justification.rs
new file mode 100644
index 0000000000000..867b0adb78772
--- /dev/null
+++ b/core/finality-grandpa/src/justification.rs
@@ -0,0 +1,218 @@
+// Copyright 2018-2019 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+use std::collections::{HashMap, HashSet};
+
+use client::{CallExecutor, Client};
+use client::backend::Backend;
+use client::blockchain::HeaderBackend;
+use client::error::{Error as ClientError, ErrorKind as ClientErrorKind};
+use codec::Decode;
+use grandpa::VoterSet;
+use grandpa::{Error as GrandpaError};
+use runtime_primitives::generic::BlockId;
+use runtime_primitives::traits::{NumberFor, Block as BlockT, Header as HeaderT};
+use substrate_primitives::{H256, Ed25519AuthorityId, Blake2Hasher};
+
+use crate::{Commit, Error};
+use communication;
+
+/// A GRANDPA justification for block finality, it includes a commit message and
+/// an ancestry proof including all headers routing all precommit target blocks
+/// to the commit target block. Due to the current voting strategy the precommit
+/// targets should be the same as the commit target, since honest voters don't
+/// vote past authority set change blocks.
+///
+/// This is meant to be stored in the db and passed around the network to other
+/// nodes, and are used by syncing nodes to prove authority set handoffs.
+#[derive(Encode, Decode)]
+pub(crate) struct GrandpaJustification {
+ round: u64,
+ pub(crate) commit: Commit,
+ votes_ancestries: Vec,
+}
+
+impl> GrandpaJustification {
+ /// Create a GRANDPA justification from the given commit. This method
+ /// assumes the commit is valid and well-formed.
+ pub(crate) fn from_commit(
+ client: &Client,
+ round: u64,
+ commit: Commit,
+ ) -> Result, Error> where
+ B: Backend,
+ E: CallExecutor + Send + Sync,
+ RA: Send + Sync,
+ {
+ let mut votes_ancestries_hashes = HashSet::new();
+ let mut votes_ancestries = Vec::new();
+
+ let error = || {
+ let msg = "invalid precommits for target commit".to_string();
+ Err(Error::Client(ClientErrorKind::BadJustification(msg).into()))
+ };
+
+ for signed in commit.precommits.iter() {
+ let mut current_hash = signed.precommit.target_hash.clone();
+ loop {
+ if current_hash == commit.target_hash { break; }
+
+ match client.backend().blockchain().header(BlockId::Hash(current_hash))? {
+ Some(current_header) => {
+ if *current_header.number() <= commit.target_number {
+ return error();
+ }
+
+ let parent_hash = current_header.parent_hash().clone();
+ if votes_ancestries_hashes.insert(current_hash) {
+ votes_ancestries.push(current_header);
+ }
+ current_hash = parent_hash;
+ },
+ _ => return error(),
+ }
+ }
+ }
+
+ Ok(GrandpaJustification { round, commit, votes_ancestries })
+ }
+
+ /// Decode a GRANDPA justification and validate the commit and the votes'
+ /// ancestry proofs.
+ pub(crate) fn decode_and_verify(
+ encoded: Vec,
+ set_id: u64,
+ voters: &VoterSet,
+ ) -> Result, ClientError> where
+ NumberFor: grandpa::BlockNumberOps,
+ {
+ GrandpaJustification::::decode(&mut &*encoded).ok_or_else(|| {
+ let msg = "failed to decode grandpa justification".to_string();
+ ClientErrorKind::BadJustification(msg).into()
+ }).and_then(|just| just.verify(set_id, voters).map(|_| just))
+ }
+
+ /// Validate the commit and the votes' ancestry proofs.
+ pub(crate) fn verify(&self, set_id: u64, voters: &VoterSet) -> Result<(), ClientError>
+ where
+ NumberFor: grandpa::BlockNumberOps,
+ {
+ use grandpa::Chain;
+
+ let ancestry_chain = AncestryChain::::new(&self.votes_ancestries);
+
+ match grandpa::validate_commit(
+ &self.commit,
+ voters,
+ &ancestry_chain,
+ ) {
+ Ok(Some(_)) => {},
+ _ => {
+ let msg = "invalid commit in grandpa justification".to_string();
+ return Err(ClientErrorKind::BadJustification(msg).into());
+ }
+ }
+
+ let mut visited_hashes = HashSet::new();
+ for signed in self.commit.precommits.iter() {
+ if let Err(_) = communication::check_message_sig::(
+ &grandpa::Message::Precommit(signed.precommit.clone()),
+ &signed.id,
+ &signed.signature,
+ self.round,
+ set_id,
+ ) {
+ return Err(ClientErrorKind::BadJustification(
+ "invalid signature for precommit in grandpa justification".to_string()).into());
+ }
+
+ if self.commit.target_hash == signed.precommit.target_hash {
+ continue;
+ }
+
+ match ancestry_chain.ancestry(self.commit.target_hash, signed.precommit.target_hash) {
+ Ok(route) => {
+ // ancestry starts from parent hash but the precommit target hash has been visited
+ visited_hashes.insert(signed.precommit.target_hash);
+ for hash in route {
+ visited_hashes.insert(hash);
+ }
+ },
+ _ => {
+ return Err(ClientErrorKind::BadJustification(
+ "invalid precommit ancestry proof in grandpa justification".to_string()).into());
+ },
+ }
+ }
+
+ let ancestry_hashes = self.votes_ancestries
+ .iter()
+ .map(|h: &Block::Header| h.hash())
+ .collect();
+
+ if visited_hashes != ancestry_hashes {
+ return Err(ClientErrorKind::BadJustification(
+ "invalid precommit ancestries in grandpa justification with unused headers".to_string()).into());
+ }
+
+ Ok(())
+ }
+}
+
+/// A utility trait implementing `grandpa::Chain` using a given set of headers.
+/// This is useful when validating commits, using the given set of headers to
+/// verify a valid ancestry route to the target commit block.
+struct AncestryChain {
+ ancestry: HashMap,
+}
+
+impl AncestryChain {
+ fn new(ancestry: &[Block::Header]) -> AncestryChain {
+ let ancestry: HashMap<_, _> = ancestry
+ .iter()
+ .cloned()
+ .map(|h: Block::Header| (h.hash(), h))
+ .collect();
+
+ AncestryChain { ancestry }
+ }
+}
+
+impl grandpa::Chain> for AncestryChain where
+ NumberFor: grandpa::BlockNumberOps
+{
+ fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result, GrandpaError> {
+ let mut route = Vec::new();
+ let mut current_hash = block;
+ loop {
+ if current_hash == base { break; }
+ match self.ancestry.get(¤t_hash) {
+ Some(current_header) => {
+ current_hash = *current_header.parent_hash();
+ route.push(current_hash);
+ },
+ _ => return Err(GrandpaError::NotDescendent),
+ }
+ }
+ route.pop(); // remove the base
+
+ Ok(route)
+ }
+
+ fn best_chain_containing(&self, _block: Block::Hash) -> Option<(Block::Hash, NumberFor)> {
+ None
+ }
+}
diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs
index 08e95a2955de0..9da7a96d9acba 100644
--- a/core/finality-grandpa/src/lib.rs
+++ b/core/finality-grandpa/src/lib.rs
@@ -87,39 +87,35 @@ use futures::prelude::*;
use futures::sync::mpsc;
use client::{
BlockchainEvents, CallExecutor, Client, backend::Backend,
- error::Error as ClientError, error::ErrorKind as ClientErrorKind,
+ error::Error as ClientError,
};
use client::blockchain::HeaderBackend;
use codec::{Encode, Decode};
-use consensus_common::{BlockImport, JustificationImport, Error as ConsensusError, ErrorKind as ConsensusErrorKind, ImportBlock, ImportResult};
-use runtime_primitives::Justification;
use runtime_primitives::traits::{
NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT,
- DigestItemFor, DigestItem, As, One, Zero,
+ DigestItemFor, DigestItem,
};
use fg_primitives::GrandpaApi;
use runtime_primitives::generic::BlockId;
use substrate_primitives::{ed25519, H256, Ed25519AuthorityId, Blake2Hasher};
-use tokio::timer::Delay;
use grandpa::Error as GrandpaError;
-use grandpa::{voter, round::State as RoundState, Equivocation, BlockNumberOps, VoterSet};
+use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet};
use network::{Service as NetworkService, ExHashT};
-use network::consensus_gossip::{ConsensusMessage};
-use std::collections::{HashMap, HashSet};
-use std::fmt;
+use network::consensus_gossip::ConsensusMessage;
use std::sync::Arc;
-use std::time::{Instant, Duration};
-
-use authorities::SharedAuthoritySet;
-use until_imported::{UntilCommitBlocksImported, UntilVoteTargetImported};
+use std::time::Duration;
pub use fg_primitives::ScheduledChange;
mod authorities;
mod communication;
+mod consensus_changes;
+mod environment;
mod finality_proof;
+mod import;
+mod justification;
mod until_imported;
#[cfg(feature="service-integration")]
@@ -127,7 +123,12 @@ mod service_integration;
#[cfg(feature="service-integration")]
pub use service_integration::{LinkHalfForService, BlockImportForService};
+use authorities::SharedAuthoritySet;
+use consensus_changes::{ConsensusChanges, SharedConsensusChanges};
+use environment::{Environment, ExitOrError, NewAuthoritySet};
pub use finality_proof::{prove_finality, check_finality_proof};
+use import::GrandpaBlockImport;
+use until_imported::UntilCommitBlocksImported;
#[cfg(test)]
mod tests;
@@ -331,1031 +332,6 @@ impl, RA> BlockStatus for Arc {
- pending_changes: Vec<(N, H)>,
-}
-
-impl ConsensusChanges {
- /// Create empty consensus changes.
- pub fn empty() -> Self {
- ConsensusChanges { pending_changes: Vec::new(), }
- }
-
- /// Note unfinalized change of consensus-related data.
- pub fn note_change(&mut self, at: (N, H)) {
- let idx = self.pending_changes
- .binary_search_by_key(&at.0, |change| change.0)
- .unwrap_or_else(|i| i);
- self.pending_changes.insert(idx, at);
- }
-
- /// Finalize all pending consensus changes that are finalized by given block.
- /// Returns true if there any changes were finalized.
- pub fn finalize ::client::error::Result>>(
- &mut self,
- block: (N, H),
- canonical_at_height: F,
- ) -> ::client::error::Result<(bool, bool)> {
- let (split_idx, has_finalized_changes) = self.pending_changes.iter()
- .enumerate()
- .take_while(|(_, &(at_height, _))| at_height <= block.0)
- .fold((None, Ok(false)), |(_, has_finalized_changes), (idx, ref at)|
- (
- Some(idx),
- has_finalized_changes
- .and_then(|has_finalized_changes| if has_finalized_changes {
- Ok(has_finalized_changes)
- } else {
- canonical_at_height(at.0).map(|can_hash| Some(at.1) == can_hash)
- }),
- ));
-
- let altered_changes = split_idx.is_some();
- if let Some(split_idx) = split_idx {
- self.pending_changes = self.pending_changes.split_off(split_idx + 1);
- }
- has_finalized_changes.map(|has_finalized_changes| (altered_changes, has_finalized_changes))
- }
-}
-
-/// Thread-safe consensus changes tracker reference.
-type SharedConsensusChanges = Arc>>;
-
-/// The environment we run GRANDPA in.
-struct Environment, RA> {
- inner: Arc>,
- voters: Arc>,
- config: Config,
- authority_set: SharedAuthoritySet>,
- consensus_changes: SharedConsensusChanges>,
- network: N,
- set_id: u64,
-}
-
-impl, B, E, N, RA> grandpa::Chain> for Environment where
- Block: 'static,
- B: Backend + 'static,
- E: CallExecutor + 'static,
- N: Network + 'static,
- N::In: 'static,
- NumberFor: BlockNumberOps,
-{
- fn ancestry(&self, base: Block::Hash, block: Block::Hash) -> Result, GrandpaError> {
- if base == block { return Err(GrandpaError::NotDescendent) }
-
- let tree_route_res = ::client::blockchain::tree_route(
- self.inner.backend().blockchain(),
- BlockId::Hash(block),
- BlockId::Hash(base),
- );
-
- let tree_route = match tree_route_res {
- Ok(tree_route) => tree_route,
- Err(e) => {
- debug!(target: "afg", "Encountered error computing ancestry between block {:?} and base {:?}: {:?}",
- block, base, e);
-
- return Err(GrandpaError::NotDescendent);
- }
- };
-
- if tree_route.common_block().hash != base {
- return Err(GrandpaError::NotDescendent);
- }
-
- // skip one because our ancestry is meant to start from the parent of `block`,
- // and `tree_route` includes it.
- Ok(tree_route.retracted().iter().skip(1).map(|e| e.hash).collect())
- }
-
- fn best_chain_containing(&self, block: Block::Hash) -> Option<(Block::Hash, NumberFor)> {
- // NOTE: when we finalize an authority set change through the sync protocol the voter is
- // signalled asynchronously. therefore the voter could still vote in the next round
- // before activating the new set. the `authority_set` is updated immediately thus we
- // restrict the voter based on that.
- if self.set_id != self.authority_set.inner().read().current().0 {
- return None;
- }
-
- // we refuse to vote beyond the current limit number where transitions are scheduled to
- // occur.
- // once blocks are finalized that make that transition irrelevant or activate it,
- // we will proceed onwards. most of the time there will be no pending transition.
- let limit = self.authority_set.current_limit();
- debug!(target: "afg", "Finding best chain containing block {:?} with number limit {:?}", block, limit);
-
- match self.inner.best_containing(block, None) {
- Ok(Some(mut best_hash)) => {
- let base_header = self.inner.header(&BlockId::Hash(block)).ok()?
- .expect("Header known to exist after `best_containing` call; qed");
-
- if let Some(limit) = limit {
- // this is a rare case which might cause issues,
- // might be better to return the header itself.
- if *base_header.number() > limit {
- debug!(target: "afg", "Encountered error finding best chain containing {:?} with limit {:?}: target block is after limit",
- block,
- limit,
- );
- return None;
- }
- }
-
- let mut best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
- .expect("Header known to exist after `best_containing` call; qed");
-
- // we target a vote towards 3/4 of the unfinalized chain (rounding up)
- let target = {
- let two = NumberFor::::one() + One::one();
- let three = two + One::one();
- let four = three + One::one();
-
- let diff = *best_header.number() - *base_header.number();
- let diff = ((diff * three) + two) / four;
-
- *base_header.number() + diff
- };
-
- // unless our vote is currently being limited due to a pending change
- let target = limit.map(|limit| limit.min(target)).unwrap_or(target);
-
- // walk backwards until we find the target block
- loop {
- if *best_header.number() < target { unreachable!(); }
- if *best_header.number() == target {
- return Some((best_hash, *best_header.number()));
- }
-
- best_hash = *best_header.parent_hash();
- best_header = self.inner.header(&BlockId::Hash(best_hash)).ok()?
- .expect("Header known to exist after `best_containing` call; qed");
- }
- },
- Ok(None) => {
- debug!(target: "afg", "Encountered error finding best chain containing {:?}: couldn't find target block", block);
- None
- }
- Err(e) => {
- debug!(target: "afg", "Encountered error finding best chain containing {:?}: {:?}", block, e);
- None
- }
- }
- }
-}
-
-/// A new authority set along with the canonical block it changed at.
-#[derive(Debug)]
-struct NewAuthoritySet {
- canon_number: N,
- canon_hash: H,
- set_id: u64,
- authorities: Vec<(Ed25519AuthorityId, u64)>,
-}
-
-/// Signals either an early exit of a voter or an error.
-#[derive(Debug)]
-enum ExitOrError {
- /// An error occurred.
- Error(Error),
- /// Early exit of the voter: the new set ID and the new authorities along with respective weights.
- AuthoritiesChanged(NewAuthoritySet),
-}
-
-impl From for ExitOrError {
- fn from(e: Error) -> Self {
- ExitOrError::Error(e)
- }
-}
-
-impl From for ExitOrError {
- fn from(e: ClientError) -> Self {
- ExitOrError::Error(Error::Client(e))
- }
-}
-
-impl From for ExitOrError {
- fn from(e: grandpa::Error) -> Self {
- ExitOrError::Error(Error::from(e))
- }
-}
-
-impl ::std::error::Error for ExitOrError { }
-
-impl fmt::Display for ExitOrError {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match *self {
- ExitOrError::Error(ref e) => write!(f, "{:?}", e),
- ExitOrError::AuthoritiesChanged(_) => write!(f, "restarting voter on new authorities"),
- }
- }
-}
-
-impl, N, RA> voter::Environment> for Environment where
- Block: 'static,
- B: Backend + 'static,
- E: CallExecutor + 'static + Send + Sync,
- N: Network + 'static + Send,
- N::In: 'static + Send,
- RA: 'static + Send + Sync,
- NumberFor: BlockNumberOps,
-{
- type Timer = Box + Send>;
- type Id = Ed25519AuthorityId;
- type Signature = ed25519::Signature;
-
- // regular round message streams
- type In = Box, Self::Signature, Self::Id>,
- Error = Self::Error,
- > + Send>;
- type Out = Box>,
- SinkError = Self::Error,
- > + Send>;
-
- type Error = ExitOrError>;
-
- fn round_data(
- &self,
- round: u64
- ) -> voter::RoundData {
- let now = Instant::now();
- let prevote_timer = Delay::new(now + self.config.gossip_duration * 2);
- let precommit_timer = Delay::new(now + self.config.gossip_duration * 4);
-
- let incoming = ::communication::checked_message_stream::(
- round,
- self.set_id,
- self.network.messages_for(round, self.set_id),
- self.voters.clone(),
- );
-
- let local_key = self.config.local_key.as_ref()
- .filter(|pair| self.voters.contains_key(&pair.public().into()));
-
- let (out_rx, outgoing) = ::communication::outgoing_messages::(
- round,
- self.set_id,
- local_key.cloned(),
- self.voters.clone(),
- self.network.clone(),
- );
-
- // schedule incoming messages from the network to be held until
- // corresponding blocks are imported.
- let incoming = UntilVoteTargetImported::new(
- self.inner.import_notification_stream(),
- self.inner.clone(),
- incoming,
- );
-
- // join incoming network messages with locally originating ones.
- let incoming = Box::new(out_rx.select(incoming).map_err(Into::into));
-
- // schedule network message cleanup when sink drops.
- let outgoing = Box::new(outgoing.sink_map_err(Into::into));
-
- voter::RoundData {
- prevote_timer: Box::new(prevote_timer.map_err(|e| Error::Timer(e).into())),
- precommit_timer: Box::new(precommit_timer.map_err(|e| Error::Timer(e).into())),
- incoming,
- outgoing,
- }
- }
-
- fn completed(&self, round: u64, state: RoundState>) -> Result<(), Self::Error> {
- debug!(
- target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}",
- self.config.name(),
- round,
- self.set_id,
- state.estimate.as_ref().map(|e| e.1),
- state.finalized.as_ref().map(|e| e.1),
- );
-
- let encoded_state = (round, state).encode();
- let res = Backend::insert_aux(&**self.inner.backend(), &[(LAST_COMPLETED_KEY, &encoded_state[..])], &[]);
- if let Err(e) = res {
- warn!(target: "afg", "Shutting down voter due to error bookkeeping last completed round in DB: {:?}", e);
- Err(Error::Client(e).into())
- } else {
- Ok(())
- }
- }
-
- fn finalize_block(&self, hash: Block::Hash, number: NumberFor, round: u64, commit: Commit) -> Result<(), Self::Error> {
- finalize_block(
- &*self.inner,
- &self.authority_set,
- &self.consensus_changes,
- Some(As::sa(self.config.justification_period)),
- hash,
- number,
- (round, commit).into(),
- )
- }
-
- fn round_commit_timer(&self) -> Self::Timer {
- use rand::{thread_rng, Rng};
-
- //random between 0-1 seconds.
- let delay: u64 = thread_rng().gen_range(0, 1000);
- Box::new(Delay::new(
- Instant::now() + Duration::from_millis(delay)
- ).map_err(|e| Error::Timer(e).into()))
- }
-
- fn prevote_equivocation(
- &self,
- _round: u64,
- equivocation: ::grandpa::Equivocation, Self::Signature>
- ) {
- warn!(target: "afg", "Detected prevote equivocation in the finality worker: {:?}", equivocation);
- // nothing yet; this could craft misbehavior reports of some kind.
- }
-
- fn precommit_equivocation(
- &self,
- _round: u64,
- equivocation: Equivocation, Self::Signature>
- ) {
- warn!(target: "afg", "Detected precommit equivocation in the finality worker: {:?}", equivocation);
- // nothing yet
- }
-}
-
-/// A GRANDPA justification for block finality, it includes a commit message and
-/// an ancestry proof including all headers routing all precommit target blocks
-/// to the commit target block. Due to the current voting strategy the precommit
-/// targets should be the same as the commit target, since honest voters don't
-/// vote past authority set change blocks.
-///
-/// This is meant to be stored in the db and passed around the network to other
-/// nodes, and are used by syncing nodes to prove authority set handoffs.
-#[derive(Encode, Decode)]
-struct GrandpaJustification {
- round: u64,
- commit: Commit,
- votes_ancestries: Vec,
-}
-
-impl> GrandpaJustification {
- /// Create a GRANDPA justification from the given commit. This method
- /// assumes the commit is valid and well-formed.
- fn from_commit(
- client: &Client,
- round: u64,
- commit: Commit,
- ) -> Result, Error> where
- B: Backend,
- E: CallExecutor + Send + Sync,
- RA: Send + Sync,
- {
- let mut votes_ancestries_hashes = HashSet::new();
- let mut votes_ancestries = Vec::new();
-
- let error = || {
- let msg = "invalid precommits for target commit".to_string();
- Err(Error::Client(ClientErrorKind::BadJustification(msg).into()))
- };
-
- for signed in commit.precommits.iter() {
- let mut current_hash = signed.precommit.target_hash.clone();
- loop {
- if current_hash == commit.target_hash { break; }
-
- match client.backend().blockchain().header(BlockId::Hash(current_hash))? {
- Some(current_header) => {
- if *current_header.number() <= commit.target_number {
- return error();
- }
-
- let parent_hash = current_header.parent_hash().clone();
- if votes_ancestries_hashes.insert(current_hash) {
- votes_ancestries.push(current_header);
- }
- current_hash = parent_hash;
- },
- _ => return error(),
- }
- }
- }
-
- Ok(GrandpaJustification { round, commit, votes_ancestries })
- }
-
- /// Decode a GRANDPA justification and validate the commit and the votes'
- /// ancestry proofs.
- fn decode_and_verify(
- encoded: Vec,
- set_id: u64,
- voters: &VoterSet,
- ) -> Result, ClientError> where
- NumberFor: grandpa::BlockNumberOps,
- {
- GrandpaJustification::::decode(&mut &*encoded).ok_or_else(|| {
- let msg = "failed to decode grandpa justification".to_string();
- ClientErrorKind::BadJustification(msg).into()
- }).and_then(|just| just.verify(set_id, voters).map(|_| just))
- }
-
- /// Validate the commit and the votes' ancestry proofs.
- fn verify(&self, set_id: u64, voters: &VoterSet) -> Result<(), ClientError>
- where
- NumberFor: grandpa::BlockNumberOps,
- {
- use grandpa::Chain;
-
- let ancestry_chain = AncestryChain::::new(&self.votes_ancestries);
-
- match grandpa::validate_commit(
- &self.commit,
- voters,
- &ancestry_chain,
- ) {
- Ok(Some(_)) => {},
- _ => {
- let msg = "invalid commit in grandpa justification".to_string();
- return Err(ClientErrorKind::BadJustification(msg).into());
- }
- }
-
- let mut visited_hashes = HashSet::new();
- for signed in self.commit.precommits.iter() {
- if let Err(_) = communication::check_message_sig::(
- &grandpa::Message::Precommit(signed.precommit.clone()),
- &signed.id,
- &signed.signature,
- self.round,
- set_id,
- ) {
- return Err(ClientErrorKind::BadJustification(
- "invalid signature for precommit in grandpa justification".to_string()).into());
- }
-
- if self.commit.target_hash == signed.precommit.target_hash {
- continue;
- }
-
- match ancestry_chain.ancestry(self.commit.target_hash, signed.precommit.target_hash) {
- Ok(route) => {
- // ancestry starts from parent hash but the precommit target hash has been visited
- visited_hashes.insert(signed.precommit.target_hash);
- for hash in route {
- visited_hashes.insert(hash);
- }
- },
- _ => {
- return Err(ClientErrorKind::BadJustification(
- "invalid precommit ancestry proof in grandpa justification".to_string()).into());
- },
- }
- }
-
- let ancestry_hashes = self.votes_ancestries
- .iter()
- .map(|h: &Block::Header| h.hash())
- .collect();
-
- if visited_hashes != ancestry_hashes {
- return Err(ClientErrorKind::BadJustification(
- "invalid precommit ancestries in grandpa justification with unused headers".to_string()).into());
- }
-
- Ok(())
- }
-}
-
-enum JustificationOrCommit {
- Justification(GrandpaJustification),
- Commit((u64, Commit)),
-}
-
-impl From<(u64, Commit)> for JustificationOrCommit {
- fn from(commit: (u64, Commit)) -> JustificationOrCommit {
- JustificationOrCommit::Commit(commit)
- }
-}
-
-impl