Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,13 @@ impl crate::voter::Environment<&'static str, u32> for Environment {
type Out = Box<Sink<SinkItem=Message<&'static str, u32>,SinkError=Error> + Send + 'static>;
type Error = Error;

fn round_data(&self, round: u64) -> RoundData<Self::Timer, Self::In, Self::Out> {
fn round_data(&self, round: u64) -> RoundData<Self::Id, Self::Timer, Self::In, Self::Out> {
const GOSSIP_DURATION: Duration = Duration::from_millis(500);

let now = Instant::now();
let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id);
RoundData {
voter_id: Some(self.local_id),
prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
precommit_timer: Box::new(Delay::new(now + GOSSIP_DURATION + GOSSIP_DURATION)
Expand Down
17 changes: 13 additions & 4 deletions src/voter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@
//! A voter in GRANDPA. This transitions between rounds and casts votes.
//!
//! Voters rely on some external context to function:
//! - setting timers to cast votes
//! - incoming vote streams
//! - setting timers to cast votes.
//! - incoming vote streams.
//! - providing voter weights.
//! - getting the local voter id.
//!
//! The local voter id is used to check whether to cast votes for a given
//! round. If no local id is defined or if it's not part of the voter set then
//! votes will not be pushed to the sink. The protocol state machine still
//! transitions state as if the votes had been pushed out.

use futures::prelude::*;
use futures::sync::mpsc::{self, UnboundedReceiver};
Expand Down Expand Up @@ -70,9 +76,10 @@ pub trait Environment<H: Eq, N: BlockNumberOps>: Chain<H, N> {
/// Furthermore, this means that actual logic of creating and verifying
/// signatures is flexible and can be maintained outside this crate.
fn round_data(&self, round: u64) -> RoundData<
Self::Id,
Self::Timer,
Self::In,
Self::Out
Self::Out,
>;

/// Return a timer that will be used to delay the broadcast of a commit
Expand Down Expand Up @@ -153,7 +160,9 @@ pub struct CatchUp<H, N> {
}

/// Data necessary to participate in a round.
pub struct RoundData<Timer, Input, Output> {
pub struct RoundData<Id, Timer, Input, Output> {
/// Local voter id (if any.)
pub voter_id: Option<Id>,
/// Timer before prevotes can be cast. This should be Start + 2T
/// where T is the gossip time estimate.
pub prevote_timer: Timer,
Expand Down
115 changes: 95 additions & 20 deletions src/voter/voting_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::{Environment, Buffered};
/// The state of a voting round.
pub(super) enum State<T> {
Start(T, T),
Proposed(T, T),
Prevoted(T),
Precommitted,
}
Expand All @@ -41,6 +42,7 @@ impl<T> std::fmt::Debug for State<T> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
State::Start(..) => write!(f, "Start"),
State::Proposed(..) => write!(f, "Proposed"),
State::Prevoted(_) => write!(f, "Prevoted"),
State::Precommitted => write!(f, "Precommitted"),
}
Expand All @@ -53,6 +55,7 @@ pub(super) struct VotingRound<H, N, E: Environment<H, N>> where
N: Copy + BlockNumberOps + ::std::fmt::Debug,
{
env: Arc<E>,
voting: Voting,
votes: Round<E::Id, H, N, E::Signature>,
incoming: E::In,
outgoing: Buffered<E::Out>,
Expand All @@ -64,6 +67,36 @@ pub(super) struct VotingRound<H, N, E: Environment<H, N>> where
best_finalized: Option<Commit<H, N, E::Signature, E::Id>>,
}

/// Whether we should vote in the current round (i.e. push votes to the sink.)
enum Voting {
/// Voting is disabled for the current round.
No,
/// Voting is enabled for the current round (prevotes and precommits.)
Yes,
/// Voting is enabled for the current round and we are the primary proposer
/// (we can also push primary propose messages).
Primary,
}

impl Voting {
/// Whether the voter should cast round votes (prevotes and precommits.)
fn is_active(&self) -> bool {
match self {
Voting::Yes => true,
Voting::Primary => true,
_ => false,
}
}

/// Whether the voter is the primary proposer.
fn is_primary(&self) -> bool {
match self {
Voting::Primary => true,
_ => false,
}
}
}

impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
H: Hash + Clone + Eq + Ord + ::std::fmt::Debug,
N: Copy + BlockNumberOps + ::std::fmt::Debug,
Expand All @@ -84,8 +117,23 @@ impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
round_number,
};

let votes = Round::new(round_params);

let voting = if round_data.voter_id.as_ref() == Some(&votes.primary_voter().0) {
Voting::Primary
} else if round_data.voter_id
.as_ref()
.map(|id| votes.voters().contains_key(id))
.unwrap_or(false)
{
Voting::Yes
} else {
Voting::No
};

VotingRound {
votes: Round::new(round_params),
votes,
voting,
incoming: round_data.incoming,
outgoing: Buffered::new(round_data.outgoing),
state: Some(
Expand Down Expand Up @@ -244,10 +292,11 @@ impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
}

fn primary_propose(&mut self, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
match self.state {
Some(State::Start(_, _)) => {
match self.state.take() {
Some(State::Start(prevote_timer, precommit_timer)) => {
let maybe_estimate = last_round_state.estimate.clone();
if let Some(last_round_estimate) = maybe_estimate {

if let (Some(last_round_estimate), true) = (maybe_estimate, self.voting.is_primary()) {
let maybe_finalized = last_round_state.finalized.clone();

// Last round estimate has not been finalized.
Expand All @@ -260,37 +309,61 @@ impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
target_number: last_round_estimate.1,
})
);
self.state = Some(State::Proposed(prevote_timer, precommit_timer));

return Ok(());
}
} else {
debug!(target: "afg", "Last round estimate does not exists, \
}

if self.voting.is_primary() {
debug!(target: "afg", "Last round estimate does not exist, \
not sending primary block hint for round {}", self.votes.number());
}
}
_ => { }

self.state = Some(State::Start(prevote_timer, precommit_timer));
},
x => { self.state = x; }
}

Ok(())
}

fn prevote(&mut self, last_round_state: &RoundState<H, N>) -> Result<(), E::Error> {
match self.state.take() {
Some(State::Start(mut prevote_timer, precommit_timer)) => {
let should_prevote = match prevote_timer.poll() {
Err(e) => return Err(e),
Ok(Async::Ready(())) => true,
Ok(Async::NotReady) => self.votes.completable(),
};
let state = self.state.take();

let mut handle_prevote = |mut prevote_timer: E::Timer, precommit_timer: E::Timer, proposed| {
let should_prevote = match prevote_timer.poll() {
Err(e) => return Err(e),
Ok(Async::Ready(())) => true,
Ok(Async::NotReady) => self.votes.completable(),
};

if should_prevote {
if should_prevote {
if self.voting.is_active() {
if let Some(prevote) = self.construct_prevote(last_round_state)? {
debug!(target: "afg", "Casting prevote for round {}", self.votes.number());
self.outgoing.push(Message::Prevote(prevote));
}
self.state = Some(State::Prevoted(precommit_timer));
}
self.state = Some(State::Prevoted(precommit_timer));
} else {
if proposed {
self.state = Some(State::Proposed(prevote_timer, precommit_timer));
} else {
self.state = Some(State::Start(prevote_timer, precommit_timer));
}
}

Ok(())
};

match state {
Some(State::Start(prevote_timer, precommit_timer)) => {
handle_prevote(prevote_timer, precommit_timer, false)?;
},
Some(State::Proposed(prevote_timer, precommit_timer)) => {
handle_prevote(prevote_timer, precommit_timer, true)?;
},
x => { self.state = x; }
}

Expand All @@ -317,9 +390,11 @@ impl<H, N, E: Environment<H, N>> VotingRound<H, N, E> where
};

if should_precommit {
debug!(target: "afg", "Casting precommit for round {}", self.votes.number());
let precommit = self.construct_precommit();
self.outgoing.push(Message::Precommit(precommit));
if self.voting.is_active() {
debug!(target: "afg", "Casting precommit for round {}", self.votes.number());
let precommit = self.construct_precommit();
self.outgoing.push(Message::Precommit(precommit));
}
self.state = Some(State::Precommitted);
} else {
self.state = Some(State::Prevoted(precommit_timer));
Expand Down