diff --git a/src/testing.rs b/src/testing.rs index 6461baef..acd22f5c 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -187,12 +187,13 @@ impl crate::voter::Environment<&'static str, u32> for Environment { type Out = Box,SinkError=Error> + Send + 'static>; type Error = Error; - fn round_data(&self, round: u64) -> RoundData { + fn round_data(&self, round: u64) -> RoundData { const GOSSIP_DURATION: Duration = Duration::from_millis(500); let now = Instant::now(); let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { + voter_id: Some(self.local_id), prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index be593489..6ff16279 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -17,9 +17,15 @@ //! A voter in GRANDPA. This transitions between rounds and casts votes. //! //! Voters rely on some external context to function: -//! - setting timers to cast votes -//! - incoming vote streams +//! - setting timers to cast votes. +//! - incoming vote streams. //! - providing voter weights. +//! - getting the local voter id. +//! +//! The local voter id is used to check whether to cast votes for a given +//! round. If no local id is defined or if it's not part of the voter set then +//! votes will not be pushed to the sink. The protocol state machine still +//! transitions state as if the votes had been pushed out. use futures::prelude::*; use futures::sync::mpsc::{self, UnboundedReceiver}; @@ -70,9 +76,10 @@ pub trait Environment: Chain { /// Furthermore, this means that actual logic of creating and verifying /// signatures is flexible and can be maintained outside this crate. fn round_data(&self, round: u64) -> RoundData< + Self::Id, Self::Timer, Self::In, - Self::Out + Self::Out, >; /// Return a timer that will be used to delay the broadcast of a commit @@ -153,7 +160,9 @@ pub struct CatchUp { } /// Data necessary to participate in a round. -pub struct RoundData { +pub struct RoundData { + /// Local voter id (if any.) + pub voter_id: Option, /// Timer before prevotes can be cast. This should be Start + 2T /// where T is the gossip time estimate. pub prevote_timer: Timer, diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index acc00b97..940578d2 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -33,6 +33,7 @@ use super::{Environment, Buffered}; /// The state of a voting round. pub(super) enum State { Start(T, T), + Proposed(T, T), Prevoted(T), Precommitted, } @@ -41,6 +42,7 @@ impl std::fmt::Debug for State { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { State::Start(..) => write!(f, "Start"), + State::Proposed(..) => write!(f, "Proposed"), State::Prevoted(_) => write!(f, "Prevoted"), State::Precommitted => write!(f, "Precommitted"), } @@ -53,6 +55,7 @@ pub(super) struct VotingRound> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { env: Arc, + voting: Voting, votes: Round, incoming: E::In, outgoing: Buffered, @@ -64,6 +67,36 @@ pub(super) struct VotingRound> where best_finalized: Option>, } +/// Whether we should vote in the current round (i.e. push votes to the sink.) +enum Voting { + /// Voting is disabled for the current round. + No, + /// Voting is enabled for the current round (prevotes and precommits.) + Yes, + /// Voting is enabled for the current round and we are the primary proposer + /// (we can also push primary propose messages). + Primary, +} + +impl Voting { + /// Whether the voter should cast round votes (prevotes and precommits.) + fn is_active(&self) -> bool { + match self { + Voting::Yes => true, + Voting::Primary => true, + _ => false, + } + } + + /// Whether the voter is the primary proposer. + fn is_primary(&self) -> bool { + match self { + Voting::Primary => true, + _ => false, + } + } +} + impl> VotingRound where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, @@ -84,8 +117,23 @@ impl> VotingRound where round_number, }; + let votes = Round::new(round_params); + + let voting = if round_data.voter_id.as_ref() == Some(&votes.primary_voter().0) { + Voting::Primary + } else if round_data.voter_id + .as_ref() + .map(|id| votes.voters().contains_key(id)) + .unwrap_or(false) + { + Voting::Yes + } else { + Voting::No + }; + VotingRound { - votes: Round::new(round_params), + votes, + voting, incoming: round_data.incoming, outgoing: Buffered::new(round_data.outgoing), state: Some( @@ -244,10 +292,11 @@ impl> VotingRound where } fn primary_propose(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { - match self.state { - Some(State::Start(_, _)) => { + match self.state.take() { + Some(State::Start(prevote_timer, precommit_timer)) => { let maybe_estimate = last_round_state.estimate.clone(); - if let Some(last_round_estimate) = maybe_estimate { + + if let (Some(last_round_estimate), true) = (maybe_estimate, self.voting.is_primary()) { let maybe_finalized = last_round_state.finalized.clone(); // Last round estimate has not been finalized. @@ -260,37 +309,61 @@ impl> VotingRound where target_number: last_round_estimate.1, }) ); + self.state = Some(State::Proposed(prevote_timer, precommit_timer)); + + return Ok(()); } - } else { - debug!(target: "afg", "Last round estimate does not exists, \ + } + + if self.voting.is_primary() { + debug!(target: "afg", "Last round estimate does not exist, \ not sending primary block hint for round {}", self.votes.number()); } - } - _ => { } + + self.state = Some(State::Start(prevote_timer, precommit_timer)); + }, + x => { self.state = x; } } Ok(()) } fn prevote(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { - match self.state.take() { - Some(State::Start(mut prevote_timer, precommit_timer)) => { - let should_prevote = match prevote_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), - }; + let state = self.state.take(); + + let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { + let should_prevote = match prevote_timer.poll() { + Err(e) => return Err(e), + Ok(Async::Ready(())) => true, + Ok(Async::NotReady) => self.votes.completable(), + }; - if should_prevote { + if should_prevote { + if self.voting.is_active() { if let Some(prevote) = self.construct_prevote(last_round_state)? { debug!(target: "afg", "Casting prevote for round {}", self.votes.number()); self.outgoing.push(Message::Prevote(prevote)); } - self.state = Some(State::Prevoted(precommit_timer)); + } + self.state = Some(State::Prevoted(precommit_timer)); + } else { + if proposed { + self.state = Some(State::Proposed(prevote_timer, precommit_timer)); } else { self.state = Some(State::Start(prevote_timer, precommit_timer)); } } + + Ok(()) + }; + + match state { + Some(State::Start(prevote_timer, precommit_timer)) => { + handle_prevote(prevote_timer, precommit_timer, false)?; + }, + Some(State::Proposed(prevote_timer, precommit_timer)) => { + handle_prevote(prevote_timer, precommit_timer, true)?; + }, x => { self.state = x; } } @@ -317,9 +390,11 @@ impl> VotingRound where }; if should_precommit { - debug!(target: "afg", "Casting precommit for round {}", self.votes.number()); - let precommit = self.construct_precommit(); - self.outgoing.push(Message::Precommit(precommit)); + if self.voting.is_active() { + debug!(target: "afg", "Casting precommit for round {}", self.votes.number()); + let precommit = self.construct_precommit(); + self.outgoing.push(Message::Precommit(precommit)); + } self.state = Some(State::Precommitted); } else { self.state = Some(State::Prevoted(precommit_timer));