Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 17 additions & 72 deletions polkadot/consensus/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use std::thread;
use std::time::{Duration, Instant};
use std::sync::Arc;
use std::collections::{HashMap, VecDeque};
use futures::{future, Future, Stream, Sink, Async, Canceled, Poll};
use parking_lot::Mutex;
use substrate_network as net;
Expand All @@ -41,88 +40,35 @@ use error;

const TIMER_DELAY_MS: u64 = 5000;
const TIMER_INTERVAL_MS: u64 = 500;
const MESSAGE_LIFETIME_SEC: u64 = 10;

struct BftSink<E> {
network: Arc<net::ConsensusService>,
parent_hash: HeaderHash,
_e: ::std::marker::PhantomData<E>,
}

#[derive(Clone)]
struct SharedMessageCollection {
/// Messages for consensus over a block with known hash. Also holds timestamp of the first message.
messages: Arc<Mutex<HashMap<HeaderHash, (Instant, VecDeque<net::LocalizedBftMessage>)>>>,
}

impl SharedMessageCollection {
fn new() -> SharedMessageCollection {
SharedMessageCollection {
messages: Arc::new(Mutex::new(HashMap::new())),
}
}

fn select(&self, parent_hash: HeaderHash, stream: net::BftMessageStream, authorities: Vec<AuthorityId>) -> Messages {
Messages {
messages: self.messages.lock().remove(&parent_hash).map(|(_, m)| m).unwrap_or_else(VecDeque::new),
parent_hash,
network_stream: stream,
authorities: authorities,
collection: self.clone(),
}
}

fn push(&self, message: net::LocalizedBftMessage) {
self.messages.lock()
.entry(message.parent_hash)
.or_insert_with(|| (Instant::now(), VecDeque::new()))
.1.push_back(message);
}

fn collect_garbage(&self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SEC);
let now = Instant::now();
self.messages.lock().retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
}
}

struct Messages {
parent_hash: HeaderHash,
messages: VecDeque<net::LocalizedBftMessage>,
network_stream: net::BftMessageStream,
authorities: Vec<AuthorityId>,
collection: SharedMessageCollection,
}

impl Stream for Messages {
type Item = bft::Communication;
type Error = bft::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
// push buffered messages first
while let Some(message) = self.messages.pop_front() {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => debug!("Message validation failed: {:?}", e),
}
}

