diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 2630bea720b3e..e004a44df5888 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -635,7 +635,10 @@ impl bft::Proposer for Proposer { let offset = U256::from_big_endian(&self.random_seed.0) % len; let offset = offset.low_u64() as usize + round_number; - authorities[offset % authorities.len()].clone() + let proposer = authorities[offset % authorities.len()].clone(); + trace!(target: "bft", "proposer for round {} is {}", round_number, Hash::from(proposer)); + + proposer } fn import_misbehavior(&self, misbehavior: Vec<(AuthorityId, bft::Misbehavior)>) { diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index c2280f2eaa111..ce2b562a475cd 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -49,6 +49,7 @@ struct BftSink { struct Messages { network_stream: net::BftMessageStream, + local_id: AuthorityId, authorities: Vec, } @@ -64,8 +65,9 @@ impl Stream for Messages { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(Some(message))) => { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), + match process_message(message, &self.local_id, &self.authorities) { + Ok(Some(message)) => return Ok(Async::Ready(Some(message))), + Ok(None) => {} // ignored local message. Err(e) => { debug!("Message validation failed: {:?}", e); } @@ -76,10 +78,11 @@ impl Stream for Messages { } } -fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) -> Result { - Ok(match msg.message { +fn process_message(msg: net::LocalizedBftMessage, local_id: &AuthorityId, authorities: &[AuthorityId]) -> Result, bft::Error> { + Ok(Some(match msg.message { net::BftMessage::Consensus(c) => bft::generic::Communication::Consensus(match c { net::SignedConsensusMessage::Propose(proposal) => bft::generic::LocalizedMessage::Propose({ + if &proposal.sender == local_id { return Ok(None) } let proposal = bft::generic::LocalizedProposal { round_number: proposal.round_number as usize, proposal: proposal.proposal, @@ -95,9 +98,12 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - } }; bft::check_proposal(authorities, &msg.parent_hash, &proposal)?; + + trace!(target: "bft", "importing proposal message for round {} from {}", proposal.round_number, Hash::from(proposal.sender)); proposal }), net::SignedConsensusMessage::Vote(vote) => bft::generic::LocalizedMessage::Vote({ + if &vote.sender == local_id { return Ok(None) } let vote = bft::generic::LocalizedVote { sender: vote.sender, signature: ed25519::LocalizedSignature { @@ -111,6 +117,8 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - } }; bft::check_vote(authorities, &msg.parent_hash, &vote)?; + + trace!(target: "bft", "importing vote {:?} from {}", vote.vote, Hash::from(vote.sender)); vote }), }), @@ -121,7 +129,7 @@ fn process_message(msg: net::LocalizedBftMessage, authorities: &[AuthorityId]) - .map_err(|_| bft::ErrorKind::InvalidJustification.into()); bft::generic::Communication::Auxiliary(justification?) }, - }) + })) } impl Sink for BftSink { @@ -190,8 +198,10 @@ fn start_bft( } }; + let input = Messages { network_stream: network.bft_messages(parent_hash), + local_id: bft_service.local_id(), authorities, }; diff --git a/substrate/bft/src/generic/accumulator.rs b/substrate/bft/src/generic/accumulator.rs index e610faa0b8eb1..811826b7d68a4 100644 --- a/substrate/bft/src/generic/accumulator.rs +++ b/substrate/bft/src/generic/accumulator.rs @@ -259,13 +259,18 @@ impl Accumulator {}, } + debug!(target: "bft", "Importing proposal for round {}", self.round_number); + self.proposal = Some(Proposal { proposal: proposal.proposal.clone(), digest: proposal.digest, digest_signature: proposal.digest_signature, }); - self.state = State::Proposed(proposal.proposal); + if let State::Begin = self.state { + self.state = State::Proposed(proposal.proposal); + } + Ok(()) } @@ -315,6 +320,7 @@ impl Accumulator Accumulator= self.threshold { - trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed); Some(digest) } else { None @@ -370,6 +375,7 @@ impl Accumulator Accumulator::new(1, 7, AuthorityId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + for i in 0..7 { + accumulator.import_message(LocalizedVote { + sender: AuthorityId(i), + signature: Signature(999, i), + vote: Vote::Prepare(1, Digest(999)), + }.into()).unwrap(); + } + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + + accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { + sender: AuthorityId(8), + full_signature: Signature(999, 8), + digest_signature: Signature(999, 8), + round_number: 1, + proposal: Candidate(999), + digest: Digest(999), + })).unwrap(); + + match accumulator.state() { + &State::Prepared(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + } + + #[test] + fn propose_after_committed_does_not_clobber_state() { + let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + for i in 0..7 { + accumulator.import_message(LocalizedVote { + sender: AuthorityId(i), + signature: Signature(999, i), + vote: Vote::Commit(1, Digest(999)), + }.into()).unwrap(); + } + + match accumulator.state() { + &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + + accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { + sender: AuthorityId(8), + full_signature: Signature(999, 8), + digest_signature: Signature(999, 8), + round_number: 1, + proposal: Candidate(999), + digest: Digest(999), + })).unwrap(); + + match accumulator.state() { + &State::Committed(ref j) => assert_eq!(j.digest, Digest(999)), + s => panic!("wrong state: {:?}", s), + } + } + + #[test] + fn propose_after_advance_does_not_clobber_state() { + let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); + assert_eq!(accumulator.state(), &State::Begin); + + for i in 0..7 { + accumulator.import_message(LocalizedVote { + sender: AuthorityId(i), + signature: Signature(1, i), + vote: Vote::AdvanceRound(1), + }.into()).unwrap(); + } + + match accumulator.state() { + &State::Advanced(_) => {} + s => panic!("wrong state: {:?}", s), + } + + accumulator.import_message(LocalizedMessage::Propose(LocalizedProposal { + sender: AuthorityId(8), + full_signature: Signature(999, 8), + digest_signature: Signature(999, 8), + round_number: 1, + proposal: Candidate(999), + digest: Digest(999), + })).unwrap(); + + match accumulator.state() { + &State::Advanced(_) => {} + s => panic!("wrong state: {:?}", s), + } + } + #[test] fn begin_to_advance() { let mut accumulator = Accumulator::::new(1, 7, AuthorityId(8)); diff --git a/substrate/bft/src/generic/mod.rs b/substrate/bft/src/generic/mod.rs index 0d0b1058934a6..0795cbfc62291 100644 --- a/substrate/bft/src/generic/mod.rs +++ b/substrate/bft/src/generic/mod.rs @@ -418,6 +418,7 @@ impl Strategy { // poll until either completion or state doesn't change. loop { + trace!(target: "bft", "Polling BFT logic. State={:?}", last_watermark); match self.poll_once(context, sending)? { Async::Ready(x) => return Ok(Async::Ready(x)), Async::NotReady => { @@ -683,6 +684,7 @@ impl Strategy { fn advance_to_round(&mut self, context: &C, round: usize) { assert!(round > self.current_accumulator.round_number()); + trace!(target: "bft", "advancing to round {}", round); let threshold = self.nodes - self.max_faulty; @@ -790,7 +792,6 @@ impl Future for Agreement self.poll() } Async::NotReady => { - Ok(Async::NotReady) } } diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index db8871bff39cb..327cfc0ad0d47 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -329,6 +329,12 @@ impl BftService } } + /// Get the local Authority ID. + pub fn local_id(&self) -> AuthorityId { + // TODO: based on a header and some keystore. + self.key.public().0 + } + /// Signal that a valid block with the given header has been imported. /// /// If the local signing key is an authority, this will begin the consensus process to build a @@ -350,7 +356,7 @@ impl BftService let max_faulty = max_faulty_of(n); trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty); - let local_id = self.key.public().0; + let local_id = self.local_id(); if !authorities.contains(&local_id) { // cancel current agreement diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index 8234ef3bf25c2..af43e396322e1 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -139,7 +139,9 @@ impl Consensus { pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) { if self.messages.contains_key(&hash) { trace!(target:"sync", "Ignored already known BFT message from {}", peer_id); + return; } + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { peer.known_messages.insert(hash); // TODO: validate signature?