From a088a8ec3cd83d4cfd57e9ef49dd60d413827d66 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 27 Mar 2019 21:55:27 +0000 Subject: [PATCH 1/4] voter: only primary voter proposes and only once --- src/voter/mod.rs | 7 +++++ src/voter/voting_round.rs | 66 +++++++++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index be593489..599447e1 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -242,6 +242,7 @@ pub struct Voter, GlobalIn, GlobalOut> where GlobalOut: Sink, SinkError=E::Error>, { env: Arc, + voter_id: Option, voters: VoterSet, best_round: VotingRound, past_rounds: PastRounds, @@ -274,6 +275,7 @@ impl, GlobalIn, GlobalOut> Voter, + voter_id: Option, voters: VoterSet, global_comms: (GlobalIn, GlobalOut), last_round_number: u64, @@ -286,6 +288,7 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter { Start(T, T), + Proposed(T, T), Prevoted(T), Precommitted, } @@ -41,6 +42,7 @@ impl std::fmt::Debug for State { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { State::Start(..) => write!(f, "Start"), + State::Proposed(..) => write!(f, "Proposed"), State::Prevoted(_) => write!(f, "Prevoted"), State::Precommitted => write!(f, "Precommitted"), } @@ -53,6 +55,7 @@ pub(super) struct VotingRound> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { env: Arc, + voter_id: Option, votes: Round, incoming: E::In, outgoing: Buffered, @@ -71,6 +74,7 @@ impl> VotingRound where /// Create a new voting round. pub (super) fn new( round_number: u64, + voter_id: Option, voters: VoterSet, base: (H, N), last_round_state: Option>, @@ -85,6 +89,7 @@ impl> VotingRound where }; VotingRound { + voter_id, votes: Round::new(round_params), incoming: round_data.incoming, outgoing: Buffered::new(round_data.outgoing), @@ -244,10 +249,12 @@ impl> VotingRound where } fn primary_propose(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { - match self.state { - Some(State::Start(_, _)) => { + match self.state.take() { + Some(State::Start(prevote_timer, precommit_timer)) => { + let our_turn = self.voter_id.as_ref() == Some(&self.votes.primary_voter().0); let maybe_estimate = last_round_state.estimate.clone(); - if let Some(last_round_estimate) = maybe_estimate { + + if let (Some(last_round_estimate), true) = (maybe_estimate, our_turn) { let maybe_finalized = last_round_state.finalized.clone(); // Last round estimate has not been finalized. @@ -260,37 +267,56 @@ impl> VotingRound where target_number: last_round_estimate.1, }) ); + self.state = Some(State::Proposed(prevote_timer, precommit_timer)); } } else { - debug!(target: "afg", "Last round estimate does not exists, \ - not sending primary block hint for round {}", self.votes.number()); + if our_turn { + debug!(target: "afg", "Last round estimate does not exist, \ + not sending primary block hint for round {}", self.votes.number()); + } + self.state = Some(State::Start(prevote_timer, precommit_timer)); } - } - _ => { } + }, + x => { self.state = x; } } Ok(()) } fn prevote(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { - match self.state.take() { - Some(State::Start(mut prevote_timer, precommit_timer)) => { - let should_prevote = match prevote_timer.poll() { - Err(e) => return Err(e), - Ok(Async::Ready(())) => true, - Ok(Async::NotReady) => self.votes.completable(), - }; + let state = self.state.take(); - if should_prevote { - if let Some(prevote) = self.construct_prevote(last_round_state)? { - debug!(target: "afg", "Casting prevote for round {}", self.votes.number()); - self.outgoing.push(Message::Prevote(prevote)); - } - self.state = Some(State::Prevoted(precommit_timer)); + let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| { + let should_prevote = match prevote_timer.poll() { + Err(e) => return Err(e), + Ok(Async::Ready(())) => true, + Ok(Async::NotReady) => self.votes.completable(), + }; + + if should_prevote { + if let Some(prevote) = self.construct_prevote(last_round_state)? { + debug!(target: "afg", "Casting prevote for round {}", self.votes.number()); + self.outgoing.push(Message::Prevote(prevote)); + } + self.state = Some(State::Prevoted(precommit_timer)); + } else { + if proposed { + self.state = Some(State::Proposed(prevote_timer, precommit_timer)); } else { self.state = Some(State::Start(prevote_timer, precommit_timer)); } } + + Ok(()) + }; + + match state { + Some(State::Start(prevote_timer, precommit_timer)) => { + handle_prevote(prevote_timer, precommit_timer, false)?; + }, + Some(State::Proposed(prevote_timer, precommit_timer)) => { + handle_prevote(prevote_timer, precommit_timer, true)?; + }, x => { self.state = x; } } From 2a22404ae277af0dc7324e1c7b2c9e1aa7fca06c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Wed, 27 Mar 2019 22:06:34 +0000 Subject: [PATCH 2/4] voter: fix tests --- src/voter/mod.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index 599447e1..fd0f5ca0 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -657,6 +657,7 @@ mod tests { let finalized = env.finalized_stream(); let voter = Voter::new( env.clone(), + None, voters, global_comms, 0, @@ -703,6 +704,7 @@ mod tests { let finalized = env.finalized_stream(); let voter = Voter::new( env.clone(), + None, voters.clone(), network.make_global_comms(), 0, @@ -746,6 +748,7 @@ mod tests { // run voter in background. scheduling it to shut down at the end. let voter = Voter::new( env.clone(), + None, voters.clone(), global_comms, 0, @@ -811,6 +814,7 @@ mod tests { // run voter in background. scheduling it to shut down at the end. let voter = Voter::new( env.clone(), + None, voters.clone(), global_comms, 0, @@ -898,6 +902,7 @@ mod tests { // run voter in background. scheduling it to shut down at the end. let voter = Voter::new( env.clone(), + None, voters.clone(), global_comms, 1, @@ -945,6 +950,7 @@ mod tests { Voter::new( env.clone(), + None, voters.clone(), network.make_global_comms(), 0, @@ -978,6 +984,7 @@ mod tests { // run voter in background starting at round 5. scheduling it to shut down when signalled. let voter = Voter::new( env.clone(), + None, voters.clone(), network.make_global_comms(), 5, From a9fca5dfb0b996eb12c2bc04bbb5277e08452334 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Thu, 28 Mar 2019 22:55:38 +0000 Subject: [PATCH 3/4] voter: fix primary proposer state restore --- src/voter/voting_round.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/voter/voting_round.rs b/src/voter/voting_round.rs index 65aceb22..94792c02 100644 --- a/src/voter/voting_round.rs +++ b/src/voter/voting_round.rs @@ -268,14 +268,17 @@ impl> VotingRound where }) ); self.state = Some(State::Proposed(prevote_timer, precommit_timer)); + + return Ok(()); } - } else { - if our_turn { - debug!(target: "afg", "Last round estimate does not exist, \ - not sending primary block hint for round {}", self.votes.number()); - } - self.state = Some(State::Start(prevote_timer, precommit_timer)); } + + if our_turn { + debug!(target: "afg", "Last round estimate does not exist, \ + not sending primary block hint for round {}", self.votes.number()); + } + + self.state = Some(State::Start(prevote_timer, precommit_timer)); }, x => { self.state = x; } } From 46cbed09d802b76a60021c49f4728dd8d9a101c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Silva?= Date: Fri, 29 Mar 2019 13:52:14 +0000 Subject: [PATCH 4/4] voter: only push votes if voter/proposer for current round --- src/testing.rs | 3 +- src/voter/mod.rs | 31 +++++++---------- src/voter/voting_round.rs | 72 ++++++++++++++++++++++++++++++++------- 3 files changed, 74 insertions(+), 32 deletions(-) diff --git a/src/testing.rs b/src/testing.rs index 6461baef..acd22f5c 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -187,12 +187,13 @@ impl crate::voter::Environment<&'static str, u32> for Environment { type Out = Box,SinkError=Error> + Send + 'static>; type Error = Error; - fn round_data(&self, round: u64) -> RoundData { + fn round_data(&self, round: u64) -> RoundData { const GOSSIP_DURATION: Duration = Duration::from_millis(500); let now = Instant::now(); let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { + voter_id: Some(self.local_id), prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION) diff --git a/src/voter/mod.rs b/src/voter/mod.rs index fd0f5ca0..6ff16279 100644 --- a/src/voter/mod.rs +++ b/src/voter/mod.rs @@ -17,9 +17,15 @@ //! A voter in GRANDPA. This transitions between rounds and casts votes. //! //! Voters rely on some external context to function: -//! - setting timers to cast votes -//! - incoming vote streams +//! - setting timers to cast votes. +//! - incoming vote streams. //! - providing voter weights. +//! - getting the local voter id. +//! +//! The local voter id is used to check whether to cast votes for a given +//! round. If no local id is defined or if it's not part of the voter set then +//! votes will not be pushed to the sink. The protocol state machine still +//! transitions state as if the votes had been pushed out. use futures::prelude::*; use futures::sync::mpsc::{self, UnboundedReceiver}; @@ -70,9 +76,10 @@ pub trait Environment: Chain { /// Furthermore, this means that actual logic of creating and verifying /// signatures is flexible and can be maintained outside this crate. fn round_data(&self, round: u64) -> RoundData< + Self::Id, Self::Timer, Self::In, - Self::Out + Self::Out, >; /// Return a timer that will be used to delay the broadcast of a commit @@ -153,7 +160,9 @@ pub struct CatchUp { } /// Data necessary to participate in a round. -pub struct RoundData { +pub struct RoundData { + /// Local voter id (if any.) + pub voter_id: Option, /// Timer before prevotes can be cast. This should be Start + 2T /// where T is the gossip time estimate. pub prevote_timer: Timer, @@ -242,7 +251,6 @@ pub struct Voter, GlobalIn, GlobalOut> where GlobalOut: Sink, SinkError=E::Error>, { env: Arc, - voter_id: Option, voters: VoterSet, best_round: VotingRound, past_rounds: PastRounds, @@ -275,7 +283,6 @@ impl, GlobalIn, GlobalOut> Voter, - voter_id: Option, voters: VoterSet, global_comms: (GlobalIn, GlobalOut), last_round_number: u64, @@ -288,7 +295,6 @@ impl, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter, GlobalIn, GlobalOut> Voter> where N: Copy + BlockNumberOps + ::std::fmt::Debug, { env: Arc, - voter_id: Option, + voting: Voting, votes: Round, incoming: E::In, outgoing: Buffered, @@ -67,6 +67,36 @@ pub(super) struct VotingRound> where best_finalized: Option>, } +/// Whether we should vote in the current round (i.e. push votes to the sink.) +enum Voting { + /// Voting is disabled for the current round. + No, + /// Voting is enabled for the current round (prevotes and precommits.) + Yes, + /// Voting is enabled for the current round and we are the primary proposer + /// (we can also push primary propose messages). + Primary, +} + +impl Voting { + /// Whether the voter should cast round votes (prevotes and precommits.) + fn is_active(&self) -> bool { + match self { + Voting::Yes => true, + Voting::Primary => true, + _ => false, + } + } + + /// Whether the voter is the primary proposer. + fn is_primary(&self) -> bool { + match self { + Voting::Primary => true, + _ => false, + } + } +} + impl> VotingRound where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug, N: Copy + BlockNumberOps + ::std::fmt::Debug, @@ -74,7 +104,6 @@ impl> VotingRound where /// Create a new voting round. pub (super) fn new( round_number: u64, - voter_id: Option, voters: VoterSet, base: (H, N), last_round_state: Option>, @@ -88,9 +117,23 @@ impl> VotingRound where round_number, }; + let votes = Round::new(round_params); + + let voting = if round_data.voter_id.as_ref() == Some(&votes.primary_voter().0) { + Voting::Primary + } else if round_data.voter_id + .as_ref() + .map(|id| votes.voters().contains_key(id)) + .unwrap_or(false) + { + Voting::Yes + } else { + Voting::No + }; + VotingRound { - voter_id, - votes: Round::new(round_params), + votes, + voting, incoming: round_data.incoming, outgoing: Buffered::new(round_data.outgoing), state: Some( @@ -251,10 +294,9 @@ impl> VotingRound where fn primary_propose(&mut self, last_round_state: &RoundState) -> Result<(), E::Error> { match self.state.take() { Some(State::Start(prevote_timer, precommit_timer)) => { - let our_turn = self.voter_id.as_ref() == Some(&self.votes.primary_voter().0); let maybe_estimate = last_round_state.estimate.clone(); - if let (Some(last_round_estimate), true) = (maybe_estimate, our_turn) { + if let (Some(last_round_estimate), true) = (maybe_estimate, self.voting.is_primary()) { let maybe_finalized = last_round_state.finalized.clone(); // Last round estimate has not been finalized. @@ -273,7 +315,7 @@ impl> VotingRound where } } - if our_turn { + if self.voting.is_primary() { debug!(target: "afg", "Last round estimate does not exist, \ not sending primary block hint for round {}", self.votes.number()); } @@ -297,9 +339,11 @@ impl> VotingRound where }; if should_prevote { - if let Some(prevote) = self.construct_prevote(last_round_state)? { - debug!(target: "afg", "Casting prevote for round {}", self.votes.number()); - self.outgoing.push(Message::Prevote(prevote)); + if self.voting.is_active() { + if let Some(prevote) = self.construct_prevote(last_round_state)? { + debug!(target: "afg", "Casting prevote for round {}", self.votes.number()); + self.outgoing.push(Message::Prevote(prevote)); + } } self.state = Some(State::Prevoted(precommit_timer)); } else { @@ -346,9 +390,11 @@ impl> VotingRound where }; if should_precommit { - debug!(target: "afg", "Casting precommit for round {}", self.votes.number()); - let precommit = self.construct_precommit(); - self.outgoing.push(Message::Precommit(precommit)); + if self.voting.is_active() { + debug!(target: "afg", "Casting precommit for round {}", self.votes.number()); + let precommit = self.construct_precommit(); + self.outgoing.push(Message::Precommit(precommit)); + } self.state = Some(State::Precommitted); } else { self.state = Some(State::Prevoted(precommit_timer));