diff --git a/Cargo.lock b/Cargo.lock index 29886a032fea..e7afec8f6a79 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -752,10 +752,11 @@ dependencies = [ [[package]] name = "finality-grandpa" -version = "0.6.0" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", + "hashmap_core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "num-traits 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -3867,7 +3868,7 @@ name = "substrate-finality-grandpa" version = "1.0.0" dependencies = [ "env_logger 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", - "finality-grandpa 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "finality-grandpa 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "fork-tree 1.0.0", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5175,7 +5176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum failure_derive 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "ea1063915fd7ef4309e222a5a07cf9c319fb9c7836b1f89b85458672dbb127e1" "checksum fake-simd 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" -"checksum finality-grandpa 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d415e902db2b87bd5a7df7a2b2de97a4566727a23b95ff39e1bfec25a66d4d1c" +"checksum finality-grandpa 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c9da35679ad45649e32e6344a08a36e71ba5f5305ba02d18c34d262c49ce0072" "checksum fixed-hash 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a557e80084b05c32b455963ff565a9de6f2866da023d6671705c6aff6f65e01c" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" diff --git a/core/finality-grandpa/Cargo.toml b/core/finality-grandpa/Cargo.toml index 46f5bbc538cf..4dcf199e1ea4 100644 --- a/core/finality-grandpa/Cargo.toml +++ b/core/finality-grandpa/Cargo.toml @@ -22,7 +22,7 @@ network = { package = "substrate-network", path = "../network" } service = { package = "substrate-service", path = "../service", optional = true } srml-finality-tracker = { path = "../../srml/finality-tracker" } fg_primitives = { package = "substrate-finality-grandpa-primitives", path = "primitives" } -grandpa = { package = "finality-grandpa", version = "0.6.0", features = ["derive-codec"] } +grandpa = { package = "finality-grandpa", version = "0.7.1", features = ["derive-codec"] } [dev-dependencies] network = { package = "substrate-network", path = "../network", features = ["test-helpers"] } diff --git a/core/finality-grandpa/src/authorities.rs b/core/finality-grandpa/src/authorities.rs index ffded9a1ab35..5379cfe4ef6e 100644 --- a/core/finality-grandpa/src/authorities.rs +++ b/core/finality-grandpa/src/authorities.rs @@ -19,7 +19,7 @@ use fork_tree::ForkTree; use parking_lot::RwLock; use substrate_primitives::ed25519; -use grandpa::VoterSet; +use grandpa::voter_set::VoterSet; use parity_codec::{Encode, Decode}; use log::{debug, info}; use substrate_telemetry::{telemetry, CONSENSUS_INFO}; diff --git a/core/finality-grandpa/src/aux_schema.rs b/core/finality-grandpa/src/aux_schema.rs index cb41d481e360..104284538c25 100644 --- a/core/finality-grandpa/src/aux_schema.rs +++ b/core/finality-grandpa/src/aux_schema.rs @@ -23,10 +23,12 @@ use client::backend::AuxStore; use client::error::{Result as ClientResult, Error as ClientError, ErrorKind as ClientErrorKind}; use fork_tree::ForkTree; use grandpa::round::State as RoundState; +use runtime_primitives::traits::{Block as BlockT, NumberFor}; use log::{info, warn}; use crate::authorities::{AuthoritySet, SharedAuthoritySet, PendingChange, DelayKind}; use crate::consensus_changes::{SharedConsensusChanges, ConsensusChanges}; +use crate::environment::{CompletedRound, CompletedRounds, HasVoted, SharedVoterSetState, VoterSetState}; use crate::NewAuthoritySet; use substrate_primitives::ed25519::Public as AuthorityId; @@ -36,28 +38,18 @@ const SET_STATE_KEY: &[u8] = b"grandpa_completed_round"; const AUTHORITY_SET_KEY: &[u8] = b"grandpa_voters"; const CONSENSUS_CHANGES_KEY: &[u8] = b"grandpa_consensus_changes"; -const CURRENT_VERSION: u32 = 1; +const CURRENT_VERSION: u32 = 2; /// The voter set state. #[derive(Debug, Clone, Encode, Decode)] #[cfg_attr(test, derive(PartialEq))] -pub enum VoterSetState { +pub enum V1VoterSetState { /// The voter set state, currently paused. Paused(u64, RoundState), /// The voter set state, currently live. Live(u64, RoundState), } -impl VoterSetState { - /// Yields the current state. - pub(crate) fn round(&self) -> (u64, RoundState) { - match *self { - VoterSetState::Paused(n, ref s) => (n, s.clone()), - VoterSetState::Live(n, ref s) => (n, s.clone()), - } - } -} - type V0VoterSetState = (u64, RoundState); #[derive(Debug, Clone, Encode, Decode, PartialEq)] @@ -124,72 +116,205 @@ fn load_decode(backend: &B, key: &[u8]) -> ClientResult< } /// Persistent data kept between runs. -pub(crate) struct PersistentData { - pub(crate) authority_set: SharedAuthoritySet, - pub(crate) consensus_changes: SharedConsensusChanges, - pub(crate) set_state: VoterSetState, +pub(crate) struct PersistentData { + pub(crate) authority_set: SharedAuthoritySet>, + pub(crate) consensus_changes: SharedConsensusChanges>, + pub(crate) set_state: SharedVoterSetState, +} + +fn migrate_from_version0( + backend: &B, + genesis_round: &G, +) -> ClientResult>, + VoterSetState, +)>> where B: AuxStore, + G: Fn() -> RoundState>, +{ + CURRENT_VERSION.using_encoded(|s| + backend.insert_aux(&[(VERSION_KEY, s)], &[]) + )?; + + if let Some(old_set) = load_decode::<_, V0AuthoritySet>>( + backend, + AUTHORITY_SET_KEY, + )? { + let new_set: AuthoritySet> = old_set.into(); + backend.insert_aux(&[(AUTHORITY_SET_KEY, new_set.encode().as_slice())], &[])?; + + let (last_round_number, last_round_state) = match load_decode::<_, V0VoterSetState>>( + backend, + SET_STATE_KEY, + )? { + Some((number, state)) => (number, state), + None => (0, genesis_round()), + }; + + let base = last_round_state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + let set_state = VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: last_round_number, + state: last_round_state, + votes: Vec::new(), + base, + }), + current_round: HasVoted::No, + }; + + backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?; + + return Ok(Some((new_set, set_state))); + } + + Ok(None) +} + +fn migrate_from_version1( + backend: &B, + genesis_round: &G, +) -> ClientResult>, + VoterSetState, +)>> where B: AuxStore, + G: Fn() -> RoundState>, +{ + CURRENT_VERSION.using_encoded(|s| + backend.insert_aux(&[(VERSION_KEY, s)], &[]) + )?; + + if let Some(set) = load_decode::<_, AuthoritySet>>( + backend, + AUTHORITY_SET_KEY, + )? { + let set_state = match load_decode::<_, V1VoterSetState>>( + backend, + SET_STATE_KEY, + )? { + Some(V1VoterSetState::Paused(last_round_number, set_state)) => { + let base = set_state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + VoterSetState::Paused { + completed_rounds: CompletedRounds::new(CompletedRound { + number: last_round_number, + state: set_state, + votes: Vec::new(), + base, + }), + } + }, + Some(V1VoterSetState::Live(last_round_number, set_state)) => { + let base = set_state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: last_round_number, + state: set_state, + votes: Vec::new(), + base, + }), + current_round: HasVoted::No, + } + }, + None => { + let set_state = genesis_round(); + let base = set_state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: 0, + state: set_state, + votes: Vec::new(), + base, + }), + current_round: HasVoted::No, + } + }, + }; + + backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?; + + return Ok(Some((set, set_state))); + } + + Ok(None) } /// Load or initialize persistent data from backend. -pub(crate) fn load_persistent( +pub(crate) fn load_persistent( backend: &B, - genesis_hash: H, - genesis_number: N, + genesis_hash: Block::Hash, + genesis_number: NumberFor, genesis_authorities: G, ) - -> ClientResult> + -> ClientResult> where B: AuxStore, - H: Debug + Decode + Encode + Clone + PartialEq, - N: Debug + Decode + Encode + Clone + Ord, - G: FnOnce() -> ClientResult> + G: FnOnce() -> ClientResult>, { let version: Option = load_decode(backend, VERSION_KEY)?; let consensus_changes = load_decode(backend, CONSENSUS_CHANGES_KEY)? - .unwrap_or_else(ConsensusChanges::::empty); + .unwrap_or_else(ConsensusChanges::>::empty); let make_genesis_round = move || RoundState::genesis((genesis_hash, genesis_number)); match version { None => { - CURRENT_VERSION.using_encoded(|s| - backend.insert_aux(&[(VERSION_KEY, s)], &[]) - )?; - - if let Some(old_set) = load_decode::<_, V0AuthoritySet>(backend, AUTHORITY_SET_KEY)? { - let new_set: AuthoritySet = old_set.into(); - backend.insert_aux(&[(AUTHORITY_SET_KEY, new_set.encode().as_slice())], &[])?; - - let set_state = match load_decode::<_, V0VoterSetState>(backend, SET_STATE_KEY)? { - Some((number, state)) => { - let set_state = VoterSetState::Live(number, state); - backend.insert_aux(&[(SET_STATE_KEY, set_state.encode().as_slice())], &[])?; - set_state - }, - None => VoterSetState::Live(0, make_genesis_round()), - }; - + if let Some((new_set, set_state)) = migrate_from_version0::(backend, &make_genesis_round)? { return Ok(PersistentData { authority_set: new_set.into(), consensus_changes: Arc::new(consensus_changes.into()), - set_state, + set_state: set_state.into(), }); } - } + }, Some(1) => { - if let Some(set) = load_decode::<_, AuthoritySet>(backend, AUTHORITY_SET_KEY)? { - let set_state = match load_decode::<_, VoterSetState>(backend, SET_STATE_KEY)? { + if let Some((new_set, set_state)) = migrate_from_version1::(backend, &make_genesis_round)? { + return Ok(PersistentData { + authority_set: new_set.into(), + consensus_changes: Arc::new(consensus_changes.into()), + set_state: set_state.into(), + }); + } + }, + Some(2) => { + if let Some(set) = load_decode::<_, AuthoritySet>>( + backend, + AUTHORITY_SET_KEY, + )? { + let set_state = match load_decode::<_, VoterSetState>( + backend, + SET_STATE_KEY, + )? { Some(state) => state, - None => VoterSetState::Live(0, make_genesis_round()), + None => { + let state = make_genesis_round(); + let base = state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: 0, + votes: Vec::new(), + base, + state, + }), + current_round: HasVoted::No, + } + } }; return Ok(PersistentData { authority_set: set.into(), consensus_changes: Arc::new(consensus_changes.into()), - set_state, + set_state: set_state.into(), }); } - } + }, Some(other) => return Err(ClientErrorKind::Backend( format!("Unsupported GRANDPA DB version: {:?}", other) ).into()), @@ -200,7 +325,19 @@ pub(crate) fn load_persistent( from genesis on what appears to be first startup."); let genesis_set = AuthoritySet::genesis(genesis_authorities()?); - let genesis_state = VoterSetState::Live(0, make_genesis_round()); + let state = make_genesis_round(); + let base = state.prevote_ghost + .expect("state is for completed round; completed rounds must have a prevote ghost; qed."); + + let genesis_state = VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: 0, + votes: Vec::new(), + state, + base, + }), + current_round: HasVoted::No, + }; backend.insert_aux( &[ (AUTHORITY_SET_KEY, genesis_set.encode().as_slice()), @@ -211,19 +348,17 @@ pub(crate) fn load_persistent( Ok(PersistentData { authority_set: genesis_set.into(), - set_state: genesis_state, + set_state: genesis_state.into(), consensus_changes: Arc::new(consensus_changes.into()), }) } /// Update the authority set on disk after a change. -pub(crate) fn update_authority_set( - set: &AuthoritySet, - new_set: Option<&NewAuthoritySet>, +pub(crate) fn update_authority_set( + set: &AuthoritySet>, + new_set: Option<&NewAuthoritySet>>, write_aux: F ) -> R where - H: Encode + Clone, - N: Encode + Clone, F: FnOnce(&[(&'static [u8], &[u8])]) -> R, { // write new authority set state to disk. @@ -237,7 +372,15 @@ pub(crate) fn update_authority_set( new_set.canon_hash.clone(), new_set.canon_number.clone(), )); - let set_state = VoterSetState::Live(0, round_state); + let set_state = VoterSetState::::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: 0, + state: round_state, + votes: Vec::new(), + base: (new_set.canon_hash, new_set.canon_number), + }), + current_round: HasVoted::No, + }; let encoded = set_state.encode(); write_aux(&[ @@ -250,10 +393,10 @@ pub(crate) fn update_authority_set( } /// Write voter set state. -pub(crate) fn write_voter_set_state(backend: &B, state: &VoterSetState) - -> ClientResult<()> - where B: AuxStore, H: Encode, N: Encode -{ +pub(crate) fn write_voter_set_state( + backend: &B, + state: &VoterSetState, +) -> ClientResult<()> { backend.insert_aux( &[(SET_STATE_KEY, state.encode().as_slice())], &[] @@ -286,14 +429,14 @@ mod test { use super::*; #[test] - fn load_decode_migrates_data_format() { + fn load_decode_from_v0_migrates_data_format() { let client = test_client::new(); let authorities = vec![(AuthorityId::default(), 100)]; let set_id = 3; - let round_number = 42; + let round_number: u64 = 42; let round_state = RoundState:: { - prevote_ghost: None, + prevote_ghost: Some((H256::random(), 32)), finalized: None, estimate: None, completable: false, @@ -323,19 +466,102 @@ mod test { ); // should perform the migration - load_persistent( + load_persistent::( + &client, + H256::random(), + 0, + || unreachable!(), + ).unwrap(); + + assert_eq!( + load_decode::<_, u32>(&client, VERSION_KEY).unwrap(), + Some(2), + ); + + let PersistentData { authority_set, set_state, .. } = load_persistent::( &client, H256::random(), 0, || unreachable!(), ).unwrap(); + assert_eq!( + *authority_set.inner().read(), + AuthoritySet { + current_authorities: authorities, + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + set_id, + }, + ); + + assert_eq!( + &*set_state.read(), + &VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: round_number, + state: round_state.clone(), + base: round_state.prevote_ghost.unwrap(), + votes: vec![], + }), + current_round: HasVoted::No, + }, + ); + } + + #[test] + fn load_decode_from_v1_migrates_data_format() { + let client = test_client::new(); + + let authorities = vec![(AuthorityId::default(), 100)]; + let set_id = 3; + let round_number: u64 = 42; + let round_state = RoundState:: { + prevote_ghost: Some((H256::random(), 32)), + finalized: None, + estimate: None, + completable: false, + }; + + { + let authority_set = AuthoritySet:: { + current_authorities: authorities.clone(), + pending_standard_changes: ForkTree::new(), + pending_forced_changes: Vec::new(), + set_id, + }; + + let voter_set_state = V1VoterSetState::Live(round_number, round_state.clone()); + + client.insert_aux( + &[ + (AUTHORITY_SET_KEY, authority_set.encode().as_slice()), + (SET_STATE_KEY, voter_set_state.encode().as_slice()), + (VERSION_KEY, 1u32.encode().as_slice()), + ], + &[], + ).unwrap(); + } + assert_eq!( load_decode::<_, u32>(&client, VERSION_KEY).unwrap(), Some(1), ); - let PersistentData { authority_set, set_state, .. } = load_persistent( + // should perform the migration + load_persistent::( + &client, + H256::random(), + 0, + || unreachable!(), + ).unwrap(); + + assert_eq!( + load_decode::<_, u32>(&client, VERSION_KEY).unwrap(), + Some(2), + ); + + let PersistentData { authority_set, set_state, .. } = load_persistent::( &client, H256::random(), 0, @@ -353,8 +579,16 @@ mod test { ); assert_eq!( - set_state, - VoterSetState::Live(round_number, round_state), + &*set_state.read(), + &VoterSetState::Live { + completed_rounds: CompletedRounds::new(CompletedRound { + number: round_number, + state: round_state.clone(), + base: round_state.prevote_ghost.unwrap(), + votes: vec![], + }), + current_round: HasVoted::No, + }, ); } } diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs index 6d006039f3dc..1770d41adfd9 100644 --- a/core/finality-grandpa/src/communication/mod.rs +++ b/core/finality-grandpa/src/communication/mod.rs @@ -29,8 +29,8 @@ use std::sync::Arc; -use grandpa::VoterSet; -use grandpa::Message::{Prevote, Precommit}; +use grandpa::voter_set::VoterSet; +use grandpa::Message::{Prevote, Precommit, PrimaryPropose}; use futures::prelude::*; use futures::sync::{oneshot, mpsc}; use log::{debug, trace}; @@ -43,6 +43,7 @@ use network::{consensus_gossip as network_gossip, Service as NetworkService,}; use network_gossip::ConsensusMessage; use crate::{Error, Message, SignedMessage, Commit, CompactCommit}; +use crate::environment::HasVoted; use gossip::{ GossipMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator }; @@ -184,7 +185,7 @@ impl> NetworkBridge { set_id: SetId, voters: Arc>, local_key: Option>, - has_voted: HasVoted, + has_voted: HasVoted, ) -> ( impl Stream,Error=Error>, impl Sink,SinkError=Error>, @@ -220,6 +221,13 @@ impl> NetworkBridge { } match &msg.message.message { + PrimaryPropose(propose) => { + telemetry!(CONSENSUS_INFO; "afg.received_propose"; + "voter" => ?format!("{}", msg.message.id), + "target_number" => ?propose.target_number, + "target_hash" => ?propose.target_hash, + ); + }, Prevote(prevote) => { telemetry!(CONSENSUS_INFO; "afg.received_prevote"; "voter" => ?format!("{}", msg.message.id), @@ -363,55 +371,20 @@ pub(crate) fn check_message_sig( } } -/// Whether we've voted already during a prior run of the program. -#[derive(Decode, Encode)] -pub(crate) enum HasVoted { - /// Has not voted already in this round. - #[codec(index = "0")] - No, - /// Has cast a proposal. - #[codec(index = "1")] - Proposed, - /// Has cast a prevote. - #[codec(index = "2")] - Prevoted, - /// Has cast a precommit (implies prevote.) - #[codec(index = "3")] - Precommitted, -} - -impl HasVoted { - #[allow(unused)] - fn can_propose(&self) -> bool { - match *self { - HasVoted::No => true, - HasVoted::Proposed | HasVoted::Prevoted | HasVoted::Precommitted => false, - } - } - - fn can_prevote(&self) -> bool { - match *self { - HasVoted::No | HasVoted::Proposed => true, - HasVoted::Prevoted | HasVoted::Precommitted => false, - } - } - - fn can_precommit(&self) -> bool { - match *self { - HasVoted::No | HasVoted::Proposed | HasVoted::Prevoted => true, - HasVoted::Precommitted => false, - } - } -} - -/// A sink for outgoing messages to the network. +/// A sink for outgoing messages to the network. Any messages that are sent will +/// be replaced, as appropriate, according to the given `HasVoted`. +/// NOTE: The votes are stored unsigned, which means that the signatures need to +/// be "stable", i.e. we should end up with the exact same signed message if we +/// use the same raw message and key to sign. This is currently true for +/// `ed25519` and `BLS` signatures (which we might use in the future), care must +/// be taken when switching to different key types. struct OutgoingMessages> { round: u64, set_id: u64, locals: Option<(Arc, AuthorityId)>, sender: mpsc::UnboundedSender>, network: N, - has_voted: HasVoted, + has_voted: HasVoted, } impl> Sink for OutgoingMessages @@ -419,15 +392,25 @@ impl> Sink for OutgoingMessages type SinkItem = Message; type SinkError = Error; - fn start_send(&mut self, msg: Message) -> StartSend, Error> { - // only sign if we haven't voted in this round already. - let should_sign = match msg { - grandpa::Message::Prevote(_) => self.has_voted.can_prevote(), - grandpa::Message::Precommit(_) => self.has_voted.can_precommit(), - }; + fn start_send(&mut self, mut msg: Message) -> StartSend, Error> { + // if we've voted on this round previously under the same key, send that vote instead + match &mut msg { + grandpa::Message::PrimaryPropose(ref mut vote) => + if let Some(propose) = self.has_voted.propose() { + *vote = propose.clone(); + }, + grandpa::Message::Prevote(ref mut vote) => + if let Some(prevote) = self.has_voted.prevote() { + *vote = prevote.clone(); + }, + grandpa::Message::Precommit(ref mut vote) => + if let Some(precommit) = self.has_voted.precommit() { + *vote = precommit.clone(); + }, + } // when locals exist, sign messages on import - if let (true, &Some((ref pair, ref local_id))) = (should_sign, &self.locals) { + if let Some((ref pair, ref local_id)) = self.locals { let encoded = localized_payload(self.round, self.set_id, &msg); let signature = pair.sign(&encoded[..]); diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 556b4aead76e..2912cd580897 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -14,11 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use std::collections::VecDeque; +use std::iter::FromIterator; use std::sync::Arc; use std::time::{Duration, Instant}; use log::{debug, warn, info}; -use parity_codec::Encode; +use parity_codec::{Decode, Encode}; use futures::prelude::*; use tokio::timer::Delay; use parking_lot::RwLock; @@ -27,7 +29,8 @@ use client::{ backend::Backend, BlockchainEvents, CallExecutor, Client, error::Error as ClientError }; use grandpa::{ - BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState, voter, VoterSet, + BlockNumberOps, Equivocation, Error as GrandpaError, round::State as RoundState, + voter, voter_set::VoterSet, }; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{ @@ -37,8 +40,8 @@ use substrate_primitives::{Blake2Hasher, ed25519, H256, Pair}; use substrate_telemetry::{telemetry, CONSENSUS_INFO}; use crate::{ - Commit, Config, Error, Network, Precommit, Prevote, - CommandOrError, NewAuthoritySet, VoterCommand, + CommandOrError, Commit, Config, Error, Network, Precommit, Prevote, + PrimaryPropose, SignedMessage, NewAuthoritySet, VoterCommand, }; use crate::authorities::SharedAuthoritySet; @@ -49,27 +52,210 @@ use crate::until_imported::UntilVoteTargetImported; use ed25519::Public as AuthorityId; /// Data about a completed round. -pub(crate) type CompletedRound = (u64, RoundState); +#[derive(Debug, Clone, Decode, Encode, PartialEq)] +pub struct CompletedRound { + /// The round number. + pub number: u64, + /// The round state (prevote ghost, estimate, finalized, etc.) + pub state: RoundState>, + /// The target block base used for voting in the round. + pub base: (Block::Hash, NumberFor), + /// All the votes observed in the round. + pub votes: Vec>, +} + +// Data about last completed rounds. Stores NUM_LAST_COMPLETED_ROUNDS and always +// contains data about at least one round (genesis). +#[derive(Debug, Clone, PartialEq)] +pub struct CompletedRounds { + inner: VecDeque>, +} + +// NOTE: the current strategy for persisting completed rounds is very naive +// (update everything) and we also rely on cloning to do atomic updates, +// therefore this value should be kept small for now. +const NUM_LAST_COMPLETED_ROUNDS: usize = 2; + +impl Encode for CompletedRounds { + fn encode(&self) -> Vec { + Vec::from_iter(&self.inner).encode() + } +} + +impl Decode for CompletedRounds { + fn decode(value: &mut I) -> Option { + Vec::>::decode(value) + .map(|completed_rounds| CompletedRounds { + inner: completed_rounds.into(), + }) + } +} + +impl CompletedRounds { + /// Create a new completed rounds tracker with NUM_LAST_COMPLETED_ROUNDS capacity. + pub fn new(genesis: CompletedRound) -> CompletedRounds { + let mut inner = VecDeque::with_capacity(NUM_LAST_COMPLETED_ROUNDS); + inner.push_back(genesis); + CompletedRounds { inner } + } + + /// Returns the last (latest) completed round. + pub fn last(&self) -> &CompletedRound { + self.inner.back() + .expect("inner is never empty; always contains at least genesis; qed") + } + + /// Push a new completed round, returns false if the given round is older + /// than the last completed round. + pub fn push(&mut self, completed_round: CompletedRound) -> bool { + if self.last().number >= completed_round.number { + return false; + } + + if self.inner.len() == NUM_LAST_COMPLETED_ROUNDS { + self.inner.pop_front(); + } + + self.inner.push_back(completed_round); + + true + } +} + +/// The state of the current voter set, whether it is currently active or not +/// and information related to the previously completed rounds. Current round +/// voting status is used when restarting the voter, i.e. it will re-use the +/// previous votes for a given round if appropriate (same round and same local +/// key). +#[derive(Debug, Decode, Encode, PartialEq)] +pub enum VoterSetState { + /// The voter is live, i.e. participating in rounds. + Live { + /// The previously completed rounds. + completed_rounds: CompletedRounds, + /// Vote status for the current round. + current_round: HasVoted, + }, + /// The voter is paused, i.e. not casting or importing any votes. + Paused { + /// The previously completed rounds. + completed_rounds: CompletedRounds, + }, +} + +impl VoterSetState { + /// Returns the last completed rounds. + pub(crate) fn completed_rounds(&self) -> CompletedRounds { + match self { + VoterSetState::Live { completed_rounds, .. } => + completed_rounds.clone(), + VoterSetState::Paused { completed_rounds } => + completed_rounds.clone(), + } + } +} + +/// Whether we've voted already during a prior run of the program. +#[derive(Debug, Decode, Encode, PartialEq)] +pub enum HasVoted { + /// Has not voted already in this round. + No, + /// Has voted in this round. + Yes(AuthorityId, Vote), +} + +/// The votes cast by this voter already during a prior run of the program. +#[derive(Debug, Clone, Decode, Encode, PartialEq)] +pub enum Vote { + /// Has cast a proposal. + Propose(PrimaryPropose), + /// Has cast a prevote. + Prevote(Option>, Prevote), + /// Has cast a precommit (implies prevote.) + Precommit(Option>, Prevote, Precommit), +} + +impl HasVoted { + /// Returns the proposal we should vote with (if any.) + pub fn propose(&self) -> Option<&PrimaryPropose> { + match self { + HasVoted::Yes(_, Vote::Propose(propose)) => + Some(propose), + HasVoted::Yes(_, Vote::Prevote(propose, _)) | HasVoted::Yes(_, Vote::Precommit(propose, _, _)) => + propose.as_ref(), + _ => None, + } + } + + /// Returns the prevote we should vote with (if any.) + pub fn prevote(&self) -> Option<&Prevote> { + match self { + HasVoted::Yes(_, Vote::Prevote(_, prevote)) | HasVoted::Yes(_, Vote::Precommit(_, prevote, _)) => + Some(prevote), + _ => None, + } + } + + /// Returns the precommit we should vote with (if any.) + pub fn precommit(&self) -> Option<&Precommit> { + match self { + HasVoted::Yes(_, Vote::Precommit(_, _, precommit)) => + Some(precommit), + _ => None, + } + } + + /// Returns true if the voter can still propose, false otherwise. + pub fn can_propose(&self) -> bool { + self.propose().is_none() + } + + /// Returns true if the voter can still prevote, false otherwise. + pub fn can_prevote(&self) -> bool { + self.prevote().is_none() + } + + /// Returns true if the voter can still precommit, false otherwise. + pub fn can_precommit(&self) -> bool { + self.precommit().is_none() + } +} + +/// A voter set state meant to be shared safely across multiple owners. +#[derive(Clone)] +pub struct SharedVoterSetState { + inner: Arc>>, +} -/// A read-only view of the last completed round. -pub(crate) struct LastCompletedRound { - inner: RwLock>, +impl From> for SharedVoterSetState { + fn from(set_state: VoterSetState) -> Self { + SharedVoterSetState::new(set_state) + } } -impl LastCompletedRound { - /// Create a new tracker based on some starting last-completed round. - pub(crate) fn new(round: CompletedRound) -> Self { - LastCompletedRound { inner: RwLock::new(round) } +impl SharedVoterSetState { + /// Create a new shared voter set tracker with the given state. + pub(crate) fn new(state: VoterSetState) -> Self { + SharedVoterSetState { inner: Arc::new(RwLock::new(state)) } } - /// Read the last completed round. - pub(crate) fn read(&self) -> CompletedRound { - self.inner.read().clone() + /// Read the inner voter set state. + pub(crate) fn read(&self) -> parking_lot::RwLockReadGuard> { + self.inner.read() + } + + /// Return vote status information for the current round. + pub(crate) fn has_voted(&self) -> HasVoted { + match &*self.inner.read() { + VoterSetState::Live { current_round: HasVoted::Yes(id, vote), .. } => + HasVoted::Yes(id.clone(), vote.clone()), + _ => HasVoted::No, + } } // NOTE: not exposed outside of this module intentionally. fn with(&self, f: F) -> R - where F: FnOnce(&mut CompletedRound) -> R + where F: FnOnce(&mut VoterSetState) -> R { f(&mut *self.inner.write()) } @@ -84,7 +270,23 @@ pub(crate) struct Environment, RA> { pub(crate) consensus_changes: SharedConsensusChanges>, pub(crate) network: crate::communication::NetworkBridge, pub(crate) set_id: u64, - pub(crate) last_completed: LastCompletedRound>, + pub(crate) voter_set_state: SharedVoterSetState, +} + +impl, RA> Environment { + /// Updates the voter set state using the given closure. The write lock is + /// held during evaluation of the closure and the environment's voter set + /// state is set to its result if successful. + pub(crate) fn update_voter_set_state(&self, f: F) -> Result<(), Error> where + F: FnOnce(&VoterSetState) -> Result>, Error> + { + self.voter_set_state.with(|voter_set_state| { + if let Some(set_state) = f(&voter_set_state)? { + *voter_set_state = set_state; + } + Ok(()) + }) + } } impl, B, E, N, RA> grandpa::Chain> for Environment where @@ -226,7 +428,7 @@ impl, N, RA> voter::Environment voter::RoundData { + ) -> voter::RoundData { let now = Instant::now(); let prevote_timer = Delay::new(now + self.config.gossip_duration * 2); let precommit_timer = Delay::new(now + self.config.gossip_duration * 4); @@ -239,7 +441,7 @@ impl, N, RA> voter::Environment, N, RA> voter::Environment, N, RA> voter::Environment>) -> Result<(), Self::Error> { + fn proposed(&self, _round: u64, propose: PrimaryPropose) -> Result<(), Self::Error> { + let local_id = self.config.local_key.as_ref() + .map(|pair| pair.public().into()) + .filter(|id| self.voters.contains_key(&id)); + + let local_id = match local_id { + Some(id) => id, + None => return Ok(()), + }; + + self.update_voter_set_state(|voter_set_state| { + let completed_rounds = match voter_set_state { + VoterSetState::Live { completed_rounds, current_round: HasVoted::No } => completed_rounds, + VoterSetState::Live { current_round, .. } if !current_round.can_propose() => { + // we've already proposed in this round (in a previous run), + // ignore the given vote and don't update the voter set + // state + return Ok(None); + }, + _ => { + let msg = "Voter proposing after prevote/precommit or while paused."; + return Err(Error::Safety(msg.to_string())); + }, + }; + + let set_state = VoterSetState::::Live { + completed_rounds: completed_rounds.clone(), + current_round: HasVoted::Yes(local_id, Vote::Propose(propose)), + }; + + crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?; + + Ok(Some(set_state)) + })?; + + Ok(()) + } + + fn prevoted(&self, _round: u64, prevote: Prevote) -> Result<(), Self::Error> { + let local_id = self.config.local_key.as_ref() + .map(|pair| pair.public().into()) + .filter(|id| self.voters.contains_key(&id)); + + let local_id = match local_id { + Some(id) => id, + None => return Ok(()), + }; + + self.update_voter_set_state(|voter_set_state| { + let (completed_rounds, propose) = match voter_set_state { + VoterSetState::Live { completed_rounds, current_round: HasVoted::No } => + (completed_rounds, None), + VoterSetState::Live { completed_rounds, current_round: HasVoted::Yes(_, Vote::Propose(propose)) } => + (completed_rounds, Some(propose)), + VoterSetState::Live { current_round, .. } if !current_round.can_prevote() => { + // we've already prevoted in this round (in a previous run), + // ignore the given vote and don't update the voter set + // state + return Ok(None); + }, + _ => { + let msg = "Voter prevoting after precommit or while paused."; + return Err(Error::Safety(msg.to_string())); + }, + }; + + let set_state = VoterSetState::::Live { + completed_rounds: completed_rounds.clone(), + current_round: HasVoted::Yes(local_id, Vote::Prevote(propose.cloned(), prevote)), + }; + + crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?; + + Ok(Some(set_state)) + })?; + + Ok(()) + } + + fn precommitted(&self, _round: u64, precommit: Precommit) -> Result<(), Self::Error> { + let local_id = self.config.local_key.as_ref() + .map(|pair| pair.public().into()) + .filter(|id| self.voters.contains_key(&id)); + + let local_id = match local_id { + Some(id) => id, + None => return Ok(()), + }; + + self.update_voter_set_state(|voter_set_state| { + let (completed_rounds, propose, prevote) = match voter_set_state { + VoterSetState::Live { completed_rounds, current_round: HasVoted::Yes(_, Vote::Prevote(propose, prevote)) } => + (completed_rounds, propose, prevote), + VoterSetState::Live { current_round, .. } if !current_round.can_precommit() => { + // we've already precommitted in this round (in a previous run), + // ignore the given vote and don't update the voter set + // state + return Ok(None); + }, + _ => { + let msg = "Voter precommitting while paused."; + return Err(Error::Safety(msg.to_string())); + } + }; + + let set_state = VoterSetState::::Live { + completed_rounds: completed_rounds.clone(), + current_round: HasVoted::Yes(local_id, Vote::Precommit(propose.clone(), prevote.clone(), precommit)), + }; + + crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?; + + Ok(Some(set_state)) + })?; + + Ok(()) + } + + fn completed( + &self, + round: u64, + state: RoundState>, + base: (Block::Hash, NumberFor), + votes: Vec>, + ) -> Result<(), Self::Error> { debug!( target: "afg", "Voter {} completed round {} in set {}. Estimate = {:?}, Finalized in round = {:?}", self.config.name(), @@ -271,13 +598,31 @@ impl, N, RA> voter::Environment::Live { + completed_rounds, + current_round: HasVoted::No, + }; + crate::aux_schema::write_voter_set_state(&**self.inner.backend(), &set_state)?; - *last_completed = (round, state); // after writing to DB successfully. - Ok(()) - }) + Ok(Some(set_state)) + })?; + + Ok(()) } fn finalize_block(&self, hash: Block::Hash, number: NumberFor, round: u64, commit: Commit) -> Result<(), Self::Error> { @@ -498,7 +843,7 @@ pub(crate) fn finalize_block, E, RA>( }; if status.changed { - let write_result = crate::aux_schema::update_authority_set( + let write_result = crate::aux_schema::update_authority_set::( &authority_set, new_authorities.as_ref(), |insert| client.apply_aux(import_op, insert, &[]), diff --git a/core/finality-grandpa/src/finality_proof.rs b/core/finality-grandpa/src/finality_proof.rs index 2b34a094a064..2a28cca3a84d 100644 --- a/core/finality-grandpa/src/finality_proof.rs +++ b/core/finality-grandpa/src/finality_proof.rs @@ -29,7 +29,7 @@ //! The caller should track the `set_id`. The most straightforward way is to fetch finality //! proofs ONLY for blocks on the tip of the chain and track the latest known `set_id`. -use grandpa::VoterSet; +use grandpa::voter_set::VoterSet; use client::{ blockchain::Backend as BlockchainBackend, diff --git a/core/finality-grandpa/src/import.rs b/core/finality-grandpa/src/import.rs index 960ab3140d7c..34e266ddd349 100644 --- a/core/finality-grandpa/src/import.rs +++ b/core/finality-grandpa/src/import.rs @@ -362,7 +362,7 @@ impl, RA, PRA> GrandpaBlockImport None, }; - crate::aux_schema::update_authority_set( + crate::aux_schema::update_authority_set::( authorities, authorities_change, |insert| block.auxiliary.extend( diff --git a/core/finality-grandpa/src/justification.rs b/core/finality-grandpa/src/justification.rs index dce05296a8d2..57ea3a344cd8 100644 --- a/core/finality-grandpa/src/justification.rs +++ b/core/finality-grandpa/src/justification.rs @@ -21,7 +21,7 @@ use client::backend::Backend; use client::blockchain::HeaderBackend; use client::error::{Error as ClientError, ErrorKind as ClientErrorKind}; use parity_codec::{Encode, Decode}; -use grandpa::VoterSet; +use grandpa::voter_set::VoterSet; use grandpa::{Error as GrandpaError}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::{NumberFor, Block as BlockT, Header as HeaderT}; @@ -129,7 +129,7 @@ impl> GrandpaJustification { voters, &ancestry_chain, ) { - Ok(Some(_)) => {}, + Ok(ref result) if result.ghost().is_some() => {}, _ => { let msg = "invalid commit in grandpa justification".to_string(); return Err(ClientErrorKind::BadJustification(msg).into()); diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index b07535963727..b42c329d1569 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -73,7 +73,7 @@ use substrate_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_ use srml_finality_tracker; use grandpa::Error as GrandpaError; -use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; +use grandpa::{voter, round::State as RoundState, BlockNumberOps, voter_set::VoterSet}; use std::fmt; use std::sync::Arc; @@ -98,8 +98,8 @@ pub use service_integration::{LinkHalfForService, BlockImportForService}; pub use communication::Network; pub use finality_proof::{prove_finality, check_finality_proof}; -use aux_schema::{PersistentData, VoterSetState}; -use environment::Environment; +use aux_schema::PersistentData; +use environment::{CompletedRound, CompletedRounds, Environment, HasVoted, SharedVoterSetState, VoterSetState}; use import::GrandpaBlockImport; use until_imported::UntilCommitBlocksImported; use communication::NetworkBridge; @@ -119,6 +119,8 @@ pub type SignedMessage = grandpa::SignedMessage< AuthorityId, >; +/// A primary propose message for this chain's block type. +pub type PrimaryPropose = grandpa::PrimaryPropose<::Hash, NumberFor>; /// A prevote message for this chain's block type. pub type Prevote = grandpa::Prevote<::Hash, NumberFor>; /// A precommit message for this chain's block type. @@ -281,7 +283,7 @@ impl fmt::Display for CommandOrError { pub struct LinkHalf, RA> { client: Arc>, - persistent_data: PersistentData>, + persistent_data: PersistentData, voter_commands_rx: mpsc::UnboundedReceiver>>, } @@ -335,6 +337,41 @@ pub fn block_import, RA, PRA>( )) } +fn global_communication, I, O>( + commits_in: I, + commits_out: O, +) -> ( + impl Stream< + Item = voter::CommunicationIn, AuthoritySignature, AuthorityId>, + Error = CommandOrError>, + >, + impl Sink< + SinkItem = voter::CommunicationOut, AuthoritySignature, AuthorityId>, + SinkError = CommandOrError>, + >, +) where + I: Stream< + Item = (u64, ::grandpa::CompactCommit, AuthoritySignature, AuthorityId>), + Error = CommandOrError>, + >, + O: Sink< + SinkItem = (u64, ::grandpa::Commit, AuthoritySignature, AuthorityId>), + SinkError = CommandOrError>, + >, +{ + let global_in = commits_in.map(|(round, commit)| { + voter::CommunicationIn::Commit(round, commit, voter::Callback::Blank) + }); + + // NOTE: eventually this will also handle catch-up requests + let global_out = commits_out.with(|global| match global { + voter::CommunicationOut::Commit(round, commit) => Ok((round, commit)), + _ => unimplemented!(), + }); + + (global_in, global_out) +} + fn committer_communication, B, E, N, RA>( local_key: Option<&Arc>, set_id: u64, @@ -453,19 +490,44 @@ pub fn run_grandpa, N, RA>( set_id: authority_set.set_id(), authority_set: authority_set.clone(), consensus_changes: consensus_changes.clone(), - last_completed: environment::LastCompletedRound::new(set_state.round()), + voter_set_state: set_state.clone(), }); - let initial_state = (initial_environment, set_state, voter_commands_rx.into_future()); + initial_environment.update_voter_set_state(|voter_set_state| { + match voter_set_state { + VoterSetState::Live { current_round: HasVoted::Yes(id, _), completed_rounds } => { + let local_id = config.local_key.clone().map(|pair| pair.public()); + let has_voted = match local_id { + Some(local_id) => if *id == local_id { + // keep the previous votes + return Ok(None); + } else { + HasVoted::No + }, + _ => HasVoted::No, + }; + + // NOTE: only updated on disk when the voter first + // proposes/prevotes/precommits or completes a round. + Ok(Some(VoterSetState::Live { + current_round: has_voted, + completed_rounds: completed_rounds.clone(), + })) + }, + _ => Ok(None), + } + }).expect("operation inside closure cannot fail; qed"); + + let initial_state = (initial_environment, voter_commands_rx.into_future()); let voter_work = future::loop_fn(initial_state, move |params| { - let (env, set_state, voter_commands_rx) = params; + let (env, voter_commands_rx) = params; debug!(target: "afg", "{}: Starting new voter with set ID {}", config.name(), env.set_id); telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; "name" => ?config.name(), "set_id" => ?env.set_id ); - let mut maybe_voter = match set_state.clone() { - VoterSetState::Live(last_round_number, last_round_state) => { + let mut maybe_voter = match &*env.voter_set_state.read() { + VoterSetState::Live { completed_rounds, .. } => { let chain_info = match client.info() { Ok(i) => i, Err(e) => return future::Either::B(future::err(Error::Client(e))), @@ -476,7 +538,7 @@ pub fn run_grandpa, N, RA>( chain_info.chain.finalized_number, ); - let committer_data = committer_communication( + let (commit_in, commit_out) = committer_communication( config.local_key.as_ref(), env.set_id, &env.voters, @@ -484,18 +546,25 @@ pub fn run_grandpa, N, RA>( &network, ); + let global_comms = global_communication::( + commit_in, + commit_out, + ); + let voters = (*env.voters).clone(); + let last_completed_round = completed_rounds.last(); + Some(voter::Voter::new( env.clone(), voters, - committer_data, - last_round_number, - last_round_state, + global_comms, + last_completed_round.number, + last_completed_round.state.clone(), last_finalized, )) - } - VoterSetState::Paused(_, _) => None, + }, + VoterSetState::Paused { .. } => None, }; // needs to be combined with another future otherwise it can deadlock. @@ -526,6 +595,20 @@ pub fn run_grandpa, N, RA>( // start the new authority set using the block where the // set changed (not where the signal happened!) as the base. let genesis_state = RoundState::genesis((new.canon_hash, new.canon_number)); + + let set_state = VoterSetState::Live { + // always start at round 0 when changing sets. + completed_rounds: CompletedRounds::new(CompletedRound { + number: 0, + state: genesis_state, + base: (new.canon_hash, new.canon_number), + votes: Vec::new(), + }), + current_round: HasVoted::No, + }; + + let set_state: SharedVoterSetState<_> = set_state.into(); + let env = Arc::new(Environment { inner: client, config, @@ -534,32 +617,23 @@ pub fn run_grandpa, N, RA>( network, authority_set, consensus_changes, - last_completed: environment::LastCompletedRound::new( - (0, genesis_state.clone()) - ), + voter_set_state: set_state, }); - - let set_state = VoterSetState::Live( - 0, // always start at round 0 when changing sets. - genesis_state, - ); - - Ok(FutureLoop::Continue((env, set_state, voter_commands_rx))) + Ok(FutureLoop::Continue((env, voter_commands_rx))) } VoterCommand::Pause(reason) => { info!(target: "afg", "Pausing old validator set: {}", reason); // not racing because old voter is shut down. - let (last_round_number, last_round_state) = env.last_completed.read(); - let set_state = VoterSetState::Paused( - last_round_number, - last_round_state, - ); - - aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; - - Ok(FutureLoop::Continue((env, set_state, voter_commands_rx))) + env.update_voter_set_state(|voter_set_state| { + let completed_rounds = voter_set_state.completed_rounds(); + let set_state = VoterSetState::Paused { completed_rounds }; + aux_schema::write_voter_set_state(&**client.backend(), &set_state)?; + Ok(Some(set_state)) + })?; + + Ok(FutureLoop::Continue((env, voter_commands_rx))) }, } }; diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index d24a97fb1a85..b4a5d40bf991 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -1015,3 +1015,196 @@ fn test_bad_justification() { ImportResult::AlreadyInChain ); } + +#[test] +fn voter_persists_its_votes() { + use std::iter::FromIterator; + use std::sync::atomic::{AtomicUsize, Ordering}; + use futures::future; + use futures::sync::mpsc; + + let _ = env_logger::try_init(); + + // we have two authorities but we'll only be running the voter for alice + // we are going to be listening for the prevotes it casts + let peers = &[AuthorityKeyring::Alice, AuthorityKeyring::Bob]; + let voters = make_ids(peers); + + // alice has a chain with 20 blocks + let mut net = GrandpaTestNet::new(TestApi::new(voters.clone()), 2); + net.peer(0).push_blocks(20, false); + net.sync(); + + assert_eq!(net.peer(0).client().info().unwrap().chain.best_number, 20, + "Peer #{} failed to sync", 0); + + let mut runtime = current_thread::Runtime::new().unwrap(); + + let client = net.peer(0).client().clone(); + let net = Arc::new(Mutex::new(net)); + + let (voter_tx, voter_rx) = mpsc::unbounded::<()>(); + + // startup a grandpa voter for alice but also listen for messages on a + // channel. whenever a message is received the voter is restarted. when the + // sender is dropped the voter is stopped. + { + let net = net.clone(); + + let voter = future::loop_fn(voter_rx, move |rx| { + let (_block_import, _, link) = net.lock().make_block_import(client.clone()); + let link = link.lock().take().unwrap(); + + let mut voter = run_grandpa( + Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + local_key: Some(Arc::new(peers[0].clone().into())), + name: Some(format!("peer#{}", 0)), + }, + link, + MessageRouting::new(net.clone(), 0), + InherentDataProviders::new(), + futures::empty(), + ).expect("all in order with client and network"); + + let voter = future::poll_fn(move || { + // we need to keep the block_import alive since it owns the + // sender for the voter commands channel, if that gets dropped + // then the voter will stop + let _block_import = _block_import.clone(); + voter.poll() + }); + + voter.select2(rx.into_future()).then(|res| match res { + Ok(future::Either::A(x)) => { + panic!("voter stopped unexpectedly: {:?}", x); + }, + Ok(future::Either::B(((Some(()), rx), _))) => { + Ok(future::Loop::Continue(rx)) + }, + Ok(future::Either::B(((None, _), _))) => { + Ok(future::Loop::Break(())) + }, + Err(future::Either::A(err)) => { + panic!("unexpected error: {:?}", err); + }, + Err(future::Either::B(..)) => { + // voter_rx dropped, stop the voter. + Ok(future::Loop::Break(())) + }, + }) + }); + + runtime.spawn(voter); + } + + let (exit_tx, exit_rx) = futures::sync::oneshot::channel::<()>(); + + // create the communication layer for bob, but don't start any + // voter. instead we'll listen for the prevote that alice casts + // and cast our own manually + { + let config = Config { + gossip_duration: TEST_GOSSIP_DURATION, + justification_period: 32, + local_key: Some(Arc::new(peers[1].clone().into())), + name: Some(format!("peer#{}", 1)), + }; + let routing = MessageRouting::new(net.clone(), 1); + let network = communication::NetworkBridge::new(routing, config.clone()); + + let (round_rx, round_tx) = network.round_communication( + communication::Round(1), + communication::SetId(0), + Arc::new(VoterSet::from_iter(voters)), + Some(config.local_key.unwrap()), + HasVoted::No, + ); + + let round_tx = Arc::new(Mutex::new(round_tx)); + let exit_tx = Arc::new(Mutex::new(Some(exit_tx))); + + let net = net.clone(); + let state = AtomicUsize::new(0); + + runtime.spawn(round_rx.for_each(move |signed| { + if state.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { + // the first message we receive should be a prevote from alice. + let prevote = match signed.message { + grandpa::Message::Prevote(prevote) => prevote, + _ => panic!("voter should prevote."), + }; + + // its chain has 20 blocks and the voter targets 3/4 of the + // unfinalized chain, so the vote should be for block 15 + assert!(prevote.target_number == 15); + + // we push 20 more blocks to alice's chain + net.lock().peer(0).push_blocks(20, false); + net.lock().sync(); + + assert_eq!(net.lock().peer(0).client().info().unwrap().chain.best_number, 40, + "Peer #{} failed to sync", 0); + + let block_30_hash = + net.lock().peer(0).client().backend().blockchain().hash(30).unwrap().unwrap(); + + // we restart alice's voter + voter_tx.unbounded_send(()).unwrap(); + + // and we push our own prevote for block 30 + let prevote = grandpa::Prevote { + target_number: 30, + target_hash: block_30_hash, + }; + + round_tx.lock().start_send(grandpa::Message::Prevote(prevote)).unwrap(); + + } else if state.compare_and_swap(1, 2, Ordering::SeqCst) == 1 { + // the next message we receive should be our own prevote + let prevote = match signed.message { + grandpa::Message::Prevote(prevote) => prevote, + _ => panic!("We should receive our own prevote."), + }; + + // targeting block 30 + assert!(prevote.target_number == 30); + + // after alice restarts it should send its previous prevote + // therefore we won't ever receive it again since it will be a + // known message on the gossip layer + + } else if state.compare_and_swap(2, 3, Ordering::SeqCst) == 2 { + // we then receive a precommit from alice for block 15 + // even though we casted a prevote for block 30 + let precommit = match signed.message { + grandpa::Message::Precommit(precommit) => precommit, + _ => panic!("voter should precommit."), + }; + + assert!(precommit.target_number == 15); + + // signal exit + exit_tx.clone().lock().take().unwrap().send(()).unwrap(); + } + + Ok(()) + }).map_err(|_| ())); + } + + let net = net.clone(); + let drive_to_completion = ::tokio::timer::Interval::new_interval(TEST_ROUTING_INTERVAL) + .for_each(move |_| { + net.lock().send_import_notifications(); + net.lock().send_finality_notifications(); + net.lock().route_fast(); + Ok(()) + }) + .map(|_| ()) + .map_err(|_| ()); + + let exit = exit_rx.into_future().map(|_| ()).map_err(|_| ()); + + runtime.block_on(drive_to_completion.select(exit).map(|_| ()).map_err(|_| ())).unwrap(); +}