diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index d2f67efdf74d9..c2280f2eaa111 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -22,7 +22,6 @@ use std::thread; use std::time::{Duration, Instant}; use std::sync::Arc; -use std::collections::{HashMap, VecDeque}; use futures::{future, Future, Stream, Sink, Async, Canceled, Poll}; use parking_lot::Mutex; use substrate_network as net; @@ -41,7 +40,6 @@ use error; const TIMER_DELAY_MS: u64 = 5000; const TIMER_INTERVAL_MS: u64 = 500; -const MESSAGE_LIFETIME_SEC: u64 = 10; struct BftSink { network: Arc, @@ -49,49 +47,9 @@ struct BftSink { _e: ::std::marker::PhantomData, } -#[derive(Clone)] -struct SharedMessageCollection { - /// Messages for consensus over a block with known hash. Also holds timestamp of the first message. - messages: Arc)>>>, -} - -impl SharedMessageCollection { - fn new() -> SharedMessageCollection { - SharedMessageCollection { - messages: Arc::new(Mutex::new(HashMap::new())), - } - } - - fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec) -> Messages { - Messages { - messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new), - parent_hash, - network_stream: stream, - authorities: authorities, - collection: self.clone(), - } - } - - fn push(&self, message: net::LocalizedBftMessage) { - self.messages.lock() - .entry(message.parent_hash) - .or_insert_with(|| (Instant::now(), VecDeque::new())) - .1.push_back(message); - } - - fn collect_garbage(&self) { - let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC); - let now = Instant::now(); - self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration); - } -} - struct Messages { - parent_hash: HeaderHash, - messages: VecDeque, network_stream: net::BftMessageStream, authorities: Vec, - collection: SharedMessageCollection, } impl Stream for Messages { @@ -99,14 +57,6 @@ impl Stream for Messages { type Error = bft::Error; fn poll(&mut self) -> Poll, Self::Error> { - // push buffered messages first - while let Some(message) = self.messages.pop_front() { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), - Err(e) => debug!("Message validation failed: {:?}", e), - } - } - // check the network loop { match self.network_stream.poll() { @@ -114,15 +64,11 @@ impl Stream for Messages { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end. Ok(Async::Ready(Some(message))) => { - if message.parent_hash == self.parent_hash { - match process_message(message, &self.authorities) { - Ok(message) => return Ok(Async::Ready(Some(message))), - Err(e) => { - debug!("Message validation failed: {:?}", e); - } + match process_message(message, &self.authorities) { + Ok(message) => return Ok(Async::Ready(Some(message))), + Err(e) => { + debug!("Message validation failed: {:?}", e); } - } else { - self.collection.push(message); } } } @@ -226,18 +172,17 @@ fn start_bft( client: &bft::Authorities, network: Arc, bft_service: &BftService, - messages: SharedMessageCollection ) where F: bft::ProposerFactory + 'static, C: bft::BlockImport + bft::Authorities + 'static, ::Error: ::std::fmt::Debug, ::Error: ::std::fmt::Display + Into, { - let hash = header.blake2_256().into(); - if bft_service.live_agreement().map_or(false, |h| h == hash) { + let parent_hash = header.blake2_256().into(); + if bft_service.live_agreement().map_or(false, |h| h == parent_hash) { return; } - let authorities = match client.authorities(&BlockId::Hash(hash)) { + let authorities = match client.authorities(&BlockId::Hash(parent_hash)) { Ok(authorities) => authorities, Err(e) => { debug!("Error reading authorities: {:?}", e); @@ -245,12 +190,16 @@ fn start_bft( } }; - let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into()); - let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() }; - match bft_service.build_upon(&header, input, output) { + let input = Messages { + network_stream: network.bft_messages(parent_hash), + authorities, + }; + + let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() }; + match bft_service.build_upon(&header, input.map_err(Into::into), output) { Ok(Some(bft)) => handle.spawn(bft), Ok(None) => {}, - Err(e) => debug!(target: "bft","BFT agreement error: {:?}", e), + Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e), } } @@ -281,7 +230,6 @@ impl Service { network: Network(network.clone()), handle: core.handle(), }; - let messages = SharedMessageCollection::new(); let bft_service = Arc::new(BftService::new(client.clone(), key, factory)); let notifications = { @@ -289,11 +237,10 @@ impl Service { let network = network.clone(); let client = client.clone(); let bft_service = bft_service.clone(); - let messages = messages.clone(); client.import_notification_stream().for_each(move |notification| { if notification.is_new_best { - start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone()); + start_bft(¬ification.header, handle.clone(), &*client, network.clone(), &*bft_service); } Ok(()) }) @@ -316,16 +263,14 @@ impl Service { let c = client.clone(); let s = bft_service.clone(); let n = network.clone(); - let m = messages.clone(); let handle = core.handle(); interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { if let Ok(best_block) = c.best_block_header() { let hash = best_block.blake2_256(); - m.collect_garbage(); if hash == prev_best { debug!("Starting consensus round after a timeout"); - start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone()); + start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s); } prev_best = hash; } diff --git a/substrate/bft/src/generic/accumulator.rs b/substrate/bft/src/generic/accumulator.rs index 64c4f22254898..e610faa0b8eb1 100644 --- a/substrate/bft/src/generic/accumulator.rs +++ b/substrate/bft/src/generic/accumulator.rs @@ -339,6 +339,7 @@ impl Accumulator= self.threshold { + trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed); Some(digest) } else { None diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index a0ffaf3968ad1..db8871bff39cb 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -348,6 +348,7 @@ impl BftService let n = authorities.len(); let max_faulty = max_faulty_of(n); + trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty); let local_id = self.key.public().0; diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index 7d13049ed4189..8234ef3bf25c2 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -46,7 +46,7 @@ pub struct Consensus { peers: HashMap, our_candidate: Option<(Hash, Vec)>, statement_sink: Option>, - bft_message_sink: Option>, + bft_message_sink: Option<(mpsc::UnboundedSender, Hash)>, messages: HashMap, } @@ -143,26 +143,41 @@ impl Consensus { if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { peer.known_messages.insert(hash); // TODO: validate signature? - if let Some(sink) = self.bft_message_sink.take() { - if let Err(e) = sink.unbounded_send(message.clone()) { - trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); - } else { - self.bft_message_sink = Some(sink); + if let Some((sink, parent_hash)) = self.bft_message_sink.take() { + if message.parent_hash == parent_hash { + if let Err(e) = sink.unbounded_send(message.clone()) { + trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e); + } else { + self.bft_message_sink = Some((sink, parent_hash)); + } } } } else { trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id); return; } + let message = Message::BftMessage(message); self.register_message(hash.clone(), message.clone()); // Propagate to other peers. self.propagate(io, protocol, message, hash); } - pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver{ + pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver{ let (sink, stream) = mpsc::unbounded(); - self.bft_message_sink = Some(sink); + + for (_, message) in self.messages.iter() { + let bft_message = match *message { + (_, Message::BftMessage(ref msg)) => msg, + _ => continue, + }; + + if bft_message.parent_hash == parent_hash { + sink.unbounded_send(bft_message.clone()).expect("receiving end known to be open; qed"); + } + } + + self.bft_message_sink = Some((sink, parent_hash)); stream } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index abbb41d2f6f70..75498f37d6505 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -317,8 +317,8 @@ impl Protocol { } /// See `ConsensusService` trait. - pub fn bft_messages(&self) -> BftMessageStream { - self.consensus.lock().bft_messages() + pub fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream { + self.consensus.lock().bft_messages(parent_hash) } /// See `ConsensusService` trait. diff --git a/substrate/network/src/service.rs b/substrate/network/src/service.rs index 6d8f5bea115c5..952b3b8d9777c 100644 --- a/substrate/network/src/service.rs +++ b/substrate/network/src/service.rs @@ -91,8 +91,9 @@ pub trait ConsensusService: Send + Sync { /// Pass `None` to clear the candidate. fn set_local_candidate(&self, candidate: Option<(Hash, Vec)>); - /// Get BFT message stream. - fn bft_messages(&self) -> BftMessageStream; + /// Get BFT message stream for messages corresponding to consensus on given + /// parent hash. + fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream; /// Send out a BFT message. fn send_bft_message(&self, message: LocalizedBftMessage); } @@ -254,8 +255,8 @@ impl ConsensusService for Service { self.handler.protocol.set_local_candidate(candidate) } - fn bft_messages(&self) -> BftMessageStream { - self.handler.protocol.bft_messages() + fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream { + self.handler.protocol.bft_messages(parent_hash) } fn send_bft_message(&self, message: LocalizedBftMessage) { diff --git a/substrate/network/src/test/consensus.rs b/substrate/network/src/test/consensus.rs new file mode 100644 index 0000000000000..0733e6d37cc51 --- /dev/null +++ b/substrate/network/src/test/consensus.rs @@ -0,0 +1,49 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::*; +use message::*; +use futures::Stream; + +#[test] +fn bft_messages_include_those_sent_before_asking_for_stream() { + let mut config = ::config::ProtocolConfig::default(); + config.roles = ::service::Role::VALIDATOR | ::service::Role::FULL; + + let mut net = TestNet::new_with_config(2, config); + net.sync(); // necessary for handshaking + + let peer = net.peer(0); + let mut io = TestIo::new(&peer.queue, None); + let bft_message = BftMessage::Consensus(SignedConsensusMessage::Vote(SignedConsensusVote { + vote: ConsensusVote::AdvanceRound(0), + sender: [0; 32], + signature: Default::default(), + })); + + let localized = LocalizedBftMessage { + message: bft_message, + parent_hash: [1; 32].into(), + }; + + + let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap(); + peer.sync.handle_packet(&mut io, 1, &as_bytes[..]); + + let stream = peer.sync.bft_messages([1; 32].into()); + + assert_eq!(stream.wait().next(), Some(Ok(localized))); +} diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index 36c4ec34d5cb1..96b56ce45a93f 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +mod consensus; mod sync; use std::collections::{VecDeque, HashSet, HashMap};