Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion polkadot/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,10 @@ impl<C: PolkadotApi, R: TableRouter> bft::Proposer for Proposer<C, R> {
let offset = U256::from_big_endian(&self.random_seed.0) % len;
let offset = offset.low_u64() as usize + round_number;

authorities[offset % authorities.len()].clone()
let proposer = authorities[offset % authorities.len()].clone();
trace!(target: "bft", "proposer for round {} is {}", round_number, Hash::from(proposer));

proposer
}

fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) {
Expand Down
20 changes: 15 additions & 5 deletions polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct BftSink<E> {

struct Messages {
network_stream: net::BftMessageStream,
local_id: AuthorityId,
authorities: Vec<AuthorityId>,
}

Expand All @@ -64,8 +65,9 @@ impl Stream for Messages {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
match process_message(message, &self.local_id, &self.authorities) {
Ok(Some(message)) => return Ok(Async::Ready(Some(message))),
Ok(None) => {} // ignored local message.
Err(e) => {
debug!("Message validation failed: {:?}", e);
}
Expand All @@ -76,10 +78,11 @@ impl Stream for Messages {
}
}

fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result<bft::Communication, bft::Error> {
Ok(match msg.message {
fn process_message(msg: net::LocalizedBftMessage, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result<Option<bft::Communication>, bft::Error> {
Ok(Some(match msg.message {
net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c {
net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({
if &proposal.sender == local_id { return Ok(None) }
let proposal = bft::generic::LocalizedProposal {
round_number: proposal.round_number as usize,
proposal: proposal.proposal,
Expand All @@ -95,9 +98,12 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
}
};
bft::check_proposal(authorities, &msg.parent_hash, &proposal)?;

trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender));
proposal
}),
net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({
if &vote.sender == local_id { return Ok(None) }
let vote = bft::generic::LocalizedVote {
sender: vote.sender,
signature: ed25519::LocalizedSignature {
Expand All @@ -111,6 +117,8 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
}
};
bft::check_vote(authorities, &msg.parent_hash, &vote)?;

trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender));
vote
}),
}),
Expand All @@ -121,7 +129,7 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -
.map_err(|_| bft::ErrorKind::InvalidJustification.into());
bft::generic::Communication::Auxiliary(justification?)
},
})
}))
}

impl<E> Sink for BftSink<E> {
Expand Down Expand Up @@ -190,8 +198,10 @@ fn start_bft<F, C>(
}
};


let input = Messages {
network_stream: network.bft_messages(parent_hash),
local_id: bft_service.local_id(),
authorities,
};

Expand Down
110 changes: 108 additions & 2 deletions substrate/bft/src/generic/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,18 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
_ => {},
}

debug!(target: "bft", "Importing proposal for round {}", self.round_number);

self.proposal = Some(Proposal {
proposal: proposal.proposal.clone(),
digest: proposal.digest,
digest_signature: proposal.digest_signature,
});

self.state = State::Proposed(proposal.proposal);
if let State::Begin = self.state {
self.state = State::Proposed(proposal.proposal);
}

Ok(())
}

Expand Down Expand Up @@ -315,6 +320,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
.map(|&(_, ref s)| s.clone())
.collect();

trace!(target: "bft", "observed threshold-prepare for round {}", self.round_number);
self.state = State::Prepared(Justification(UncheckedJustification {
round_number: self.round_number,
digest: threshold_prepared,
Expand All @@ -339,7 +345,6 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
count.committed += 1;

if count.committed >= self.threshold {
trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed);
Some(digest)
} else {
None
Expand Down Expand Up @@ -370,6 +375,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
.map(|&(_, ref s)| s.clone())
.collect();

trace!(target: "bft", "observed threshold-commit for round {}", self.round_number);
self.state = State::Committed(Justification(UncheckedJustification {
round_number: self.round_number,
digest: threshold_committed,
Expand All @@ -387,6 +393,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
self.advance_round.insert(sender);

if self.advance_round.len() < self.threshold { return Ok(()) }
trace!(target: "bft", "Witnessed threshold advance-round messages for round {}", self.round_number);

// allow transition to new round only if we haven't produced a justification
// yet.
Expand Down Expand Up @@ -665,6 +672,105 @@ mod tests {
}
}

#[test]
fn propose_after_prepared_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);

for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Prepare(1, Digest(999)),
}.into()).unwrap();
}

match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}

accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();

match accumulator.state() {
&State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}

#[test]
fn propose_after_committed_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);

for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(999, i),
vote: Vote::Commit(1, Digest(999)),
}.into()).unwrap();
}

match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}

accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();

match accumulator.state() {
&State::Committed(ref j) => assert_eq!(j.digest, Digest(999)),
s => panic!("wrong state: {:?}", s),
}
}

#[test]
fn propose_after_advance_does_not_clobber_state() {
let mut accumulator = Accumulator::<Candidate, _, _, _>::new(1, 7, AuthorityId(8));
assert_eq!(accumulator.state(), &State::Begin);

for i in 0..7 {
accumulator.import_message(LocalizedVote {
sender: AuthorityId(i),
signature: Signature(1, i),
vote: Vote::AdvanceRound(1),
}.into()).unwrap();
}

match accumulator.state() {
&State::Advanced(_) => {}
s => panic!("wrong state: {:?}", s),
}

accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal {
sender: AuthorityId(8),
full_signature: Signature(999, 8),
digest_signature: Signature(999, 8),
round_number: 1,
proposal: Candidate(999),
digest: Digest(999),
})).unwrap();

match accumulator.state() {
&State::Advanced(_) => {}
s => panic!("wrong state: {:?}", s),
}
}

#[test]
fn begin_to_advance() {
let mut accumulator = Accumulator::<Candidate, Digest, _, _>::new(1, 7, AuthorityId(8));
Expand Down
3 changes: 2 additions & 1 deletion substrate/bft/src/generic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ impl<C: Context> Strategy<C> {

// poll until either completion or state doesn't change.
loop {
trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark);
match self.poll_once(context, sending)? {
Async::Ready(x) => return Ok(Async::Ready(x)),
Async::NotReady => {
Expand Down Expand Up @@ -683,6 +684,7 @@ impl<C: Context> Strategy<C> {

fn advance_to_round(&mut self, context: &C, round: usize) {
assert!(round > self.current_accumulator.round_number());
trace!(target: "bft", "advancing to round {}", round);

let threshold = self.nodes - self.max_faulty;

Expand Down Expand Up @@ -790,7 +792,6 @@ impl<C, I, O> Future for Agreement<C, I, O>
self.poll()
}
Async::NotReady => {

Ok(Async::NotReady)
}
}
Expand Down
8 changes: 7 additions & 1 deletion substrate/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ impl<P, I> BftService<P, I>
}
}

/// Get the local Authority ID.
pub fn local_id(&self) -> AuthorityId {
// TODO: based on a header and some keystore.
self.key.public().0
}

/// Signal that a valid block with the given header has been imported.
///
/// If the local signing key is an authority, this will begin the consensus process to build a
Expand All @@ -350,7 +356,7 @@ impl<P, I> BftService<P, I>
let max_faulty = max_faulty_of(n);
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);

let local_id = self.key.public().0;
let local_id = self.local_id();

if !authorities.contains(&local_id) {
// cancel current agreement
Expand Down
2 changes: 2 additions & 0 deletions substrate/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ impl Consensus {
pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
if self.messages.contains_key(&hash) {
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
return;
}

if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash);
// TODO: validate signature?
Expand Down