// check the network
loop {
match self.network_stream.poll() {
Err(_) => return Err(bft::InputStreamConcluded.into()),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(None)) => return Ok(Async::NotReady), // the input stream for agreements is never meant to logically end.
Ok(Async::Ready(Some(message))) => {
if message.parent_hash == self.parent_hash {
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => {
debug!("Message validation failed: {:?}", e);
}
match process_message(message, &self.authorities) {
Ok(message) => return Ok(Async::Ready(Some(message))),
Err(e) => {
debug!("Message validation failed: {:?}", e);
}
} else {
self.collection.push(message);
}
}
}
Expand Down Expand Up @@ -226,31 +172,34 @@ fn start_bft<F, C>(
client: &bft::Authorities,
network: Arc<net::ConsensusService>,
bft_service: &BftService<F, C>,
messages: SharedMessageCollection
) where
F: bft::ProposerFactory + 'static,
C: bft::BlockImport + bft::Authorities + 'static,
<F as bft::ProposerFactory>::Error: ::std::fmt::Debug,
<F::Proposer as bft::Proposer>::Error: ::std::fmt::Display + Into<error::Error>,
{
let hash = header.blake2_256().into();
if bft_service.live_agreement().map_or(false, |h| h == hash) {
let parent_hash = header.blake2_256().into();
if bft_service.live_agreement().map_or(false, |h| h == parent_hash) {
return;
}
let authorities = match client.authorities(&BlockId::Hash(hash)) {
let authorities = match client.authorities(&BlockId::Hash(parent_hash)) {
Ok(authorities) => authorities,
Err(e) => {
debug!("Error reading authorities: {:?}", e);
return;
}
};

let input = messages.select(hash, network.bft_messages(), authorities).map_err(|e| e.into());
let output = BftSink { network: network, parent_hash: hash.clone(), _e: Default::default() };
match bft_service.build_upon(&header, input, output) {
let input = Messages {
network_stream: network.bft_messages(parent_hash),
authorities,
};

let output = BftSink { network: network, parent_hash: parent_hash, _e: Default::default() };
match bft_service.build_upon(&header, input.map_err(Into::into), output) {
Ok(Some(bft)) => handle.spawn(bft),
Ok(None) => {},
Err(e) => debug!(target: "bft","BFT agreement error: {:?}", e),
Err(e) => debug!(target: "bft", "BFT agreement error: {:?}", e),
}
}

Expand Down Expand Up @@ -281,19 +230,17 @@ impl Service {
network: Network(network.clone()),
handle: core.handle(),
};
let messages = SharedMessageCollection::new();
let bft_service = Arc::new(BftService::new(client.clone(), key, factory));

let notifications = {
let handle = core.handle();
let network = network.clone();
let client = client.clone();
let bft_service = bft_service.clone();
let messages = messages.clone();

client.import_notification_stream().for_each(move |notification| {
if notification.is_new_best {
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service, messages.clone());
start_bft(&notification.header, handle.clone(), &*client, network.clone(), &*bft_service);
}
Ok(())
})
Expand All @@ -316,16 +263,14 @@ impl Service {
let c = client.clone();
let s = bft_service.clone();
let n = network.clone();
let m = messages.clone();
let handle = core.handle();

interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| {
if let Ok(best_block) = c.best_block_header() {
let hash = best_block.blake2_256();
m.collect_garbage();
if hash == prev_best {
debug!("Starting consensus round after a timeout");
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s, m.clone());
start_bft(&best_block, handle.clone(), &*c, n.clone(), &*s);
}
prev_best = hash;
}
Expand Down
1 change: 1 addition & 0 deletions substrate/bft/src/generic/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ impl<Candidate, Digest, AuthorityId, Signature> Accumulator<Candidate, Digest, A
count.committed += 1;

if count.committed >= self.threshold {
trace!(target: "bft", "observed threshold-commit for round {} with {} commits", self.round_number, count.committed);
Some(digest)
} else {
None
Expand Down
1 change: 1 addition & 0 deletions substrate/bft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ impl<P, I> BftService<P, I>

let n = authorities.len();
let max_faulty = max_faulty_of(n);
trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);

let local_id = self.key.public().0;

Expand Down
31 changes: 23 additions & 8 deletions substrate/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub struct Consensus {
peers: HashMap<PeerId, PeerConsensus>,
our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
bft_message_sink: Option<mpsc::UnboundedSender<message::LocalizedBftMessage>>,
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: HashMap<Hash, (Instant, message::Message)>,
}

Expand Down Expand Up @@ -143,26 +143,41 @@ impl Consensus {
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash);
// TODO: validate signature?
if let Some(sink) = self.bft_message_sink.take() {
if let Err(e) = sink.unbounded_send(message.clone()) {
trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e);
} else {
self.bft_message_sink = Some(sink);
if let Some((sink, parent_hash)) = self.bft_message_sink.take() {
if message.parent_hash == parent_hash {
if let Err(e) = sink.unbounded_send(message.clone()) {
trace!(target:"sync", "Error broadcasting BFT message notification: {:?}", e);
} else {
self.bft_message_sink = Some((sink, parent_hash));
}
}
}
} else {
trace!(target:"sync", "Ignored BFT statement from unregistered peer {}", peer_id);
return;
}

let message = Message::BftMessage(message);
self.register_message(hash.clone(), message.clone());
// Propagate to other peers.
self.propagate(io, protocol, message, hash);
}

pub fn bft_messages(&mut self) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
let (sink, stream) = mpsc::unbounded();
self.bft_message_sink = Some(sink);

for (_, message) in self.messages.iter() {
let bft_message = match *message {
(_, Message::BftMessage(ref msg)) => msg,
_ => continue,
};

if bft_message.parent_hash == parent_hash {
sink.unbounded_send(bft_message.clone()).expect("receiving end known to be open; qed");
}
}

self.bft_message_sink = Some((sink, parent_hash));
stream
}

Expand Down
4 changes: 2 additions & 2 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ impl Protocol {
}

/// See `ConsensusService` trait.
pub fn bft_messages(&self) -> BftMessageStream {
self.consensus.lock().bft_messages()
pub fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream {
self.consensus.lock().bft_messages(parent_hash)
}

/// See `ConsensusService` trait.
Expand Down
9 changes: 5 additions & 4 deletions substrate/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ pub trait ConsensusService: Send + Sync {
/// Pass `None` to clear the candidate.
fn set_local_candidate(&self, candidate: Option<(Hash, Vec<u8>)>);

/// Get BFT message stream.
fn bft_messages(&self) -> BftMessageStream;
/// Get BFT message stream for messages corresponding to consensus on given
/// parent hash.
fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream;
/// Send out a BFT message.
fn send_bft_message(&self, message: LocalizedBftMessage);
}
Expand Down Expand Up @@ -254,8 +255,8 @@ impl ConsensusService for Service {
self.handler.protocol.set_local_candidate(candidate)
}

fn bft_messages(&self) -> BftMessageStream {
self.handler.protocol.bft_messages()
fn bft_messages(&self, parent_hash: Hash) -> BftMessageStream {
self.handler.protocol.bft_messages(parent_hash)
}

fn send_bft_message(&self, message: LocalizedBftMessage) {
Expand Down
49 changes: 49 additions & 0 deletions substrate/network/src/test/consensus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2017 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
use message::*;
use futures::Stream;

#[test]
fn bft_messages_include_those_sent_before_asking_for_stream() {
let mut config = ::config::ProtocolConfig::default();
config.roles = ::service::Role::VALIDATOR | ::service::Role::FULL;

let mut net = TestNet::new_with_config(2, config);
net.sync(); // necessary for handshaking

let peer = net.peer(0);
let mut io = TestIo::new(&peer.queue, None);
let bft_message = BftMessage::Consensus(SignedConsensusMessage::Vote(SignedConsensusVote {
vote: ConsensusVote::AdvanceRound(0),
sender: [0; 32],
signature: Default::default(),
}));

let localized = LocalizedBftMessage {
message: bft_message,
parent_hash: [1; 32].into(),
};


let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap();
peer.sync.handle_packet(&mut io, 1, &as_bytes[..]);

let stream = peer.sync.bft_messages([1; 32].into());

assert_eq!(stream.wait().next(), Some(Ok(localized)));
}
1 change: 1 addition & 0 deletions substrate/network/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

mod consensus;
mod sync;

use std::collections::{VecDeque, HashSet, HashMap};
Expand Down