diff --git a/core/finality-grandpa/src/communication.rs b/core/finality-grandpa/src/communication.rs deleted file mode 100644 index f498b51460ac..000000000000 --- a/core/finality-grandpa/src/communication.rs +++ /dev/null @@ -1,543 +0,0 @@ -// Copyright 2017-2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Incoming message streams that verify signatures, and outgoing message streams -//! that sign or re-shape. - -use std::collections::HashMap; -use std::sync::Arc; - -use grandpa::VoterSet; -use grandpa::Message::{Prevote, Precommit}; -use futures::prelude::*; -use futures::sync::mpsc; -use log::{debug, trace}; -use parity_codec::{Encode, Decode}; -use substrate_primitives::{ed25519, Pair}; -use substrate_telemetry::{telemetry, CONSENSUS_INFO}; -use runtime_primitives::traits::Block as BlockT; -use tokio::timer::Interval; -use crate::{Error, Network, Message, SignedMessage, Commit, - CompactCommit, GossipMessage, FullCommitMessage, VoteOrPrecommitMessage}; -use ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; - -fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { - (message, round, set_id).encode() -} - -#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] -struct Round(u64); -#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)] -struct SetId(u64); - -enum Broadcast { - // round, set id, encoded commit. - Commit(Round, SetId, Vec), - // round, set id, encoded signed message. - Message(Round, SetId, Vec), - // round, set id, announcement of block hash that should be downloaded - Announcement(Round, SetId, Block::Hash), - // round, set id being dropped. - DropRound(Round, SetId), - // set_id being dropped. - DropSet(SetId), -} - -impl Broadcast { - fn set_id(&self) -> SetId { - match *self { - Broadcast::Commit(_, s, _) => s, - Broadcast::Message(_, s, _) => s, - Broadcast::Announcement(_, s, _) => s, - Broadcast::DropRound(_, s) => s, - Broadcast::DropSet(s) => s, - } - } -} - -/// Produces a future that should be run in the background and proxies -/// and rebroadcasts messages. -pub(crate) fn rebroadcasting_network>(network: N) -> (BroadcastWorker, BroadcastHandle) { - use std::time::Duration; - const REBROADCAST_PERIOD: Duration = Duration::from_secs(60); - - let (tx, rx) = mpsc::unbounded(); - - ( - BroadcastWorker { - interval: Interval::new_interval(REBROADCAST_PERIOD), - set_id: SetId(0), // will be overwritten on first item to broadcast. - last_commit: None, - round_messages: (Round(0), Vec::new()), - announcements: HashMap::new(), - network: network.clone(), - incoming_broadcast: rx, - }, - BroadcastHandle { - relay: tx, - network, - }, - ) -} - -// A worker which broadcasts messages to the background, potentially -// rebroadcasting. -#[must_use = "network rebroadcast future must be driven to completion"] -pub(crate) struct BroadcastWorker> { - interval: Interval, - set_id: SetId, - last_commit: Option<(Round, Vec)>, - round_messages: (Round, Vec>), - announcements: HashMap, - network: N, - incoming_broadcast: mpsc::UnboundedReceiver>, -} - -/// A handle used by communication work to broadcast to network. -#[derive(Clone)] -pub(crate) struct BroadcastHandle { - relay: mpsc::UnboundedSender>, - network: N, -} - -impl> Future for BroadcastWorker { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll<(), Error> { - { - let mut rebroadcast = false; - loop { - match self.interval.poll().map_err(Error::Timer)? { - Async::NotReady => break, - Async::Ready(_) => { rebroadcast = true; } - } - } - - if rebroadcast { - let SetId(set_id) = self.set_id; - if let Some((Round(c_round), ref c_commit)) = self.last_commit { - self.network.send_commit(c_round, set_id, c_commit.clone(), true); - } - - let Round(round) = self.round_messages.0; - for message in self.round_messages.1.iter().cloned() { - self.network.send_message(round, set_id, message, true); - } - - for (&announce_hash, &Round(round)) in &self.announcements { - self.network.announce(round, set_id, announce_hash); - } - } - } - - loop { - match self.incoming_broadcast.poll().expect("UnboundedReceiver does not yield errors; qed") { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Err(Error::Network( - "all broadcast handles dropped, connection to network severed".into() - )), - Async::Ready(Some(item)) => { - if item.set_id() > self.set_id { - self.set_id = item.set_id(); - self.last_commit = None; - self.round_messages = (Round(0), Vec::new()); - self.announcements.clear(); - } - - match item { - Broadcast::Commit(round, set_id, commit) => { - if self.set_id == set_id { - if round >= self.last_commit.as_ref() - .map_or(Round(0), |&(r, _)| r) - { - self.last_commit = Some((round, commit.clone())); - } - } - - // always send out to network. - self.network.send_commit(round.0, self.set_id.0, commit, false); - } - Broadcast::Message(round, set_id, message) => { - if self.set_id == set_id { - if round > self.round_messages.0 { - self.round_messages = (round, vec![message.clone()]); - } else if round == self.round_messages.0 { - self.round_messages.1.push(message.clone()); - }; - - // ignore messages from earlier rounds. - } - - // always send out to network. - self.network.send_message(round.0, set_id.0, message, false); - } - Broadcast::Announcement(round, set_id, hash) => { - if self.set_id == set_id { - self.announcements.insert(hash, round); - } - - // always send out. - self.network.announce(round.0, set_id.0, hash); - } - Broadcast::DropRound(round, set_id) => { - // stop making announcements for any dead rounds. - self.announcements.retain(|_, &mut r| r > round); - self.network.drop_round_messages(round.0, set_id.0); - } - Broadcast::DropSet(set_id) => { - // stop making announcements for any dead rounds. - self.network.drop_set_messages(set_id.0); - } - } - } - } - } - } -} - -impl> Network for BroadcastHandle { - type In = N::In; - - fn messages_for(&self, round: u64, set_id: u64) -> Self::In { - self.network.messages_for(round, set_id) - } - - fn send_message(&self, round: u64, set_id: u64, message: Vec, _force: bool) { - let _ = self.relay.unbounded_send(Broadcast::Message(Round(round), SetId(set_id), message)); - } - - fn drop_round_messages(&self, round: u64, set_id: u64) { - let _ = self.relay.unbounded_send(Broadcast::DropRound(Round(round), SetId(set_id))); - } - - fn drop_set_messages(&self, set_id: u64) { - let _ = self.relay.unbounded_send(Broadcast::DropSet(SetId(set_id))); - } - - fn commit_messages(&self, set_id: u64) -> Self::In { - self.network.commit_messages(set_id) - } - - fn send_commit(&self, round: u64, set_id: u64, message: Vec, _force: bool) { - let _ = self.relay.unbounded_send(Broadcast::Commit(Round(round), SetId(set_id), message)); - } - - fn announce(&self, round: u64, set_id: u64, block: B::Hash) { - let _ = self.relay.unbounded_send( - Broadcast::Announcement(Round(round), SetId(set_id), block) - ); - } -} - -// check a message. -pub(crate) fn check_message_sig( - message: &Message, - id: &AuthorityId, - signature: &AuthoritySignature, - round: u64, - set_id: u64, -) -> Result<(), ()> { - let as_public = AuthorityId::from_raw(id.0); - let encoded_raw = localized_payload(round, set_id, message); - if ed25519::Pair::verify(signature, &encoded_raw, as_public) { - Ok(()) - } else { - debug!(target: "afg", "Bad signature on message from {:?}", id); - Err(()) - } -} - -/// converts a message stream into a stream of signed messages. -/// the output stream checks signatures also. -pub(crate) fn checked_message_stream( - inner: S, - voters: Arc>, -) - -> impl Stream,Error=Error> where - S: Stream,Error=()> -{ - inner - .filter_map(|raw| { - let decoded = GossipMessage::::decode(&mut &raw[..]); - if decoded.is_none() { - debug!(target: "afg", "Skipping malformed message {:?}", raw); - } - decoded - }) - .and_then(move |msg| { - match msg { - GossipMessage::VoteOrPrecommit(msg) => { - // check signature. - if !voters.contains_key(&msg.message.id) { - debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); - return Ok(None); - } - - match &msg.message.message { - Prevote(prevote) => { - telemetry!(CONSENSUS_INFO; "afg.received_prevote"; - "voter" => ?format!("{}", msg.message.id), - "target_number" => ?prevote.target_number, - "target_hash" => ?prevote.target_hash, - ); - }, - Precommit(precommit) => { - telemetry!(CONSENSUS_INFO; "afg.received_precommit"; - "voter" => ?format!("{}", msg.message.id), - "target_number" => ?precommit.target_number, - "target_hash" => ?precommit.target_hash, - ); - }, - }; - - Ok(Some(msg.message)) - } - _ => { - debug!(target: "afg", "Skipping unknown message type"); - return Ok(None); - } - } - }) - .filter_map(|x| x) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) -} - -pub(crate) struct OutgoingMessages> { - round: u64, - set_id: u64, - locals: Option<(Arc, AuthorityId)>, - sender: mpsc::UnboundedSender>, - network: N, -} - -impl> Sink for OutgoingMessages -{ - type SinkItem = Message; - type SinkError = Error; - - fn start_send(&mut self, msg: Message) -> StartSend, Error> { - // when locals exist, sign messages on import - if let Some((ref pair, ref local_id)) = self.locals { - let encoded = localized_payload(self.round, self.set_id, &msg); - let signature = pair.sign(&encoded[..]); - - let target_hash = msg.target().0.clone(); - let signed = SignedMessage:: { - message: msg, - signature, - id: local_id.clone(), - }; - - let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage:: { - message: signed.clone(), - round: self.round, - set_id: self.set_id, - }); - - // announce our block hash to peers and propagate the - // message. - self.network.announce(self.round, self.set_id, target_hash); - self.network.send_message(self.round, self.set_id, message.encode(), false); - - // forward the message to the inner sender. - let _ = self.sender.unbounded_send(signed); - } - - Ok(AsyncSink::Ready) - } - - fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } - - fn close(&mut self) -> Poll<(), Error> { - // ignore errors since we allow this inner sender to be closed already. - self.sender.close().or_else(|_| Ok(Async::Ready(()))) - } -} - -impl> Drop for OutgoingMessages { - fn drop(&mut self) { - self.network.drop_round_messages(self.round, self.set_id); - } -} - -/// A sink for outgoing messages. This signs the messages with the key, -/// if we are an authority. A stream for the signed messages is also returned. -/// -/// A future can push unsigned messages into the sink. They will be automatically -/// broadcast to the network. The returned stream should be combined with other input. -pub(crate) fn outgoing_messages>( - round: u64, - set_id: u64, - local_key: Option>, - voters: Arc>, - network: N, -) -> ( - impl Stream,Error=Error>, - OutgoingMessages, -) { - let locals = local_key.and_then(|pair| { - let public = pair.public(); - let id = AuthorityId(public.0); - if voters.contains_key(&id) { - Some((pair, id)) - } else { - None - } - }); - - let (tx, rx) = mpsc::unbounded(); - let outgoing = OutgoingMessages:: { - round, - set_id, - network, - locals, - sender: tx, - }; - - let rx = rx.map_err(move |()| Error::Network( - format!("Failed to receive on unbounded receiver for round {}", round) - )); - - (rx, outgoing) -} - -fn check_compact_commit( - msg: CompactCommit, - voters: &VoterSet, -) -> Option> { - if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() { - debug!(target: "afg", "Skipping malformed compact commit"); - return None; - } - - // check signatures on all contained precommits. - for (_, ref id) in &msg.auth_data { - if !voters.contains_key(id) { - debug!(target: "afg", "Skipping commit containing unknown voter {}", id); - return None; - } - } - - Some(msg) -} - -/// A stream for incoming commit messages. This checks all the signatures on the -/// messages. -pub(crate) fn checked_commit_stream( - inner: S, - voters: Arc>, -) - -> impl Stream),Error=Error> where - S: Stream,Error=()> -{ - inner - .filter_map(|raw| { - // this could be optimized by decoding piecewise. - let decoded = GossipMessage::::decode(&mut &raw[..]); - if decoded.is_none() { - trace!(target: "afg", "Skipping malformed commit message {:?}", raw); - } - decoded - }) - .filter_map(move |msg| { - match msg { - GossipMessage::Commit(msg) => { - let round = msg.round; - let precommits_signed_by: Vec = - msg.message.auth_data.iter().map(move |(_, a)| { - format!("{}", a) - }).collect(); - telemetry!(CONSENSUS_INFO; "afg.received_commit"; - "contains_precommits_signed_by" => ?precommits_signed_by, - "target_number" => ?msg.message.target_number, - "target_hash" => ?msg.message.target_hash, - ); - check_compact_commit::(msg.message, &*voters).map(move |c| (round, c)) - }, - _ => { - debug!(target: "afg", "Skipping unknown message type"); - return None; - } - } - }) - .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))) -} - -/// An output sink for commit messages. -pub(crate) struct CommitsOut> { - network: N, - set_id: u64, - _marker: ::std::marker::PhantomData, - is_voter: bool, -} - -impl> CommitsOut { - /// Create a new commit output stream. - pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self { - CommitsOut { - network, - set_id, - is_voter, - _marker: Default::default(), - } - } -} - -impl> Sink for CommitsOut { - type SinkItem = (u64, Commit); - type SinkError = Error; - - fn start_send(&mut self, input: (u64, Commit)) -> StartSend { - if !self.is_voter { - return Ok(AsyncSink::Ready); - } - - let (round, commit) = input; - telemetry!(CONSENSUS_INFO; "afg.commit_issued"; - "target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash, - ); - let (precommits, auth_data) = commit.precommits.into_iter() - .map(|signed| (signed.precommit, (signed.signature, signed.id))) - .unzip(); - - let compact_commit = CompactCommit:: { - target_hash: commit.target_hash, - target_number: commit.target_number, - precommits, - auth_data - }; - - let message = GossipMessage::Commit(FullCommitMessage:: { - round: round, - set_id: self.set_id, - message: compact_commit, - }); - - self.network.send_commit(round, self.set_id, Encode::encode(&message), false); - - Ok(AsyncSink::Ready) - } - - fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } - fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } -} - -impl> Drop for CommitsOut { - fn drop(&mut self) { - self.network.drop_set_messages(self.set_id); - } -} diff --git a/core/finality-grandpa/src/communication/gossip.rs b/core/finality-grandpa/src/communication/gossip.rs new file mode 100644 index 000000000000..720c083db792 --- /dev/null +++ b/core/finality-grandpa/src/communication/gossip.rs @@ -0,0 +1,1005 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Gossip and politeness for polite-grandpa. +//! +//! This module implements the following message types: +//! #### Neighbor Packet +//! +//! The neighbor packet is sent to only our neighbors. It contains this information +//! +//! - Current Round +//! - Current voter set ID +//! - Last finalized hash from commit messages. +//! +//! If a peer is at a given voter set, it is impolite to send messages from +//! an earlier voter set. It is extremely impolite to send messages +//! from a future voter set. "future-set" messages can be dropped and ignored. +//! +//! If a peer is at round r, is impolite to send messages about r-2 or earlier and extremely +//! impolite to send messages about r+1 or later. "future-round" messages can +//! be dropped and ignored. +//! +//! It is impolite to send a neighbor packet which moves backwards in protocol state. +//! +//! This is beneficial if it conveys some progress in the protocol state of the peer. +//! +//! #### Prevote / Precommit +//! +//! These are votes within a round. Noting that we receive these messages +//! from our peers who are not necessarily voters, we have to account the benefit +//! based on what they might have seen. +//! +//! #### Propose +//! +//! This is a broadcast by a known voter of the last-round estimate. + +//! #### Commit +//! +//! These are used to announce past agreement of finality. +//! +//! It is impolite to send commits which are earlier than the last commit +//! sent. It is especially impolite to send commits which are invalid, or from +//! a different Set ID than the receiving peer has indicated. +//! +//! Sending a commit is polite when it may finalize something that the receiving peer +//! was not aware of. +//! +//! ## Expiration +//! +//! We keep some amount of recent rounds' messages, but do not accept new ones from rounds +//! older than our current_round - 1. +//! +//! ## Message Validation +//! +//! We only send polite messages to peers, + +use runtime_primitives::traits::{NumberFor, Block as BlockT, Zero}; +use network::consensus_gossip::{self as network_gossip, MessageIntent, ValidatorContext}; +use network::{config::Roles, PeerId}; +use parity_codec::{Encode, Decode}; + +use substrate_telemetry::{telemetry, CONSENSUS_DEBUG}; +use log::{trace, debug}; + +use crate::{CompactCommit, SignedMessage}; +use super::{Round, SetId, Network}; + +use std::collections::{HashMap, VecDeque}; +use std::time::{Duration, Instant}; + +const REBROADCAST_AFTER: Duration = Duration::from_secs(60 * 5); + +/// An outcome of examining a message. +#[derive(Debug, PartialEq, Clone, Copy)] +enum Consider { + /// Accept the message. + Accept, + /// Message is too early. Reject. + RejectPast, + /// Message is from the future. Reject. + RejectFuture, +} + +/// A view of protocol state. +#[derive(Debug)] +struct View { + round: Round, // the current round we are at. + set_id: SetId, // the current voter set id. + last_commit: Option, // commit-finalized block height, if any. +} + +impl Default for View { + fn default() -> Self { + View { + round: Round(0), + set_id: SetId(0), + last_commit: None, + } + } +} + +impl View { + /// Update the set ID. implies a reset to round 0. + fn update_set(&mut self, set_id: SetId) { + if set_id != self.set_id { + self.set_id = set_id; + self.round = Round(0); + } + } + + /// Consider a round and set ID combination under a current view. + fn consider_vote(&self, round: Round, set_id: SetId) -> Consider { + // only from current set + if set_id < self.set_id { return Consider::RejectPast } + if set_id > self.set_id { return Consider::RejectFuture } + + // only r-1 ... r+1 + if round.0 > self.round.0.saturating_add(1) { return Consider::RejectFuture } + if round.0 < self.round.0.saturating_sub(1) { return Consider::RejectPast } + + Consider::Accept + } + + /// Consider a set-id global message. Rounds are not taken into account, but are implicitly + /// because we gate on finalization of a further block than a previous commit. + fn consider_global(&self, set_id: SetId, number: N) -> Consider { + // only from current set + if set_id < self.set_id { return Consider::RejectPast } + if set_id > self.set_id { return Consider::RejectFuture } + + // only commits which claim to prove a higher block number than + // the one we're aware of. + match self.last_commit { + None => Consider::Accept, + Some(ref num) => if num < &number { + Consider::Accept + } else { + Consider::RejectPast + } + } + } +} + +const KEEP_RECENT_ROUNDS: usize = 3; + +/// Tracks topics we keep messages for. +struct KeepTopics { + current_set: SetId, + rounds: VecDeque<(Round, SetId)>, + reverse_map: HashMap, SetId)> +} + +impl KeepTopics { + fn new() -> Self { + KeepTopics { + current_set: SetId(0), + rounds: VecDeque::with_capacity(KEEP_RECENT_ROUNDS + 1), + reverse_map: HashMap::new(), + } + } + + fn push(&mut self, round: Round, set_id: SetId) { + self.current_set = std::cmp::max(self.current_set, set_id); + self.rounds.push_back((round, set_id)); + + // the 1 is for the current round. + while self.rounds.len() > KEEP_RECENT_ROUNDS + 1 { + let _ = self.rounds.pop_front(); + } + + let mut map = HashMap::with_capacity(KEEP_RECENT_ROUNDS + 2); + map.insert(super::global_topic::(self.current_set.0), (None, self.current_set)); + + for &(round, set) in &self.rounds { + map.insert( + super::round_topic::(round.0, set.0), + (Some(round), set) + ); + } + + self.reverse_map = map; + } + + fn topic_info(&self, topic: &B::Hash) -> Option<(Option, SetId)> { + self.reverse_map.get(topic).cloned() + } +} + +// topics to send to a neighbor based on their view. +fn neighbor_topics(view: &View>) -> Vec { + let s = view.set_id; + let mut topics = vec![ + super::global_topic::(s.0), + super::round_topic::(view.round.0, s.0), + ]; + + if view.round.0 != 0 { + let r = Round(view.round.0 - 1); + topics.push(super::round_topic::(r.0, s.0)) + } + + topics +} + +/// Grandpa gossip message type. +/// This is the root type that gets encoded and sent on the network. +#[derive(Debug, Encode, Decode)] +pub(super) enum GossipMessage { + /// Grandpa message with round and set info. + VoteOrPrecommit(VoteOrPrecommitMessage), + /// Grandpa commit message with round and set info. + Commit(FullCommitMessage), + /// A neighbor packet. Not repropagated. + Neighbor(VersionedNeighborPacket>), +} + +/// Network level message with topic information. +#[derive(Debug, Encode, Decode)] +pub(super) struct VoteOrPrecommitMessage { + /// The round this message is from. + pub(super) round: Round, + /// The voter set ID this message is from. + pub(super) set_id: SetId, + /// The message itself. + pub(super) message: SignedMessage, +} + +/// Network level commit message with topic information. +#[derive(Debug, Encode, Decode)] +pub(super) struct FullCommitMessage { + /// The round this message is from. + pub(super) round: Round, + /// The voter set ID this message is from. + pub(super) set_id: SetId, + /// The compact commit message. + pub(super) message: CompactCommit, +} + +/// V1 neighbor packet. Neighbor packets are sent from nodes to their peers +/// and are not repropagated. These contain information about the node's state. +#[derive(Debug, Encode, Decode, Clone)] +pub(super) struct NeighborPacket { + round: Round, + set_id: SetId, + commit_finalized_height: N, +} + +/// A versioned neighbor packet. +#[derive(Debug, Encode, Decode)] +pub(super) enum VersionedNeighborPacket { + #[codec(index = "1")] + V1(NeighborPacket), +} + +impl VersionedNeighborPacket { + fn into_neighbor_packet(self) -> NeighborPacket { + match self { + VersionedNeighborPacket::V1(p) => p, + } + } +} + +// cost scalars for reporting peers. +mod cost { + pub(super) const PAST_REJECTION: i32 = -50; + pub(super) const BAD_SIGNATURE: i32 = -100; + pub(super) const MALFORMED_COMMIT: i32 = -1000; + pub(super) const FUTURE_MESSAGE: i32 = -500; + + pub(super) const INVALID_VIEW_CHANGE: i32 = -500; + pub(super) const PER_UNDECODABLE_BYTE: i32 = -5; + pub(super) const PER_SIGNATURE_CHECKED: i32 = -25; + pub(super) const PER_BLOCK_LOADED: i32 = -10; +} + +// benefit scalars for reporting peers. +mod benefit { + pub(super) const ROUND_MESSAGE: i32 = 100; + pub(super) const BASIC_VALIDATED_COMMIT: i32 = 100; + pub(super) const PER_EQUIVOCATION: i32 = 10; +} + +/// Misbehavior that peers can perform. +/// +/// `cost` gives a cost that can be used to perform cost/benefit analysis of a +/// peer. +#[derive(Clone, Copy, Debug, PartialEq)] +enum Misbehavior { + // invalid neighbor message, considering the last one. + InvalidViewChange, + // could not decode neighbor message. bytes-length of the packet. + UndecodablePacket(i32), + // Bad commit message + BadCommitMessage { + signatures_checked: i32, + blocks_loaded: i32, + equivocations_caught: i32, + }, + // A message received that's from the future relative to our view. + // always misbehavior. + FutureMessage, +} + +impl Misbehavior { + fn cost(&self) -> i32 { + use Misbehavior::*; + + match *self { + InvalidViewChange => cost::INVALID_VIEW_CHANGE, + UndecodablePacket(bytes) => bytes.saturating_mul(cost::PER_UNDECODABLE_BYTE), + BadCommitMessage { signatures_checked, blocks_loaded, equivocations_caught } => { + let cost = cost::PER_SIGNATURE_CHECKED + .saturating_mul(signatures_checked) + .saturating_add(cost::PER_BLOCK_LOADED.saturating_mul(blocks_loaded)); + + let benefit = equivocations_caught.saturating_mul(benefit::PER_EQUIVOCATION); + + (benefit as i32).saturating_add(cost as i32) + }, + FutureMessage => cost::FUTURE_MESSAGE, + } + } +} + +struct PeerInfo { + view: View, +} + +impl PeerInfo { + fn new() -> Self { + PeerInfo { + view: View::default(), + } + } +} + +/// The peers we're connected do in gossip. +struct Peers { + inner: HashMap>, +} + +impl Default for Peers { + fn default() -> Self { + Peers { inner: HashMap::new() } + } +} + +impl Peers { + fn new_peer(&mut self, who: PeerId) { + self.inner.insert(who, PeerInfo::new()); + } + + fn peer_disconnected(&mut self, who: &PeerId) { + self.inner.remove(who); + } + + // returns a reference to the new view, if the peer is known. + fn update_peer_state(&mut self, who: &PeerId, update: NeighborPacket) + -> Result>, Misbehavior> + { + let peer = match self.inner.get_mut(who) { + None => return Ok(None), + Some(p) => p, + }; + + let invalid_change = peer.view.set_id > update.set_id + || peer.view.round > update.round && peer.view.set_id == update.set_id + || peer.view.last_commit.as_ref() > Some(&update.commit_finalized_height); + + if invalid_change { + return Err(Misbehavior::InvalidViewChange); + } + + peer.view = View { + round: update.round, + set_id: update.set_id, + last_commit: Some(update.commit_finalized_height), + }; + + trace!(target: "afg", "Peer {} updated view. Now at {:?}, {:?}", + who, peer.view.round, peer.view.set_id); + + Ok(Some(&peer.view)) + } + + fn update_commit_height(&mut self, who: &PeerId, new_height: N) -> Result<(), Misbehavior> { + let peer = match self.inner.get_mut(who) { + None => return Ok(()), + Some(p) => p, + }; + + if peer.view.last_commit.as_ref() >= Some(&new_height) { + return Err(Misbehavior::InvalidViewChange); + } + peer.view.last_commit = Some(new_height); + + Ok(()) + } + + fn peer<'a>(&'a self, who: &PeerId) -> Option<&'a PeerInfo> { + self.inner.get(who) + } +} + +#[derive(Debug)] +enum Action { + // repropagate under given topic, to the given peers, applying cost/benefit to originator. + Keep(H, i32), + // discard and process. + ProcessAndDiscard(H, i32), + // discard, applying cost/benefit to originator. + Discard(i32), +} + +struct Inner { + local_view: View>, + peers: Peers>, + live_topics: KeepTopics, + config: crate::Config, + next_rebroadcast: Instant, +} + +impl Inner { + fn new(config: crate::Config) -> Self { + Inner { + local_view: View::default(), + peers: Peers::default(), + live_topics: KeepTopics::new(), + next_rebroadcast: Instant::now() + REBROADCAST_AFTER, + config, + } + } + + /// Note a round in a set has started. + fn note_round>(&mut self, round: Round, set_id: SetId, net: &N) { + if self.local_view.round == round && self.local_view.set_id == set_id { + return + } + + debug!(target: "afg", "Voter {} noting beginning of round {:?} to network.", + self.config.name(), (round, set_id)); + + self.local_view.round = round; + self.local_view.set_id = set_id; + + self.live_topics.push(round, set_id); + self.multicast_neighbor_packet(net); + } + + /// Note that a voter set with given ID has started. Does nothing if the last + /// call to the function was with the same `set_id`. + fn note_set>(&mut self, set_id: SetId, net: &N) { + if self.local_view.set_id == set_id { return } + + self.local_view.update_set(set_id); + self.live_topics.push(Round(0), set_id); + self.multicast_neighbor_packet(net); + } + + /// Note that we've imported a commit finalizing a given block. + fn note_commit_finalized>(&mut self, finalized: NumberFor, net: &N) { + if self.local_view.last_commit.as_ref() < Some(&finalized) { + self.local_view.last_commit = Some(finalized); + self.multicast_neighbor_packet(net) + } + } + + fn consider_vote(&self, round: Round, set_id: SetId) -> Consider { + self.local_view.consider_vote(round, set_id) + } + + fn consider_global(&self, set_id: SetId, number: NumberFor) -> Consider { + self.local_view.consider_global(set_id, number) + } + + fn cost_past_rejection(&self, _who: &PeerId, _round: Round, _set_id: SetId) -> i32 { + // hardcoded for now. + cost::PAST_REJECTION + } + + fn validate_round_message(&self, who: &PeerId, full: &VoteOrPrecommitMessage) + -> Action + { + match self.consider_vote(full.round, full.set_id) { + Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()), + Consider::RejectPast => + return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)), + Consider::Accept => {}, + } + + if let Err(()) = super::check_message_sig::( + &full.message.message, + &full.message.id, + &full.message.signature, + full.round.0, + full.set_id.0, + ) { + debug!(target: "afg", "Bad message signature {}", full.message.id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); + return Action::Discard(cost::BAD_SIGNATURE); + } + + let topic = super::round_topic::(full.round.0, full.set_id.0); + Action::Keep(topic, benefit::ROUND_MESSAGE) + } + + fn validate_commit_message(&mut self, who: &PeerId, full: &FullCommitMessage) + -> Action + { + use grandpa::Message as GrandpaMessage; + + if let Err(misbehavior) = self.peers.update_commit_height(who, full.message.target_number) { + return Action::Discard(misbehavior.cost()); + } + + match self.consider_global(full.set_id, full.message.target_number) { + Consider::RejectFuture => return Action::Discard(Misbehavior::FutureMessage.cost()), + Consider::RejectPast => + return Action::Discard(self.cost_past_rejection(who, full.round, full.set_id)), + Consider::Accept => {}, + } + + if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { + debug!(target: "afg", "Malformed compact commit"); + telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit"; + "precommits_len" => ?full.message.precommits.len(), + "auth_data_len" => ?full.message.auth_data.len(), + "precommits_is_empty" => ?full.message.precommits.is_empty(), + ); + return Action::Discard(cost::MALFORMED_COMMIT); + } + + // check signatures on all contained precommits. + for (i, (precommit, &(ref sig, ref id))) in full.message.precommits.iter() + .zip(&full.message.auth_data) + .enumerate() + { + if let Err(()) = super::check_message_sig::( + &GrandpaMessage::Precommit(precommit.clone()), + id, + sig, + full.round.0, + full.set_id.0, + ) { + debug!(target: "afg", "Bad commit message signature {}", id); + telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); + return Action::Discard(Misbehavior::BadCommitMessage { + signatures_checked: i as i32, + blocks_loaded: 0, + equivocations_caught: 0, + }.cost()); + } + } + + // always discard commits initially and rebroadcast after doing full + // checking. + let topic = super::global_topic::(full.set_id.0); + Action::ProcessAndDiscard(topic, benefit::BASIC_VALIDATED_COMMIT) + } + + fn import_neighbor_message(&mut self, who: &PeerId, update: NeighborPacket>) + -> (Vec, Action) + { + let (cb, topics) = match self.peers.update_peer_state(who, update) { + Ok(view) => (100i32, view.map(|view| neighbor_topics::(view))), + Err(misbehavior) => (misbehavior.cost(), None) + }; + + let neighbor_topics = topics.unwrap_or_default(); + + // always discard, it's valid for one hop. + (neighbor_topics, Action::Discard(cb)) + } + + fn construct_neighbor_packet(&self) -> GossipMessage { + let packet = NeighborPacket { + round: self.local_view.round, + set_id: self.local_view.set_id, + commit_finalized_height: self.local_view.last_commit.unwrap_or(Zero::zero()), + }; + + GossipMessage::Neighbor(VersionedNeighborPacket::V1(packet)) + } + + fn multicast_neighbor_packet>(&self, net: &N) { + let packet = self.construct_neighbor_packet(); + let peers = self.peers.inner.keys().cloned().collect(); + net.send_message(peers, packet.encode()); + } +} + +/// A validator for GRANDPA gossip messages. +pub(super) struct GossipValidator { + inner: parking_lot::RwLock>, +} + +impl GossipValidator { + /// Create a new gossip-validator. + pub(super) fn new(config: crate::Config) -> GossipValidator { + GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)) } + } + + /// Note a round in a set has started. + pub(super) fn note_round>(&self, round: Round, set_id: SetId, net: &N) { + self.inner.write().note_round(round, set_id, net); + } + + /// Note that a voter set with given ID has started. + pub(super) fn note_set>(&self, set_id: SetId, net: &N) { + self.inner.write().note_set(set_id, net); + } + + /// Note that we've imported a commit finalizing a given block. + pub(super) fn note_commit_finalized>(&self, finalized: NumberFor, net: &N) { + self.inner.write().note_commit_finalized(finalized, net); + } + + fn report(&self, _who: &PeerId, _cost_benefit: i32) { + // report + } + + fn do_validate(&self, who: &PeerId, mut data: &[u8]) + -> (Action, Vec) + { + let mut broadcast_topics = Vec::new(); + let action = { + let mut inner = self.inner.write(); + match GossipMessage::::decode(&mut data) { + Some(GossipMessage::VoteOrPrecommit(ref message)) + => inner.validate_round_message(who, message), + Some(GossipMessage::Commit(ref message)) => inner.validate_commit_message(who, message), + Some(GossipMessage::Neighbor(update)) => { + let (topics, action) = inner.import_neighbor_message( + who, + update.into_neighbor_packet(), + ); + + broadcast_topics = topics; + action + } + None => { + debug!(target: "afg", "Error decoding message"); + telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); + + let len = std::cmp::min(i32::max_value() as usize, data.len()) as i32; + Action::Discard(Misbehavior::UndecodablePacket(len).cost()) + } + } + }; + + (action, broadcast_topics) + } +} + +impl network_gossip::Validator for GossipValidator { + fn new_peer(&self, context: &mut ValidatorContext, who: &PeerId, _roles: Roles) { + let packet_data = { + let mut inner = self.inner.write(); + inner.peers.new_peer(who.clone()); + inner.construct_neighbor_packet().encode() + }; + context.send_message(who, packet_data); + } + + fn peer_disconnected(&self, _context: &mut ValidatorContext, who: &PeerId) { + self.inner.write().peers.peer_disconnected(who); + } + + fn validate(&self, context: &mut ValidatorContext, who: &PeerId, data: &[u8]) + -> network_gossip::ValidationResult + { + let (action, broadcast_topics) = self.do_validate(who, data); + + // not with lock held! + for topic in broadcast_topics { + context.send_topic(who, topic, false); + } + + match action { + Action::Keep(topic, cb) => { + self.report(who, cb); + network_gossip::ValidationResult::ProcessAndKeep(topic) + } + Action::ProcessAndDiscard(topic, cb) => { + self.report(who, cb); + network_gossip::ValidationResult::ProcessAndDiscard(topic) + } + Action::Discard(cb) => { + self.report(who, cb); + network_gossip::ValidationResult::Discard + } + } + } + + fn message_allowed<'a>(&'a self) + -> Box bool + 'a> + { + let (inner, do_rebroadcast) = { + use parking_lot::RwLockWriteGuard; + + let mut inner = self.inner.write(); + let now = Instant::now(); + let do_rebroadcast = if now >= inner.next_rebroadcast { + inner.next_rebroadcast = now + REBROADCAST_AFTER; + true + } else { + false + }; + + // downgrade to read-lock. + (RwLockWriteGuard::downgrade(inner), do_rebroadcast) + }; + + Box::new(move |who, intent, topic, mut data| { + if let MessageIntent::PeriodicRebroadcast = intent { + return do_rebroadcast; + } + + let peer = match inner.peers.peer(who) { + None => return false, + Some(x) => x, + }; + + // if the topic is not something we're keeping at the moment, + // do not send. + let (maybe_round, set_id) = match inner.live_topics.topic_info(&topic) { + None => return false, + Some(x) => x, + }; + + // if the topic is not something the peer accepts, discard. + if let Some(round) = maybe_round { + return peer.view.consider_vote(round, set_id) == Consider::Accept + } + + // global message. + let our_best_commit = inner.local_view.last_commit; + let peer_best_commit = peer.view.last_commit; + + match GossipMessage::::decode(&mut data) { + None => false, + Some(GossipMessage::Commit(full)) => { + // we only broadcast our best commit and only if it's + // better than last received by peer. + Some(full.message.target_number) == our_best_commit + && Some(full.message.target_number) > peer_best_commit + } + Some(GossipMessage::Neighbor(_)) => false, + Some(GossipMessage::VoteOrPrecommit(_)) => false, // should not be the case. + } + }) + } + + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + let inner = self.inner.read(); + Box::new(move |topic, mut data| { + // if the topic is not one of the ones that we are keeping at the moment, + // it is expired. + match inner.live_topics.topic_info(&topic) { + None => return true, + Some((Some(_), _)) => return false, // round messages don't require further checking. + Some((None, _)) => {}, + }; + + // global messages -- only keep the best commit. + let best_commit = inner.local_view.last_commit; + + match GossipMessage::::decode(&mut data) { + None => true, + Some(GossipMessage::Commit(full)) + => Some(full.message.target_number) != best_commit, + Some(_) => true, + } + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use network_gossip::Validator as GossipValidatorT; + use network::test::Block; + + // some random config (not really needed) + fn config() -> crate::Config { + crate::Config { + gossip_duration: Duration::from_millis(10), + justification_period: 256, + local_key: None, + name: None, + } + } + + #[derive(Clone)] + struct StubNetwork; + + impl super::Network for StubNetwork { + type In = futures::stream::Empty; + + fn messages_for(&self, _topic: Block::Hash) -> Self::In { + futures::stream::empty() + } + + fn register_validator( + &self, + _validator: std::sync::Arc>, + ) { + + } + + fn gossip_message(&self, _topic: Block::Hash, _data: Vec, _force: bool) { + + } + + fn send_message(&self, _who: Vec, _data: Vec) { + + } + + fn announce(&self, _block: Block::Hash) { + + } + } + + #[test] + fn view_vote_rules() { + let view = View { round: Round(100), set_id: SetId(1), last_commit: Some(1000u64) }; + + assert_eq!(view.consider_vote(Round(98), SetId(1)), Consider::RejectPast); + assert_eq!(view.consider_vote(Round(1), SetId(0)), Consider::RejectPast); + assert_eq!(view.consider_vote(Round(1000), SetId(0)), Consider::RejectPast); + + assert_eq!(view.consider_vote(Round(99), SetId(1)), Consider::Accept); + assert_eq!(view.consider_vote(Round(100), SetId(1)), Consider::Accept); + assert_eq!(view.consider_vote(Round(101), SetId(1)), Consider::Accept); + + assert_eq!(view.consider_vote(Round(102), SetId(1)), Consider::RejectFuture); + assert_eq!(view.consider_vote(Round(1), SetId(2)), Consider::RejectFuture); + assert_eq!(view.consider_vote(Round(1000), SetId(2)), Consider::RejectFuture); + } + + #[test] + fn view_global_message_rules() { + let view = View { round: Round(100), set_id: SetId(2), last_commit: Some(1000u64) }; + + assert_eq!(view.consider_global(SetId(3), 1), Consider::RejectFuture); + assert_eq!(view.consider_global(SetId(3), 1000), Consider::RejectFuture); + assert_eq!(view.consider_global(SetId(3), 10000), Consider::RejectFuture); + + assert_eq!(view.consider_global(SetId(1), 1), Consider::RejectPast); + assert_eq!(view.consider_global(SetId(1), 1000), Consider::RejectPast); + assert_eq!(view.consider_global(SetId(1), 10000), Consider::RejectPast); + + assert_eq!(view.consider_global(SetId(2), 1), Consider::RejectPast); + assert_eq!(view.consider_global(SetId(2), 1000), Consider::RejectPast); + assert_eq!(view.consider_global(SetId(2), 1001), Consider::Accept); + assert_eq!(view.consider_global(SetId(2), 10000), Consider::Accept); + } + + #[test] + fn unknown_peer_cannot_be_updated() { + let mut peers = Peers::default(); + let id = PeerId::random(); + + let update = NeighborPacket { + round: Round(5), + set_id: SetId(10), + commit_finalized_height: 50, + }; + + let res = peers.update_peer_state(&id, update.clone()); + assert!(res.unwrap().is_none()); + + // connect & disconnect. + peers.new_peer(id.clone()); + peers.peer_disconnected(&id); + + let res = peers.update_peer_state(&id, update.clone()); + assert!(res.unwrap().is_none()); + } + + #[test] + fn update_peer_state() { + let update1 = NeighborPacket { + round: Round(5), + set_id: SetId(10), + commit_finalized_height: 50u32, + }; + + let update2 = NeighborPacket { + round: Round(6), + set_id: SetId(10), + commit_finalized_height: 60, + }; + + let update3 = NeighborPacket { + round: Round(2), + set_id: SetId(11), + commit_finalized_height: 61, + }; + + let update4 = NeighborPacket { + round: Round(3), + set_id: SetId(11), + commit_finalized_height: 80, + }; + + let mut peers = Peers::default(); + let id = PeerId::random(); + + peers.new_peer(id.clone()); + + let mut check_update = move |update: NeighborPacket<_>| { + let view = peers.update_peer_state(&id, update.clone()).unwrap().unwrap(); + assert_eq!(view.round, update.round); + assert_eq!(view.set_id, update.set_id); + assert_eq!(view.last_commit, Some(update.commit_finalized_height)); + }; + + check_update(update1); + check_update(update2); + check_update(update3); + check_update(update4); + } + + #[test] + fn invalid_view_change() { + let mut peers = Peers::default(); + + let id = PeerId::random(); + peers.new_peer(id.clone()); + + peers.update_peer_state(&id, NeighborPacket { + round: Round(10), + set_id: SetId(10), + commit_finalized_height: 10, + }).unwrap().unwrap(); + + let mut check_update = move |update: NeighborPacket<_>| { + let err = peers.update_peer_state(&id, update.clone()).unwrap_err(); + assert_eq!(err, Misbehavior::InvalidViewChange); + }; + + // round moves backwards. + check_update(NeighborPacket { + round: Round(9), + set_id: SetId(10), + commit_finalized_height: 10, + }); + // commit finalized height moves backwards. + check_update(NeighborPacket { + round: Round(10), + set_id: SetId(10), + commit_finalized_height: 9, + }); + // set ID moves backwards. + check_update(NeighborPacket { + round: Round(10), + set_id: SetId(9), + commit_finalized_height: 10, + }); + } + + #[test] + fn messages_not_expired_immediately() { + let val = GossipValidator::::new(config()); + + let set_id = 1; + + for round_num in 1u64..10 { + val.note_round(Round(round_num), SetId(set_id), &StubNetwork); + } + + { + let mut is_expired = val.message_expired(); + let last_kept_round = 10u64 - KEEP_RECENT_ROUNDS as u64 - 1; + + // messages from old rounds are expired. + for round_num in 1u64..last_kept_round { + println!("{} should be expired?", round_num); + let topic = crate::communication::round_topic::(round_num, 1); + assert!(is_expired(topic, &[1, 2, 3])); + } + + // messages from not-too-old rounds are not expired. + for round_num in last_kept_round..10 { + println!("{} should not be expired?", round_num); + let topic = crate::communication::round_topic::(round_num, 1); + assert!(!is_expired(topic, &[1, 2, 3])); + } + } + } +} diff --git a/core/finality-grandpa/src/communication/mod.rs b/core/finality-grandpa/src/communication/mod.rs new file mode 100644 index 000000000000..6d006039f3dc --- /dev/null +++ b/core/finality-grandpa/src/communication/mod.rs @@ -0,0 +1,561 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Communication streams for the polite-grandpa networking protocol. +//! +//! GRANDPA nodes communicate over a gossip network, where messages are not sent to +//! peers until they have reached a given round. +//! +//! Rather than expressing protocol rules, +//! polite-grandpa just carries a notion of impoliteness. Nodes which pass some arbitrary +//! threshold of impoliteness are removed. Messages are either costly, or beneficial. +//! +//! For instance, it is _impolite_ to send the same message more than once. +//! In the future, there will be a fallback for allowing sending the same message +//! under certain conditions that are used to un-stick the protocol. + +use std::sync::Arc; + +use grandpa::VoterSet; +use grandpa::Message::{Prevote, Precommit}; +use futures::prelude::*; +use futures::sync::{oneshot, mpsc}; +use log::{debug, trace}; +use parity_codec::{Encode, Decode}; +use substrate_primitives::{ed25519, Pair}; +use substrate_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use runtime_primitives::ConsensusEngineId; +use runtime_primitives::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; +use network::{consensus_gossip as network_gossip, Service as NetworkService,}; +use network_gossip::ConsensusMessage; + +use crate::{Error, Message, SignedMessage, Commit, CompactCommit}; +use gossip::{ + GossipMessage, FullCommitMessage, VoteOrPrecommitMessage, GossipValidator +}; +use substrate_primitives::ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; + +pub mod gossip; + +/// The consensus engine ID of GRANDPA. +pub const GRANDPA_ENGINE_ID: ConsensusEngineId = [b'a', b'f', b'g', b'1']; + +/// A handle to the network. This is generally implemented by providing some +/// handle to a gossip service or similar. +/// +/// Intended to be a lightweight handle such as an `Arc`. +pub trait Network: Clone { + /// A stream of input messages for a topic. + type In: Stream; + + /// Get a stream of messages for a specific gossip topic. + fn messages_for(&self, topic: Block::Hash) -> Self::In; + + /// Register a gossip validator. + fn register_validator(&self, validator: Arc>); + + /// Gossip a message out to all connected peers. + /// + /// Force causes it to be sent to all peers, even if they've seen it already. + /// Only should be used in case of consensus stall. + fn gossip_message(&self, topic: Block::Hash, data: Vec, force: bool); + + /// Send a message to a bunch of specific peers, even if they've seen it already. + fn send_message(&self, who: Vec, data: Vec); + + /// Inform peers that a block with given hash should be downloaded. + fn announce(&self, block: Block::Hash); +} + +/// Create a unique topic for a round and set-id combo. +pub(crate) fn round_topic(round: u64, set_id: u64) -> B::Hash { + <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) +} + +/// Create a unique topic for global messages on a set ID. +pub(crate) fn global_topic(set_id: u64) -> B::Hash { + <::Hashing as HashT>::hash(format!("{}-GLOBAL", set_id).as_bytes()) +} + +impl Network for Arc> where + B: BlockT, + S: network::specialization::NetworkSpecialization, +{ + type In = NetworkStream; + + fn messages_for(&self, topic: B::Hash) -> Self::In { + let (tx, rx) = oneshot::channel(); + self.with_gossip(move |gossip, _| { + let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, topic); + let _ = tx.send(inner_rx); + }); + NetworkStream { outer: rx, inner: None } + } + + fn register_validator(&self, validator: Arc>) { + self.with_gossip( + move |gossip, context| gossip.register_validator(context, GRANDPA_ENGINE_ID, validator) + ) + } + + fn gossip_message(&self, topic: B::Hash, data: Vec, force: bool) { + let msg = ConsensusMessage { + engine_id: GRANDPA_ENGINE_ID, + data, + }; + self.with_gossip( + move |gossip, ctx| gossip.multicast(ctx, topic, msg, force) + ) + } + + fn send_message(&self, who: Vec, data: Vec) { + let msg = ConsensusMessage { + engine_id: GRANDPA_ENGINE_ID, + data, + }; + + self.with_gossip(move |gossip, ctx| for who in &who { + gossip.send_message(ctx, who, msg.clone()) + }) + } + + fn announce(&self, block: B::Hash) { + self.announce_block(block) + } +} + +/// A stream used by NetworkBridge in its implementation of Network. +pub struct NetworkStream { + inner: Option>, + outer: oneshot::Receiver> +} + +impl Stream for NetworkStream { + type Item = network_gossip::TopicNotification; + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + if let Some(ref mut inner) = self.inner { + return inner.poll(); + } + match self.outer.poll() { + Ok(futures::Async::Ready(mut inner)) => { + let poll_result = inner.poll(); + self.inner = Some(inner); + poll_result + }, + Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), + Err(_) => Err(()) + } + } +} + +/// Bridge between the underlying network service, gossiping consensus messages and Grandpa +pub(crate) struct NetworkBridge> { + service: N, + validator: Arc>, +} + +impl> NetworkBridge { + /// Create a new NetworkBridge to the given NetworkService + pub(crate) fn new(service: N, config: crate::Config) -> Self { + let validator = Arc::new(GossipValidator::new(config)); + service.register_validator(validator.clone()); + NetworkBridge { service, validator: validator } + } + + /// Get the round messages for a round in a given set ID. These are signature-checked. + pub(crate) fn round_communication( + &self, + round: Round, + set_id: SetId, + voters: Arc>, + local_key: Option>, + has_voted: HasVoted, + ) -> ( + impl Stream,Error=Error>, + impl Sink,SinkError=Error>, + ) { + self.validator.note_round(round, set_id, &self.service); + + let locals = local_key.and_then(|pair| { + let public = pair.public(); + let id = AuthorityId(public.0); + if voters.contains_key(&id) { + Some((pair, id)) + } else { + None + } + }); + + let topic = round_topic::(round.0, set_id.0); + let incoming = self.service.messages_for(topic) + .filter_map(|notification| { + let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); + if decoded.is_none() { + debug!(target: "afg", "Skipping malformed message {:?}", notification); + } + decoded + }) + .and_then(move |msg| { + match msg { + GossipMessage::VoteOrPrecommit(msg) => { + // check signature. + if !voters.contains_key(&msg.message.id) { + debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id); + return Ok(None); + } + + match &msg.message.message { + Prevote(prevote) => { + telemetry!(CONSENSUS_INFO; "afg.received_prevote"; + "voter" => ?format!("{}", msg.message.id), + "target_number" => ?prevote.target_number, + "target_hash" => ?prevote.target_hash, + ); + }, + Precommit(precommit) => { + telemetry!(CONSENSUS_INFO; "afg.received_precommit"; + "voter" => ?format!("{}", msg.message.id), + "target_number" => ?precommit.target_number, + "target_hash" => ?precommit.target_hash, + ); + }, + }; + + Ok(Some(msg.message)) + } + _ => { + debug!(target: "afg", "Skipping unknown message type"); + return Ok(None); + } + } + }) + .filter_map(|x| x) + .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); + + let (tx, out_rx) = mpsc::unbounded(); + let outgoing = OutgoingMessages:: { + round: round.0, + set_id: set_id.0, + network: self.service.clone(), + locals, + sender: tx, + has_voted, + }; + + let out_rx = out_rx.map_err(move |()| Error::Network( + format!("Failed to receive on unbounded receiver for round {}", round.0) + )); + + let incoming = incoming.select(out_rx); + + (incoming, outgoing) + } + + /// Set up the global communication streams. + pub(crate) fn global_communication(&self, + set_id: SetId, + voters: Arc>, + is_voter: bool + ) -> ( + impl Stream), Error = Error>, + impl Sink), SinkError = Error>, + ) { + self.validator.note_set(set_id, &self.service); + + let topic = global_topic::(set_id.0); + let incoming = self.service.messages_for(topic) + .filter_map(|notification| { + // this could be optimized by decoding piecewise. + let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); + if decoded.is_none() { + trace!(target: "afg", "Skipping malformed commit message {:?}", notification); + } + decoded + }) + .filter_map(move |msg| { + match msg { + GossipMessage::Commit(msg) => { + let round = msg.round; + let precommits_signed_by: Vec = + msg.message.auth_data.iter().map(move |(_, a)| { + format!("{}", a) + }).collect(); + telemetry!(CONSENSUS_INFO; "afg.received_commit"; + "contains_precommits_signed_by" => ?precommits_signed_by, + "target_number" => ?msg.message.target_number, + "target_hash" => ?msg.message.target_hash, + ); + check_compact_commit::(msg.message, &*voters).map(move |c| (round.0, c)) + }, + _ => { + debug!(target: "afg", "Skipping unknown message type"); + return None; + } + } + }) + .map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream"))); + + let outgoing = CommitsOut::::new( + self.service.clone(), + set_id.0, + is_voter, + ); + + (incoming, outgoing) + } + + pub(crate) fn note_commit_finalized(&self, number: NumberFor) { + self.validator.note_commit_finalized(number, &self.service); + } +} + +impl> Clone for NetworkBridge { + fn clone(&self) -> Self { + NetworkBridge { + service: self.service.clone(), + validator: Arc::clone(&self.validator), + } + } +} + +fn localized_payload(round: u64, set_id: u64, message: &E) -> Vec { + (message, round, set_id).encode() +} + +/// Type-safe wrapper around u64 when indicating that it's a round number. +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)] +pub struct Round(pub u64); + +/// Type-safe wrapper around u64 when indicating that it's a set ID. +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, Encode, Decode)] +pub struct SetId(pub u64); + +// check a message. +pub(crate) fn check_message_sig( + message: &Message, + id: &AuthorityId, + signature: &AuthoritySignature, + round: u64, + set_id: u64, +) -> Result<(), ()> { + let as_public = AuthorityId::from_raw(id.0); + let encoded_raw = localized_payload(round, set_id, message); + if ed25519::Pair::verify(signature, &encoded_raw, as_public) { + Ok(()) + } else { + debug!(target: "afg", "Bad signature on message from {:?}", id); + Err(()) + } +} + +/// Whether we've voted already during a prior run of the program. +#[derive(Decode, Encode)] +pub(crate) enum HasVoted { + /// Has not voted already in this round. + #[codec(index = "0")] + No, + /// Has cast a proposal. + #[codec(index = "1")] + Proposed, + /// Has cast a prevote. + #[codec(index = "2")] + Prevoted, + /// Has cast a precommit (implies prevote.) + #[codec(index = "3")] + Precommitted, +} + +impl HasVoted { + #[allow(unused)] + fn can_propose(&self) -> bool { + match *self { + HasVoted::No => true, + HasVoted::Proposed | HasVoted::Prevoted | HasVoted::Precommitted => false, + } + } + + fn can_prevote(&self) -> bool { + match *self { + HasVoted::No | HasVoted::Proposed => true, + HasVoted::Prevoted | HasVoted::Precommitted => false, + } + } + + fn can_precommit(&self) -> bool { + match *self { + HasVoted::No | HasVoted::Proposed | HasVoted::Prevoted => true, + HasVoted::Precommitted => false, + } + } +} + +/// A sink for outgoing messages to the network. +struct OutgoingMessages> { + round: u64, + set_id: u64, + locals: Option<(Arc, AuthorityId)>, + sender: mpsc::UnboundedSender>, + network: N, + has_voted: HasVoted, +} + +impl> Sink for OutgoingMessages +{ + type SinkItem = Message; + type SinkError = Error; + + fn start_send(&mut self, msg: Message) -> StartSend, Error> { + // only sign if we haven't voted in this round already. + let should_sign = match msg { + grandpa::Message::Prevote(_) => self.has_voted.can_prevote(), + grandpa::Message::Precommit(_) => self.has_voted.can_precommit(), + }; + + // when locals exist, sign messages on import + if let (true, &Some((ref pair, ref local_id))) = (should_sign, &self.locals) { + let encoded = localized_payload(self.round, self.set_id, &msg); + let signature = pair.sign(&encoded[..]); + + let target_hash = msg.target().0.clone(); + let signed = SignedMessage:: { + message: msg, + signature, + id: local_id.clone(), + }; + + let message = GossipMessage::VoteOrPrecommit(VoteOrPrecommitMessage:: { + message: signed.clone(), + round: Round(self.round), + set_id: SetId(self.set_id), + }); + + debug!( + target: "afg", + "Announcing block {} to peers which we voted on in round {} in set {}", + target_hash, + self.round, + self.set_id, + ); + + telemetry!( + CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers"; + "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, + ); + + // announce our block hash to peers and propagate the + // message. + self.network.announce(target_hash); + + let topic = round_topic::(self.round, self.set_id); + self.network.gossip_message(topic, message.encode(), false); + + // forward the message to the inner sender. + let _ = self.sender.unbounded_send(signed); + } + + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + + fn close(&mut self) -> Poll<(), Error> { + // ignore errors since we allow this inner sender to be closed already. + self.sender.close().or_else(|_| Ok(Async::Ready(()))) + } +} + +fn check_compact_commit( + msg: CompactCommit, + voters: &VoterSet, +) -> Option> { + if msg.precommits.len() != msg.auth_data.len() || msg.precommits.is_empty() { + debug!(target: "afg", "Skipping malformed compact commit"); + return None; + } + + // check signatures on all contained precommits. + for (_, ref id) in &msg.auth_data { + if !voters.contains_key(id) { + debug!(target: "afg", "Skipping commit containing unknown voter {}", id); + return None; + } + } + + Some(msg) +} +/// An output sink for commit messages. +struct CommitsOut> { + network: N, + set_id: SetId, + is_voter: bool, + _marker: ::std::marker::PhantomData, +} + +impl> CommitsOut { + /// Create a new commit output stream. + pub(crate) fn new(network: N, set_id: u64, is_voter: bool) -> Self { + CommitsOut { + network, + set_id: SetId(set_id), + is_voter, + _marker: Default::default(), + } + } +} + +impl> Sink for CommitsOut { + type SinkItem = (u64, Commit); + type SinkError = Error; + + fn start_send(&mut self, input: (u64, Commit)) -> StartSend { + if !self.is_voter { + return Ok(AsyncSink::Ready); + } + + let (round, commit) = input; + let round = Round(round); + + telemetry!(CONSENSUS_INFO; "afg.commit_issued"; + "target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash, + ); + let (precommits, auth_data) = commit.precommits.into_iter() + .map(|signed| (signed.precommit, (signed.signature, signed.id))) + .unzip(); + + let compact_commit = CompactCommit:: { + target_hash: commit.target_hash, + target_number: commit.target_number, + precommits, + auth_data + }; + + let message = GossipMessage::Commit(FullCommitMessage:: { + round: round, + set_id: self.set_id, + message: compact_commit, + }); + + let topic = global_topic::(self.set_id.0); + self.network.gossip_message(topic, message.encode(), false); + + Ok(AsyncSink::Ready) + } + + fn close(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } + fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) } +} diff --git a/core/finality-grandpa/src/environment.rs b/core/finality-grandpa/src/environment.rs index 587762b608e0..556b4aead76e 100644 --- a/core/finality-grandpa/src/environment.rs +++ b/core/finality-grandpa/src/environment.rs @@ -82,7 +82,7 @@ pub(crate) struct Environment, RA> { pub(crate) config: Config, pub(crate) authority_set: SharedAuthoritySet>, pub(crate) consensus_changes: SharedConsensusChanges>, - pub(crate) network: N, + pub(crate) network: crate::communication::NetworkBridge, pub(crate) set_id: u64, pub(crate) last_completed: LastCompletedRound>, } @@ -231,32 +231,24 @@ impl, N, RA> voter::Environment( - self.network.messages_for(round, self.set_id), - self.voters.clone(), - ); - let local_key = self.config.local_key.as_ref() .filter(|pair| self.voters.contains_key(&pair.public().into())); - let (out_rx, outgoing) = crate::communication::outgoing_messages::( - round, - self.set_id, - local_key.cloned(), + let (incoming, outgoing) = self.network.round_communication( + crate::communication::Round(round), + crate::communication::SetId(self.set_id), self.voters.clone(), - self.network.clone(), + local_key.cloned(), + crate::communication::HasVoted::No, ); // schedule incoming messages from the network to be held until // corresponding blocks are imported. - let incoming = UntilVoteTargetImported::new( + let incoming = Box::new(UntilVoteTargetImported::new( self.inner.import_notification_stream(), self.inner.clone(), incoming, - ); - - // join incoming network messages with locally originating ones. - let incoming = Box::new(out_rx.select(incoming).map_err(Into::into)); + ).map_err(Into::into)); // schedule network message cleanup when sink drops. let outgoing = Box::new(outgoing.sink_map_err(Into::into)); @@ -305,7 +297,7 @@ impl, N, RA> voter::Environment, N, RA> voter::Environment Self::Timer { diff --git a/core/finality-grandpa/src/lib.rs b/core/finality-grandpa/src/lib.rs index daf37357a086..b07535963727 100644 --- a/core/finality-grandpa/src/lib.rs +++ b/core/finality-grandpa/src/lib.rs @@ -53,32 +53,28 @@ //! included in the newly-finalized chain. use futures::prelude::*; -use log::{debug, info, warn, trace}; -use futures::sync::{self, mpsc, oneshot}; +use log::{debug, info, warn}; +use futures::sync::mpsc; use client::{ BlockchainEvents, CallExecutor, Client, backend::Backend, error::Error as ClientError, }; use client::blockchain::HeaderBackend; -use parity_codec::{Encode, Decode}; +use parity_codec::Encode; use runtime_primitives::traits::{ - NumberFor, Block as BlockT, Header as HeaderT, DigestFor, ProvideRuntimeApi, Hash as HashT, - DigestItemFor, DigestItem, + NumberFor, Block as BlockT, DigestFor, ProvideRuntimeApi, DigestItemFor, DigestItem, }; use fg_primitives::GrandpaApi; use inherents::InherentDataProviders; use runtime_primitives::generic::BlockId; -use substrate_primitives::{ed25519, H256, Blake2Hasher, Pair}; -use substrate_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; +use substrate_primitives::{ed25519, H256, Pair, Blake2Hasher}; +use substrate_telemetry::{telemetry, CONSENSUS_INFO, CONSENSUS_DEBUG, CONSENSUS_WARN}; use srml_finality_tracker; use grandpa::Error as GrandpaError; use grandpa::{voter, round::State as RoundState, BlockNumberOps, VoterSet}; -use network::Service as NetworkService; -use network::consensus_gossip as network_gossip; - use std::fmt; use std::sync::Arc; use std::time::Duration; @@ -99,21 +95,20 @@ mod until_imported; mod service_integration; #[cfg(feature="service-integration")] pub use service_integration::{LinkHalfForService, BlockImportForService}; +pub use communication::Network; +pub use finality_proof::{prove_finality, check_finality_proof}; use aux_schema::{PersistentData, VoterSetState}; use environment::Environment; -pub use finality_proof::{prove_finality, check_finality_proof}; use import::GrandpaBlockImport; use until_imported::UntilCommitBlocksImported; +use communication::NetworkBridge; use ed25519::{Public as AuthorityId, Signature as AuthoritySignature}; #[cfg(test)] mod tests; -const GRANDPA_ENGINE_ID: runtime_primitives::ConsensusEngineId = [b'a', b'f', b'g', b'1']; -const MESSAGE_ROUND_TOLERANCE: u64 = 2; - /// A GRANDPA message for a substrate chain. pub type Message = grandpa::Message<::Hash, NumberFor>; /// A signed message. @@ -124,24 +119,6 @@ pub type SignedMessage = grandpa::SignedMessage< AuthorityId, >; -/// Grandpa gossip message type. -/// This is the root type that gets encoded and sent on the network. -#[derive(Debug, Encode, Decode)] -pub enum GossipMessage { - /// Grandpa message with round and set info. - VoteOrPrecommit(VoteOrPrecommitMessage), - /// Grandpa commit message with round and set info. - Commit(FullCommitMessage), -} - -/// Network level message with topic information. -#[derive(Debug, Encode, Decode)] -pub struct VoteOrPrecommitMessage { - pub round: u64, - pub set_id: u64, - pub message: SignedMessage, -} - /// A prevote message for this chain's block type. pub type Prevote = grandpa::Prevote<::Hash, NumberFor>; /// A precommit message for this chain's block type. @@ -161,14 +138,6 @@ pub type CompactCommit = grandpa::CompactCommit< AuthorityId >; -/// Network level commit message with topic information. -#[derive(Debug, Encode, Decode)] -pub struct FullCommitMessage { - pub round: u64, - pub set_id: u64, - pub message: CompactCommit, -} - /// Configuration for the GRANDPA service. #[derive(Clone)] pub struct Config { @@ -219,337 +188,6 @@ impl From for Error { } } -/// A stream used by NetworkBridge in its implementation of Network. -pub struct NetworkStream { - inner: Option>>, - outer: oneshot::Receiver>> -} - -impl Stream for NetworkStream { - type Item = Vec; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - if let Some(ref mut inner) = self.inner { - return inner.poll(); - } - match self.outer.poll() { - Ok(futures::Async::Ready(mut inner)) => { - let poll_result = inner.poll(); - self.inner = Some(inner); - poll_result - }, - Ok(futures::Async::NotReady) => Ok(futures::Async::NotReady), - Err(_) => Err(()) - } - } -} - -struct TopicTracker { - min_live_round: u64, - max_round: u64, - set_id: u64, -} - -impl TopicTracker { - fn is_expired(&self, round: u64, set_id: u64) -> bool { - if set_id < self.set_id { - trace!(target: "afg", "Expired: Message with expired set_id {} (ours {})", set_id, self.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_set_id"; - "set_id" => ?set_id, "ours" => ?self.set_id - ); - return true; - } else if set_id == self.set_id + 1 { - // allow a few first rounds of future set. - if round > MESSAGE_ROUND_TOLERANCE { - trace!(target: "afg", "Expired: Message too far in the future set, round {} (ours set_id {})", round, self.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_msg_too_far_in_future_set"; - "round" => ?round, "ours" => ?self.set_id - ); - return true; - } - } else if set_id == self.set_id { - if round < self.min_live_round.saturating_sub(MESSAGE_ROUND_TOLERANCE) { - trace!(target: "afg", "Expired: Message round is out of bounds {} (ours {}-{})", round, self.min_live_round, self.max_round); - telemetry!(CONSENSUS_TRACE; "afg.msg_round_oob"; - "round" => ?round, "our_min_live_round" => ?self.min_live_round, "our_max_round" => ?self.max_round - ); - return true; - } - } else { - trace!(target: "afg", "Expired: Message in invalid future set {} (ours {})", set_id, self.set_id); - telemetry!(CONSENSUS_TRACE; "afg.expired_msg_in_invalid_future_set"; - "set_id" => ?set_id, "ours" => ?self.set_id - ); - return true; - } - - false - } -} - -struct GossipValidator { - rounds: parking_lot::RwLock, - _marker: ::std::marker::PhantomData, -} - -impl GossipValidator { - fn new() -> GossipValidator { - GossipValidator { - rounds: parking_lot::RwLock::new(TopicTracker { - min_live_round: 0, - max_round: 0, - set_id: 0, - }), - _marker: Default::default(), - } - } - - fn note_round(&self, round: u64, set_id: u64) { - let mut rounds = self.rounds.write(); - if set_id > rounds.set_id { - rounds.set_id = set_id; - rounds.max_round = 0; - rounds.min_live_round = 0; - } - rounds.max_round = rounds.max_round.max(round); - } - - fn note_set(&self, _set_id: u64) { - } - - fn drop_round(&self, round: u64, set_id: u64) { - let mut rounds = self.rounds.write(); - if set_id == rounds.set_id && round >= rounds.min_live_round { - rounds.min_live_round = round + 1; - } - } - - fn drop_set(&self, _set_id: u64) { - } - - fn is_expired(&self, round: u64, set_id: u64) -> bool { - self.rounds.read().is_expired(round, set_id) - } - - fn validate_round_message(&self, full: VoteOrPrecommitMessage) - -> network_gossip::ValidationResult - { - if self.is_expired(full.round, full.set_id) { - return network_gossip::ValidationResult::Expired; - } - - if let Err(()) = communication::check_message_sig::( - &full.message.message, - &full.message.id, - &full.message.signature, - full.round, - full.set_id - ) { - debug!(target: "afg", "Bad message signature {}", full.message.id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); - return network_gossip::ValidationResult::Invalid; - } - - let topic = message_topic::(full.round, full.set_id); - network_gossip::ValidationResult::Valid(topic) - } - - fn validate_commit_message(&self, full: FullCommitMessage) - -> network_gossip::ValidationResult - { - use grandpa::Message as GrandpaMessage; - - if self.is_expired(full.round, full.set_id) { - return network_gossip::ValidationResult::Expired; - } - - if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { - debug!(target: "afg", "Malformed compact commit"); - telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit"; - "precommits_len" => ?full.message.precommits.len(), - "auth_data_len" => ?full.message.auth_data.len(), - "precommits_is_empty" => ?full.message.precommits.is_empty(), - ); - return network_gossip::ValidationResult::Invalid; - } - - // check signatures on all contained precommits. - for (precommit, &(ref sig, ref id)) in full.message.precommits.iter().zip(&full.message.auth_data) { - if let Err(()) = communication::check_message_sig::( - &GrandpaMessage::Precommit(precommit.clone()), - id, - sig, - full.round, - full.set_id, - ) { - debug!(target: "afg", "Bad commit message signature {}", id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); - return network_gossip::ValidationResult::Invalid; - } - } - - let topic = commit_topic::(full.set_id); - - let precommits_signed_by: Vec = full.message.auth_data.iter().map(move |(_, a)| { - format!("{}", a) - }).collect(); - - telemetry!(CONSENSUS_INFO; "afg.received_commit_msg"; - "contains_precommits_signed_by" => ?precommits_signed_by, - "round" => ?full.round, - "set_id" => ?full.set_id, - "topic" => ?topic, - "block_hash" => ?full.message, - ); - network_gossip::ValidationResult::Valid(topic) - } -} - -impl network_gossip::Validator for GossipValidator { - fn validate(&self, mut data: &[u8]) -> network_gossip::ValidationResult { - match GossipMessage::::decode(&mut data) { - Some(GossipMessage::VoteOrPrecommit(message)) => self.validate_round_message(message), - Some(GossipMessage::Commit(message)) => self.validate_commit_message(message), - None => { - debug!(target: "afg", "Error decoding message"); - telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); - network_gossip::ValidationResult::Invalid - } - } - } - - fn message_expired<'a>(&'a self) -> Box bool + 'a> { - let rounds = self.rounds.read(); - Box::new(move |_topic, mut data| { - match GossipMessage::::decode(&mut data) { - None => true, - Some(GossipMessage::Commit(full)) => rounds.is_expired(full.round, full.set_id), - Some(GossipMessage::VoteOrPrecommit(full)) => - rounds.is_expired(full.round, full.set_id), - } - }) - } -} - -/// A handle to the network. This is generally implemented by providing some -/// handle to a gossip service or similar. -/// -/// Intended to be a lightweight handle such as an `Arc`. -pub trait Network: Clone { - /// A stream of input messages for a topic. - type In: Stream,Error=()>; - - /// Get a stream of messages for a specific round. This stream should - /// never logically conclude. - fn messages_for(&self, round: u64, set_id: u64) -> Self::In; - - /// Send a message at a specific round out. - fn send_message(&self, round: u64, set_id: u64, message: Vec, force: bool); - - /// Clean up messages for a round. - fn drop_round_messages(&self, round: u64, set_id: u64); - - /// Clean up messages for a given authority set id (e.g. commit messages). - fn drop_set_messages(&self, set_id: u64); - - /// Get a stream of commit messages for a specific set-id. This stream - /// should never logically conclude. - fn commit_messages(&self, set_id: u64) -> Self::In; - - /// Send message over the commit channel. - fn send_commit(&self, round: u64, set_id: u64, message: Vec, force: bool); - - /// Inform peers that a block with given hash should be downloaded. - fn announce(&self, round: u64, set_id: u64, block: Block::Hash); -} - -/// Bridge between NetworkService, gossiping consensus messages and Grandpa -pub struct NetworkBridge> { - service: Arc>, - validator: Arc>, -} - -impl> NetworkBridge { - /// Create a new NetworkBridge to the given NetworkService - pub fn new(service: Arc>) -> Self { - let validator = Arc::new(GossipValidator::new()); - let v = validator.clone(); - service.with_gossip(move |gossip, _| { - gossip.register_validator(GRANDPA_ENGINE_ID, v); - }); - NetworkBridge { service, validator: validator } - } -} - -impl,> Clone for NetworkBridge { - fn clone(&self) -> Self { - NetworkBridge { - service: Arc::clone(&self.service), - validator: Arc::clone(&self.validator), - } - } -} - -fn message_topic(round: u64, set_id: u64) -> B::Hash { - <::Hashing as HashT>::hash(format!("{}-{}", set_id, round).as_bytes()) -} - -fn commit_topic(set_id: u64) -> B::Hash { - <::Hashing as HashT>::hash(format!("{}-COMMITS", set_id).as_bytes()) -} - -impl,> Network for NetworkBridge { - type In = NetworkStream; - fn messages_for(&self, round: u64, set_id: u64) -> Self::In { - self.validator.note_round(round, set_id); - let (tx, rx) = sync::oneshot::channel(); - self.service.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, message_topic::(round, set_id)); - let _ = tx.send(inner_rx); - }); - NetworkStream { outer: rx, inner: None } - } - - fn send_message(&self, round: u64, set_id: u64, message: Vec, force: bool) { - let topic = message_topic::(round, set_id); - self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force); - } - - fn drop_round_messages(&self, round: u64, set_id: u64) { - self.validator.drop_round(round, set_id); - self.service.with_gossip(move |gossip, _| gossip.collect_garbage()); - } - - fn drop_set_messages(&self, set_id: u64) { - self.validator.drop_set(set_id); - self.service.with_gossip(move |gossip, _| gossip.collect_garbage()); - } - - fn commit_messages(&self, set_id: u64) -> Self::In { - self.validator.note_set(set_id); - let (tx, rx) = sync::oneshot::channel(); - self.service.with_gossip(move |gossip, _| { - let inner_rx = gossip.messages_for(GRANDPA_ENGINE_ID, commit_topic::(set_id)); - let _ = tx.send(inner_rx); - }); - NetworkStream { outer: rx, inner: None } - } - - fn send_commit(&self, _round: u64, set_id: u64, message: Vec, force: bool) { - let topic = commit_topic::(set_id); - self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force); - } - - fn announce(&self, round: u64, _set_id: u64, block: B::Hash) { - debug!(target: "afg", "Announcing block {} to peers which we voted on in round {}", block, round); - telemetry!(CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers"; - "block" => ?block, "round" => ?round - ); - self.service.announce_block(block) - } -} - /// Something which can determine if a block is known. pub trait BlockStatus { /// Return `Ok(Some(number))` or `Ok(None)` depending on whether the block @@ -698,11 +336,11 @@ pub fn block_import, RA, PRA>( } fn committer_communication, B, E, N, RA>( - local_key: Option>, + local_key: Option<&Arc>, set_id: u64, voters: &Arc>, client: &Arc>, - network: &N, + network: &NetworkBridge, ) -> ( impl Stream< Item = (u64, ::grandpa::CompactCommit, AuthoritySignature, AuthorityId>), @@ -720,10 +358,15 @@ fn committer_communication, B, E, N, RA>( NumberFor: BlockNumberOps, DigestItemFor: DigestItem, { + let is_voter = local_key + .map(|pair| voters.contains_key(&pair.public().into())) + .unwrap_or(false); + // verification stream - let commit_in = crate::communication::checked_commit_stream::( - network.commit_messages(set_id), + let (commit_in, commit_out) = network.global_communication( + communication::SetId(set_id), voters.clone(), + is_voter, ); // block commit messages until relevant blocks are imported. @@ -733,16 +376,6 @@ fn committer_communication, B, E, N, RA>( commit_in, ); - let is_voter = local_key - .map(|pair| voters.contains_key(&pair.public().into())) - .unwrap_or(false); - - let commit_out = crate::communication::CommitsOut::::new( - network.clone(), - set_id, - is_voter, - ); - let commit_in = commit_in.map_err(Into::into); let commit_out = commit_out.sink_map_err(Into::into); @@ -800,20 +433,18 @@ pub fn run_grandpa, N, RA>( { use futures::future::{self, Loop as FutureLoop}; + let network = NetworkBridge::new(network, config.clone()); + let LinkHalf { client, persistent_data, voter_commands_rx, } = link; - // we shadow network with the wrapping/rebroadcasting network to avoid - // accidental reuse. - let (broadcast_worker, network) = communication::rebroadcasting_network(network); let PersistentData { authority_set, set_state, consensus_changes } = persistent_data; register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?; let voters = authority_set.current_authorities(); - let initial_environment = Arc::new(Environment { inner: client.clone(), config: config.clone(), @@ -846,7 +477,7 @@ pub fn run_grandpa, N, RA>( ); let committer_data = committer_communication( - config.local_key.clone(), + config.local_key.as_ref(), env.set_id, &env.voters, &client, @@ -962,8 +593,7 @@ pub fn run_grandpa, N, RA>( }); let voter_work = voter_work - .join(broadcast_worker) - .map(|((), ())| ()) + .map(|_| ()) .map_err(|e| { warn!("GRANDPA Voter failed: {:?}", e); telemetry!(CONSENSUS_WARN; "afg.voter_failed"; "e" => ?e); diff --git a/core/finality-grandpa/src/tests.rs b/core/finality-grandpa/src/tests.rs index 2fd9da894403..d24a97fb1a85 100644 --- a/core/finality-grandpa/src/tests.rs +++ b/core/finality-grandpa/src/tests.rs @@ -20,6 +20,7 @@ use super::*; use network::test::{Block, DummySpecialization, Hash, TestNetFactory, Peer, PeersClient}; use network::test::{PassThroughVerifier}; use network::config::{ProtocolConfig, Roles}; +use network::consensus_gossip as network_gossip; use parking_lot::Mutex; use tokio::runtime::current_thread; use keyring::AuthorityKeyring; @@ -33,11 +34,12 @@ use consensus_common::{BlockOrigin, ForkChoiceStrategy, ImportedAux, ImportBlock use consensus_common::import_queue::{SharedBlockImport, SharedJustificationImport}; use std::collections::{HashMap, HashSet}; use std::result; -use runtime_primitives::traits::{ApiRef, ProvideRuntimeApi}; +use runtime_primitives::traits::{ApiRef, ProvideRuntimeApi, Header as HeaderT}; use runtime_primitives::generic::BlockId; use substrate_primitives::{NativeOrEncoded, ExecutionContext}; use authorities::AuthoritySet; +use communication::GRANDPA_ENGINE_ID; use consensus_changes::ConsensusChanges; type PeerData = @@ -137,102 +139,72 @@ impl TestNetFactory for GrandpaTestNet { struct MessageRouting { inner: Arc>, peer_id: usize, - validator: Arc>, } impl MessageRouting { fn new(inner: Arc>, peer_id: usize,) -> Self { - let validator = Arc::new(GossipValidator::new()); - let v = validator.clone(); - { - let inner = inner.lock(); - let peer = inner.peer(peer_id); - peer.with_gossip(move |gossip, _| { - gossip.register_validator(GRANDPA_ENGINE_ID, v); - }); - } MessageRouting { inner, peer_id, - validator, } } - - fn drop_messages(&self, topic: Hash) { - let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - peer.consensus_gossip_collect_garbage_for_topic(topic); - } -} - -fn make_topic(round: u64, set_id: u64) -> Hash { - message_topic::(round, set_id) -} - -fn make_commit_topic(set_id: u64) -> Hash { - commit_topic::(set_id) } impl Network for MessageRouting { - type In = Box,Error=()> + Send>; + type In = Box + Send>; - fn messages_for(&self, round: u64, set_id: u64) -> Self::In { - self.validator.note_round(round, set_id); + /// Get a stream of messages for a specific gossip topic. + fn messages_for(&self, topic: Hash) -> Self::In { let inner = self.inner.lock(); let peer = inner.peer(self.peer_id); + let messages = peer.consensus_gossip_messages_for( GRANDPA_ENGINE_ID, - make_topic(round, set_id), + topic, ); let messages = messages.map_err( - move |_| panic!("Messages for round {} dropped too early", round) + move |_| panic!("Messages for topic {} dropped too early", topic) ); Box::new(messages) } - fn send_message(&self, round: u64, set_id: u64, message: Vec, force: bool) { + fn register_validator(&self, v: Arc>) { let inner = self.inner.lock(); - inner.peer(self.peer_id) - .gossip_message(make_topic(round, set_id), GRANDPA_ENGINE_ID, message, force); - } - - fn drop_round_messages(&self, round: u64, set_id: u64) { - self.validator.drop_round(round, set_id); - let topic = make_topic(round, set_id); - self.drop_messages(topic); - } - - fn drop_set_messages(&self, set_id: u64) { - self.validator.drop_set(set_id); - let topic = make_commit_topic(set_id); - self.drop_messages(topic); + let peer = inner.peer(self.peer_id); + peer.with_gossip(move |gossip, context| { + gossip.register_validator(context, GRANDPA_ENGINE_ID, v); + }); } - fn commit_messages(&self, set_id: u64) -> Self::In { - self.validator.note_set(set_id); + fn gossip_message(&self, topic: Hash, data: Vec, force: bool) { let inner = self.inner.lock(); - let peer = inner.peer(self.peer_id); - let messages = peer.consensus_gossip_messages_for( + inner.peer(self.peer_id).gossip_message( + topic, GRANDPA_ENGINE_ID, - make_commit_topic(set_id), - ); - - let messages = messages.map_err( - move |_| panic!("Commit messages for set {} dropped too early", set_id) + data, + force, ); - - Box::new(messages) } - fn send_commit(&self, _round: u64, set_id: u64, message: Vec, force: bool) { + fn send_message(&self, who: Vec, data: Vec) { let inner = self.inner.lock(); - inner.peer(self.peer_id) - .gossip_message(make_commit_topic(set_id), GRANDPA_ENGINE_ID, message, force); + let peer = inner.peer(self.peer_id); + + peer.with_gossip(move |gossip, ctx| for who in &who { + gossip.send_message( + ctx, + who, + network_gossip::ConsensusMessage { + engine_id: GRANDPA_ENGINE_ID, + data: data.clone(), + } + ) + }) } - fn announce(&self, _round: u64, _set_id: u64, _block: H256) { + fn announce(&self, _block: Hash) { } } diff --git a/core/network/src/consensus_gossip.rs b/core/network/src/consensus_gossip.rs index 433dd5b795fd..745a644a9d05 100644 --- a/core/network/src/consensus_gossip.rs +++ b/core/network/src/consensus_gossip.rs @@ -17,11 +17,12 @@ //! Utility for gossip of network messages between authorities. //! Handles chain-specific and standard BFT messages. -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, hash_map::Entry}; use std::sync::Arc; +use std::iter; +use std::time; use log::{trace, debug}; use futures::sync::mpsc; -use rand::{self, seq::SliceRandom}; use lru_cache::LruCache; use network_libp2p::{Severity, PeerId}; use runtime_primitives::traits::{Block as BlockT, Hash, HashFor}; @@ -33,57 +34,192 @@ use crate::config::Roles; // FIXME: Add additional spam/DoS attack protection: https://github.com/paritytech/substrate/issues/1115 const KNOWN_MESSAGES_CACHE_SIZE: usize = 4096; +const REBROADCAST_INTERVAL: time::Duration = time::Duration::from_secs(30); + struct PeerConsensus { known_messages: HashSet, - is_authority: bool, + roles: Roles, } -#[derive(Clone, Copy)] -enum Status { - Live, - Future, +/// Topic stream message with sender. +#[derive(Debug, Eq, PartialEq)] +pub struct TopicNotification { + /// Message data. + pub message: Vec, + /// Sender if available. + pub sender: Option, } struct MessageEntry { message_hash: B::Hash, topic: B::Hash, message: ConsensusMessage, - status: Status, +} + +/// Consensus message destination. +pub enum MessageRecipient { + /// Send to all peers. + BroadcastToAll, + /// Send to peers that don't have that message already. + BroadcastNew, + /// Send to specific peer. + Peer(PeerId), +} + +/// The reason for sending out the message. +#[derive(Eq, PartialEq, Copy, Clone)] +pub enum MessageIntent { + /// Requested broadcast + Broadcast, + /// Requested broadcast to all peers. + ForcedBroadcast, + /// Periodic rebroadcast of all messages to all peers. + PeriodicRebroadcast, } /// Message validation result. pub enum ValidationResult { - /// Message is valid with this topic. - Valid(H), - /// Message is future with this topic. - Future(H), - /// Invalid message. - Invalid, - /// Obsolete message. - Expired, + /// Message should be stored and propagated under given topic. + ProcessAndKeep(H), + /// Message should be processed, but not propagated. + ProcessAndDiscard(H), + /// Message should be ignored. + Discard, +} + +/// Validation context. Allows reacting to incoming messages by sending out further messages. +pub trait ValidatorContext { + /// Broadcast all messages with given topic to peers that do not have it yet. + fn broadcast_topic(&mut self, topic: B::Hash, force: bool); + /// Broadcast a message to all peers that have not received it previously. + fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool); + /// Send addressed message to a peer. + fn send_message(&mut self, who: &PeerId, message: Vec); + /// Send all messages with given topic to a peer. + fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool); +} + +struct NetworkContext<'g, 'p, B: BlockT> { + gossip: &'g mut ConsensusGossip, + protocol: &'p mut Context, + engine_id: ConsensusEngineId, +} + +impl<'g, 'p, B: BlockT> ValidatorContext for NetworkContext<'g, 'p, B> { + /// Broadcast all messages with given topic to peers that do not have it yet. + fn broadcast_topic(&mut self, topic: B::Hash, force: bool) { + self.gossip.broadcast_topic(self.protocol, topic, force); + } + + /// Broadcast a message to all peers that have not received it previously. + fn broadcast_message(&mut self, topic: B::Hash, message: Vec, force: bool) { + self.gossip.multicast( + self.protocol, + topic, + ConsensusMessage{ data: message, engine_id: self.engine_id.clone() }, + force, + ); + } + + /// Send addressed message to a peer. + fn send_message(&mut self, who: &PeerId, message: Vec) { + self.protocol.send_message(who.clone(), Message::Consensus(ConsensusMessage { + engine_id: self.engine_id, + data: message, + })); + } + + /// Send all messages with given topic to a peer. + fn send_topic(&mut self, who: &PeerId, topic: B::Hash, force: bool) { + self.gossip.send_topic(self.protocol, who, topic, self.engine_id, force); + } +} + +fn propagate<'a, B: BlockT, I>( + protocol: &mut Context, + messages: I, + intent: MessageIntent, + peers: &mut HashMap>, + validators: &HashMap>>, +) + where I: IntoIterator, // (msg_hash, topic, message) +{ + let mut check_fns = HashMap::new(); + let mut message_allowed = move |who: &PeerId, intent: MessageIntent, topic: &B::Hash, message: &ConsensusMessage| { + let engine_id = message.engine_id; + let check_fn = match check_fns.entry(engine_id) { + Entry::Occupied(entry) => entry.into_mut(), + Entry::Vacant(vacant) => match validators.get(&engine_id) { + None => return false, // treat all messages with no validator as not allowed + Some(validator) => vacant.insert(validator.message_allowed()), + } + }; + + (check_fn)(who, intent, topic, &message.data) + }; + + for (message_hash, topic, message) in messages { + for (id, ref mut peer) in peers.iter_mut() { + let intent = match intent { + MessageIntent::Broadcast => + if peer.known_messages.contains(&message_hash) { + continue + } else { + MessageIntent::Broadcast + }, + MessageIntent::PeriodicRebroadcast => + if peer.known_messages.contains(&message_hash) { + MessageIntent::PeriodicRebroadcast + } else { + // peer doesn't know message, so the logic should treat it as an + // initial broadcast. + MessageIntent::Broadcast + }, + other => other, + }; + + if !message_allowed(id, intent, &topic, &message) { + continue + } + peer.known_messages.insert(message_hash.clone()); + trace!(target: "gossip", "Propagating to {}: {:?}", id, message); + protocol.send_message(id.clone(), Message::Consensus(message.clone())); + } + } } /// Validates consensus messages. -pub trait Validator { +pub trait Validator: Send + Sync { + /// New peer is connected. + fn new_peer(&self, _context: &mut ValidatorContext, _who: &PeerId, _roles: Roles) { + } + + /// New connection is dropped. + fn peer_disconnected(&self, _context: &mut ValidatorContext, _who: &PeerId) { + } + /// Validate consensus message. - fn validate(&self, data: &[u8]) -> ValidationResult; + fn validate(&self, context: &mut ValidatorContext, sender: &PeerId, data: &[u8]) -> ValidationResult; /// Produce a closure for validating messages on a given topic. - fn message_expired<'a>(&'a self) -> Box bool + 'a> { - Box::new(move |_topic, data| match self.validate(data) { - ValidationResult::Valid(_) | ValidationResult::Future(_) => false, - ValidationResult::Invalid | ValidationResult::Expired => true, - }) + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, _data| false) + } + + /// Produce a closure for filtering egress messages. + fn message_allowed<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_who, _intent, _topic, _data| true) } } /// Consensus network protocol handler. Manages statements and candidate requests. pub struct ConsensusGossip { peers: HashMap>, - live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec>>>, + live_message_sinks: HashMap<(ConsensusEngineId, B::Hash), Vec>>, messages: Vec>, known_messages: LruCache, - validators: HashMap>>, + validators: HashMap>>, + next_broadcast: time::Instant, } impl ConsensusGossip { @@ -95,6 +231,7 @@ impl ConsensusGossip { messages: Default::default(), known_messages: LruCache::new(KNOWN_MESSAGES_CACHE_SIZE), validators: Default::default(), + next_broadcast: time::Instant::now() + REBROADCAST_INTERVAL, } } @@ -104,105 +241,84 @@ impl ConsensusGossip { } /// Register message validator for a message type. - pub fn register_validator(&mut self, engine_id: ConsensusEngineId, validator: Arc>) { - self.validators.insert(engine_id, validator); - } - - /// Handle new connected peer. - pub fn new_peer(&mut self, protocol: &mut Context, who: PeerId, roles: Roles) { - if roles.intersects(Roles::AUTHORITY) { - trace!(target:"gossip", "Registering {:?} {}", roles, who); - // Send out all known messages to authorities. - let mut known_messages = HashSet::new(); - for entry in self.messages.iter() { - if let Status::Future = entry.status { continue } - - known_messages.insert(entry.message_hash); - protocol.send_message(who.clone(), Message::Consensus(entry.message.clone())); - } - self.peers.insert(who, PeerConsensus { - known_messages, - is_authority: true, - }); - } - else if roles.intersects(Roles::FULL) { - self.peers.insert(who, PeerConsensus { - known_messages: HashSet::new(), - is_authority: false, - }); + pub fn register_validator(&mut self, protocol: &mut Context, engine_id: ConsensusEngineId, validator: Arc>) { + self.register_validator_internal(engine_id, validator.clone()); + let peers: Vec<_> = self.peers.iter().map(|(id, peer)| (id.clone(), peer.roles)).collect(); + for (id, roles) in peers { + let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + validator.new_peer(&mut context, &id, roles); } } - fn propagate( - &mut self, - protocol: &mut Context, - message_hash: B::Hash, - get_message: F, - force: bool, - ) - where F: Fn() -> ConsensusMessage, - { - let mut non_authorities: Vec<_> = self.peers.iter() - .filter_map(|(id, ref peer)| - if !peer.is_authority && (!peer.known_messages.contains(&message_hash) || force) { - Some(id.clone()) - } else { - None - } - ) - .collect(); - - non_authorities.shuffle(&mut rand::thread_rng()); - let non_authorities: HashSet<_> = if non_authorities.is_empty() { - HashSet::new() - } else { - non_authorities[0..non_authorities.len().min(((non_authorities.len() as f64).sqrt() as usize).max(3))].iter().collect() - }; + fn register_validator_internal(&mut self, engine_id: ConsensusEngineId, validator: Arc>) { + self.validators.insert(engine_id, validator.clone()); + } - for (id, ref mut peer) in self.peers.iter_mut() { - if peer.is_authority { - if peer.known_messages.insert(message_hash.clone()) || force { - let message = get_message(); - trace!(target:"gossip", "Propagating to authority {}: {:?}", id, message); - protocol.send_message(id.clone(), Message::Consensus(message)); - } - } else if non_authorities.contains(&id) { - let message = get_message(); - trace!(target:"gossip", "Propagating to {}: {:?}", id, message); - protocol.send_message(id.clone(), Message::Consensus(message)); - } + /// Handle new connected peer. + pub fn new_peer(&mut self, protocol: &mut Context, who: PeerId, roles: Roles) { + trace!(target:"gossip", "Registering {:?} {}", roles, who); + self.peers.insert(who.clone(), PeerConsensus { + known_messages: HashSet::new(), + roles, + }); + for (engine_id, v) in self.validators.clone() { + let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + v.new_peer(&mut context, &who, roles); } } - fn register_message( + fn register_message( &mut self, message_hash: B::Hash, topic: B::Hash, - status: Status, - get_message: F, - ) - where F: Fn() -> ConsensusMessage - { - if self.known_messages.insert(message_hash, ()).is_none() { + message: ConsensusMessage, + ) { + if self.known_messages.insert(message_hash.clone(), ()).is_none() { self.messages.push(MessageEntry { - topic, message_hash, - message: get_message(), - status, + topic, + message, }); } } /// Call when a peer has been disconnected to stop tracking gossip status. - pub fn peer_disconnected(&mut self, _protocol: &mut Context, who: PeerId) { - self.peers.remove(&who); + pub fn peer_disconnected(&mut self, protocol: &mut Context, who: PeerId) { + for (engine_id, v) in self.validators.clone() { + let mut context = NetworkContext { gossip: self, protocol, engine_id: engine_id.clone() }; + v.peer_disconnected(&mut context, &who); + } + } + + /// Perform periodic maintenance + pub fn tick(&mut self, protocol: &mut Context) { + self.collect_garbage(); + if time::Instant::now() >= self.next_broadcast { + self.rebroadcast(protocol); + self.next_broadcast = time::Instant::now() + REBROADCAST_INTERVAL; + } + } + + /// Rebroadcast all messages to all peers. + fn rebroadcast(&mut self, protocol: &mut Context) { + let messages = self.messages.iter() + .map(|entry| (&entry.message_hash, &entry.topic, &entry.message)); + propagate(protocol, messages, MessageIntent::PeriodicRebroadcast, &mut self.peers, &self.validators); + } + + /// Broadcast all messages with given topic. + pub fn broadcast_topic(&mut self, protocol: &mut Context, topic: B::Hash, force: bool) { + let messages = self.messages.iter() + .filter_map(|entry| + if entry.topic == topic { Some((&entry.message_hash, &entry.topic, &entry.message)) } else { None } + ); + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + propagate(protocol, messages, intent, &mut self.peers, &self.validators); } /// Prune old or no longer relevant consensus messages. Provide a predicate /// for pruning, which returns `false` when the items with a given topic should be pruned. pub fn collect_garbage(&mut self) { - use std::collections::hash_map::Entry; - self.live_message_sinks.retain(|_, sinks| { sinks.retain(|sink| !sink.is_closed()); !sinks.is_empty() @@ -241,42 +357,17 @@ impl ConsensusGossip { /// Get data of valid, incoming messages for a topic (but might have expired meanwhile) pub fn messages_for(&mut self, engine_id: ConsensusEngineId, topic: B::Hash) - -> mpsc::UnboundedReceiver> + -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); - - let validator = match self.validators.get(&engine_id) { - None => { - self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); - return rx; - } - Some(v) => v, - }; - for entry in self.messages.iter_mut() .filter(|e| e.topic == topic && e.message.engine_id == engine_id) { - let live = match entry.status { - Status::Live => true, - Status::Future => match validator.validate(&entry.message.data) { - ValidationResult::Valid(_) => { - entry.status = Status::Live; - true - } - _ => { - // don't send messages considered to be future still. - // if messages are considered expired they'll be cleaned up when we - // collect garbage. - false - } - } - }; - - if live { - entry.status = Status::Live; - tx.unbounded_send(entry.message.data.clone()) - .expect("receiver known to be live; qed"); - } + tx.unbounded_send(TopicNotification { + message: entry.message.data.clone(), + sender: None, + }) + .expect("receiver known to be live; qed"); } self.live_message_sinks.entry((engine_id, topic)).or_default().push(tx); @@ -293,65 +384,80 @@ impl ConsensusGossip { protocol: &mut Context, who: PeerId, message: ConsensusMessage, - ) -> Option<(B::Hash, ConsensusMessage)> { + ) { let message_hash = HashFor::::hash(&message.data[..]); if self.known_messages.contains_key(&message_hash) { trace!(target:"gossip", "Ignored already known message from {}", who); - return None; + return; } - if let Some(ref mut peer) = self.peers.get_mut(&who) { - use std::collections::hash_map::Entry; - - let engine_id = message.engine_id; - // validate the message - let (topic, status) = match self.validators.get(&engine_id) - .map(|v| v.validate(&message.data)) - { - Some(ValidationResult::Valid(topic)) => (topic, Status::Live), - Some(ValidationResult::Future(topic)) => (topic, Status::Future), - Some(ValidationResult::Invalid) => { - trace!(target:"gossip", "Invalid message from {}", who); - protocol.report_peer( - who, - Severity::Bad(format!("Sent invalid consensus message")), - ); - return None; - }, - Some(ValidationResult::Expired) => { - trace!(target:"gossip", "Ignored expired message from {}", who); - return None; - }, - None => { - protocol.report_peer( - who.clone(), - Severity::Useless(format!("Sent unknown consensus engine id")), - ); - trace!(target:"gossip", "Unknown message engine id {:?} from {}", - engine_id, who); - return None; - }, - }; + let engine_id = message.engine_id; + //validate the message + let validation = self.validators.get(&engine_id) + .cloned() + .map(|v| { + let mut context = NetworkContext { gossip: self, protocol, engine_id }; + v.validate(&mut context, &who, &message.data) + }); + let validation_result = match validation { + Some(ValidationResult::ProcessAndKeep(topic)) => Some((topic, true)), + Some(ValidationResult::ProcessAndDiscard(topic)) => Some((topic, false)), + Some(ValidationResult::Discard) => None, + None => { + trace!(target:"gossip", "Unknown message engine id {:?} from {}", engine_id, who); + protocol.report_peer( + who, + Severity::Useless(format!("Sent unknown consensus engine id")), + ); + return; + } + }; - peer.known_messages.insert(message_hash); - if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { - debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); - entry.get_mut().retain(|sink| { - if let Err(e) = sink.unbounded_send(message.data.clone()) { - trace!(target:"gossip", "Error broadcasting message notification: {:?}", e); + if let Some((topic, keep)) = validation_result { + if let Some(ref mut peer) = self.peers.get_mut(&who) { + use std::collections::hash_map::Entry; + peer.known_messages.insert(message_hash); + if let Entry::Occupied(mut entry) = self.live_message_sinks.entry((engine_id, topic)) { + debug!(target: "gossip", "Pushing consensus message to sinks for {}.", topic); + entry.get_mut().retain(|sink| { + if let Err(e) = sink.unbounded_send(TopicNotification { + message: message.data.clone(), + sender: Some(who.clone()) + }) { + trace!(target: "gossip", "Error broadcasting message notification: {:?}", e); + } + !sink.is_closed() + }); + if entry.get().is_empty() { + entry.remove_entry(); } - !sink.is_closed() - }); - if entry.get().is_empty() { - entry.remove_entry(); } + if keep { + self.register_message(message_hash, topic, message); + } + } else { + trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); } - self.multicast_inner(protocol, message_hash, topic, status, || message.clone(), false); - Some((topic, message)) } else { - trace!(target:"gossip", "Ignored statement from unregistered peer {}", who); - None + trace!(target:"gossip", "Handled valid one hop message from peer {}", who); + } + } + + /// Send all messages with given topic to a peer. + pub fn send_topic(&mut self, protocol: &mut Context, who: &PeerId, topic: B::Hash, engine_id: ConsensusEngineId, force: bool) { + if let Some(ref mut peer) = self.peers.get_mut(who) { + for entry in self.messages.iter().filter(|m| m.topic == topic && m.message.engine_id == engine_id) { + if !force && peer.known_messages.contains(&entry.message_hash) { + continue + } + peer.known_messages.insert(entry.message_hash.clone()); + trace!(target: "gossip", "Sending topic message to {}: {:?}", who, entry.message); + protocol.send_message(who.clone(), Message::Consensus(ConsensusMessage { + engine_id: engine_id.clone(), + data: entry.message.data.clone(), + })); + } } } @@ -364,24 +470,30 @@ impl ConsensusGossip { force: bool, ) { let message_hash = HashFor::::hash(&message.data); - self.multicast_inner(protocol, message_hash, topic, Status::Live, || message.clone(), force); + self.register_message(message_hash, topic, message.clone()); + let intent = if force { MessageIntent::ForcedBroadcast } else { MessageIntent::Broadcast }; + propagate(protocol, iter::once((&message_hash, &topic, &message)), intent, &mut self.peers, &self.validators); } - fn multicast_inner( + /// Send addressed message to a peer. The message is not kept or multicast + /// later on. + pub fn send_message( &mut self, protocol: &mut Context, - message_hash: B::Hash, - topic: B::Hash, - status: Status, - get_message: F, - force: bool, - ) - where F: Fn() -> ConsensusMessage - { - self.register_message(message_hash, topic, status, &get_message); - if let Status::Live = status { - self.propagate(protocol, message_hash, get_message, force); - } + who: &PeerId, + message: ConsensusMessage, + ) { + let peer = match self.peers.get_mut(who) { + None => return, + Some(peer) => peer, + }; + + let message_hash = HashFor::::hash(&message.data); + + trace!(target: "gossip", "Sending direct to {}: {:?}", who, message); + + peer.known_messages.insert(message_hash); + protocol.send_message(who.clone(), Message::Consensus(message.clone())); } } @@ -398,33 +510,36 @@ mod tests { ($consensus:expr, $topic:expr, $hash: expr, $m:expr) => { if $consensus.known_messages.insert($hash, ()).is_none() { $consensus.messages.push(MessageEntry { - topic: $topic, message_hash: $hash, - message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0] }, - status: Status::Live, + topic: $topic, + message: ConsensusMessage { data: $m, engine_id: [0, 0, 0, 0]}, }); } } } struct AllowAll; - impl Validator for AllowAll { - fn validate(&self, _data: &[u8]) -> ValidationResult { - ValidationResult::Valid(H256::default()) + impl Validator for AllowAll { + fn validate(&self, _context: &mut ValidatorContext, _sender: &PeerId, _data: &[u8]) -> ValidationResult { + ValidationResult::ProcessAndKeep(H256::default()) } } #[test] fn collects_garbage() { struct AllowOne; - impl Validator for AllowOne { - fn validate(&self, data: &[u8]) -> ValidationResult { + impl Validator for AllowOne { + fn validate(&self, _context: &mut ValidatorContext, _sender: &PeerId, data: &[u8]) -> ValidationResult { if data[0] == 1 { - ValidationResult::Valid(H256::default()) + ValidationResult::ProcessAndKeep(H256::default()) } else { - ValidationResult::Expired + ValidationResult::Discard } } + + fn message_expired<'a>(&'a self) -> Box bool + 'a> { + Box::new(move |_topic, data| data[0] != 1 ) + } } let prev_hash = H256::random(); @@ -441,12 +556,12 @@ mod tests { consensus.known_messages.insert(m2_hash, ()); let test_engine_id = Default::default(); - consensus.register_validator(test_engine_id, Arc::new(AllowAll)); + consensus.register_validator_internal(test_engine_id, Arc::new(AllowAll)); consensus.collect_garbage(); assert_eq!(consensus.messages.len(), 2); assert_eq!(consensus.known_messages.len(), 2); - consensus.register_validator(test_engine_id, Arc::new(AllowOne)); + consensus.register_validator_internal(test_engine_id, Arc::new(AllowOne)); // m2 is expired consensus.collect_garbage(); @@ -461,17 +576,17 @@ mod tests { use futures::Stream; let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); + consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, Status::Live, || message.clone()); + consensus.register_message(message_hash, topic, message.clone()); let stream = consensus.messages_for([0, 0, 0, 0], topic); - assert_eq!(stream.wait().next(), Some(Ok(message.data))); + assert_eq!(stream.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None }))); } #[test] @@ -482,8 +597,8 @@ mod tests { let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); + consensus.register_message(HashFor::::hash(&msg_a.data), topic,msg_a); + consensus.register_message(HashFor::::hash(&msg_b.data), topic,msg_b); assert_eq!(consensus.messages.len(), 2); } @@ -491,37 +606,37 @@ mod tests { #[test] fn can_keep_multiple_subscribers_per_topic() { let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); + consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let message = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 0] }; let message_hash = HashFor::::hash(&message.data); let topic = HashFor::::hash(&[1,2,3]); - consensus.register_message(message_hash, topic, Status::Live, || message.clone()); + consensus.register_message(message_hash, topic, message.clone()); let stream1 = consensus.messages_for([0, 0, 0, 0], topic); let stream2 = consensus.messages_for([0, 0, 0, 0], topic); - assert_eq!(stream1.wait().next(), Some(Ok(message.data.clone()))); - assert_eq!(stream2.wait().next(), Some(Ok(message.data))); + assert_eq!(stream1.wait().next(), Some(Ok(TopicNotification { message: message.data.clone(), sender: None }))); + assert_eq!(stream2.wait().next(), Some(Ok(TopicNotification { message: message.data, sender: None }))); } #[test] fn topics_are_localized_to_engine_id() { let mut consensus = ConsensusGossip::::new(); - consensus.register_validator([0, 0, 0, 0], Arc::new(AllowAll)); + consensus.register_validator_internal([0, 0, 0, 0], Arc::new(AllowAll)); let topic = [1; 32].into(); let msg_a = ConsensusMessage { data: vec![1, 2, 3], engine_id: [0, 0, 0, 0] }; let msg_b = ConsensusMessage { data: vec![4, 5, 6], engine_id: [0, 0, 0, 1] }; - consensus.register_message(HashFor::::hash(&msg_a.data), topic, Status::Live, || msg_a.clone()); - consensus.register_message(HashFor::::hash(&msg_b.data), topic, Status::Live, || msg_b.clone()); + consensus.register_message(HashFor::::hash(&msg_a.data), topic, msg_a); + consensus.register_message(HashFor::::hash(&msg_b.data), topic, msg_b); let mut stream = consensus.messages_for([0, 0, 0, 0], topic).wait(); - assert_eq!(stream.next(), Some(Ok(vec![1, 2, 3]))); + assert_eq!(stream.next(), Some(Ok(TopicNotification { message: vec![1, 2, 3], sender: None }))); let _ = consensus.live_message_sinks.remove(&([0, 0, 0, 0], topic)); assert_eq!(stream.next(), None); } diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 185f595cc091..99520ea99a7a 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -24,7 +24,7 @@ use runtime_primitives::traits::{As, Block as BlockT, Header as HeaderT, NumberF use consensus::import_queue::ImportQueue; use crate::message::{self, Message}; use crate::message::generic::{Message as GenericMessage, ConsensusMessage}; -use crate::consensus_gossip::ConsensusGossip; +use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::on_demand::OnDemandService; use crate::specialization::NetworkSpecialization; use crate::sync::{ChainSync, Status as SyncStatus, SyncState}; @@ -237,7 +237,7 @@ pub enum ProtocolMsg> { /// Execute a closure with the consensus gossip. ExecuteWithGossip(Box + Send + 'static>), /// Incoming gossip consensus message. - GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, bool), + GossipConsensusMessage(B::Hash, ConsensusEngineId, Vec, GossipMessageRecipient), /// Tell protocol to abort sync (does not stop protocol). /// Only used in tests. #[cfg(any(test, feature = "test-helpers"))] @@ -377,8 +377,8 @@ impl, H: ExHashT> Protocol { ProtocolContext::new(&mut self.context_data, &self.network_chan); task.call_box(&mut self.consensus_gossip, &mut context); } - ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, force) => { - self.gossip_consensus_message(topic, engine_id, message, force) + ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => { + self.gossip_consensus_message(topic, engine_id, message, recipient) } ProtocolMsg::BlocksProcessed(hashes, has_error) => { self.sync.blocks_processed(hashes, has_error); @@ -523,14 +523,18 @@ impl, H: ExHashT> Protocol { topic: B::Hash, engine_id: ConsensusEngineId, message: Vec, - force: bool, + recipient: GossipMessageRecipient, ) { - self.consensus_gossip.multicast( - &mut ProtocolContext::new(&mut self.context_data, &self.network_chan), - topic, - ConsensusMessage{ data: message, engine_id }, - force, - ); + let mut context = ProtocolContext::new(&mut self.context_data, &self.network_chan); + let message = ConsensusMessage { data: message, engine_id }; + match recipient { + GossipMessageRecipient::BroadcastToAll => + self.consensus_gossip.multicast(&mut context, topic, message, true), + GossipMessageRecipient::BroadcastNew => + self.consensus_gossip.multicast(&mut context, topic, message, false), + GossipMessageRecipient::Peer(who) => + self.send_message(who, GenericMessage::Consensus(message)), + } } /// Called when a new peer is connected @@ -669,7 +673,7 @@ impl, H: ExHashT> Protocol { /// Perform time based maintenance. fn tick(&mut self) { - self.consensus_gossip.collect_garbage(); + self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); self.maintain_peers(); self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &self.network_chan)); self.on_demand diff --git a/core/network/src/service.rs b/core/network/src/service.rs index 8e8658739884..8ca4f0525904 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::{io, thread}; + use log::{warn, debug, error, trace, info}; use futures::{Async, Future, Stream, stream, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; @@ -26,15 +27,16 @@ use network_libp2p::{start_service, parse_str_addr, Service as NetworkService, S use network_libp2p::{multiaddr, RegisteredProtocol, NetworkState}; use peerset::Peerset; use consensus::import_queue::{ImportQueue, Link}; -use crate::consensus_gossip::ConsensusGossip; +use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; + +use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient}; use crate::message::Message; use crate::protocol::{self, Context, FromNetworkMsg, Protocol, ConnectedPeer, ProtocolMsg, ProtocolStatus, PeerInfo}; use crate::config::Params; -use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use crate::error::Error; -use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; use crate::specialization::NetworkSpecialization; +use crossbeam_channel::{self as channel, Receiver, Sender, TryRecvError}; use tokio::prelude::task::AtomicTask; use tokio::runtime::Builder as RuntimeBuilder; @@ -257,12 +259,12 @@ impl> Service { topic: B::Hash, engine_id: ConsensusEngineId, message: Vec, - force: bool, + recipient: GossipMessageRecipient, ) { let _ = self .protocol_sender .send(ProtocolMsg::GossipConsensusMessage( - topic, engine_id, message, force, + topic, engine_id, message, recipient, )); } diff --git a/core/network/src/test/mod.rs b/core/network/src/test/mod.rs index b4b1c4f4e249..1768073a40e8 100644 --- a/core/network/src/test/mod.rs +++ b/core/network/src/test/mod.rs @@ -35,7 +35,7 @@ use consensus::import_queue::{BasicQueue, ImportQueue, IncomingBlock}; use consensus::import_queue::{Link, SharedBlockImport, SharedJustificationImport, Verifier}; use consensus::{Error as ConsensusError, ErrorKind as ConsensusErrorKind}; use consensus::{BlockOrigin, ForkChoiceStrategy, ImportBlock, JustificationImport}; -use crate::consensus_gossip::ConsensusGossip; +use crate::consensus_gossip::{ConsensusGossip, MessageRecipient as GossipMessageRecipient, TopicNotification}; use crossbeam_channel::{self as channel, Sender, select}; use futures::Future; use futures::sync::{mpsc, oneshot}; @@ -260,14 +260,12 @@ impl + Clone> Peer { } // SyncOracle: are we connected to any peer? - #[cfg(test)] - fn is_offline(&self) -> bool { + pub fn is_offline(&self) -> bool { self.is_offline.load(Ordering::Relaxed) } // SyncOracle: are we in the process of catching-up with the chain? - #[cfg(test)] - fn is_major_syncing(&self) -> bool { + pub fn is_major_syncing(&self) -> bool { self.is_major_syncing.load(Ordering::Relaxed) } @@ -366,9 +364,10 @@ impl + Clone> Peer { data: Vec, force: bool, ) { + let recipient = if force { GossipMessageRecipient::BroadcastToAll } else { GossipMessageRecipient::BroadcastNew }; let _ = self .protocol_sender - .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, force)); + .send(ProtocolMsg::GossipConsensusMessage(topic, engine_id, data, recipient)); } pub fn consensus_gossip_collect_garbage_for_topic(&self, _topic: ::Hash) { @@ -380,7 +379,7 @@ impl + Clone> Peer { &self, engine_id: ConsensusEngineId, topic: ::Hash, - ) -> mpsc::UnboundedReceiver> { + ) -> mpsc::UnboundedReceiver { let (tx, rx) = oneshot::channel(); self.with_gossip(move |gossip, _| { let inner_rx = gossip.messages_for(engine_id, topic); diff --git a/core/test-runtime/wasm/Cargo.lock b/core/test-runtime/wasm/Cargo.lock index 5c6e85a69e0e..1a5d2ee24a69 100644 --- a/core/test-runtime/wasm/Cargo.lock +++ b/core/test-runtime/wasm/Cargo.lock @@ -611,7 +611,7 @@ dependencies = [ [[package]] name = "hash-db" -version = "0.12.0" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -631,6 +631,11 @@ dependencies = [ "scopeguard 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "hashmap_core" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "heapsize" version = "0.4.2" @@ -1154,10 +1159,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "memory-db" -version = "0.12.0" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "hashmap_core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2020,7 +2026,7 @@ name = "sr-io" version = "1.0.0" dependencies = [ "environmental 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "libsecp256k1 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2256,7 +2262,7 @@ dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)", - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "kvdb 0.1.0 (git+https://github.com/paritytech/parity-common?rev=b0317f649ab2c665b7987b8475878fc4d2e1f81d)", @@ -2384,7 +2390,7 @@ dependencies = [ "base58 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "blake2-rfc 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "hash256-std-hasher 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2419,7 +2425,7 @@ dependencies = [ name = "substrate-state-machine" version = "1.0.0" dependencies = [ - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2428,8 +2434,8 @@ dependencies = [ "substrate-panic-handler 1.0.0", "substrate-primitives 1.0.0", "substrate-trie 1.0.0", - "trie-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "trie-root 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "trie-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "trie-root 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2456,6 +2462,7 @@ dependencies = [ "cfg-if 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", + "memory-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.89 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2472,6 +2479,8 @@ dependencies = [ "substrate-keyring 1.0.0", "substrate-offchain-primitives 0.1.0", "substrate-primitives 1.0.0", + "substrate-trie 1.0.0", + "trie-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2485,11 +2494,12 @@ dependencies = [ name = "substrate-trie" version = "1.0.0" dependencies = [ - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "memory-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "memory-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-codec 3.3.0 (registry+https://github.com/rust-lang/crates.io-index)", - "trie-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", - "trie-root 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "sr-std 1.0.0", + "trie-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "trie-root 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -2769,21 +2779,22 @@ dependencies = [ [[package]] name = "trie-db" -version = "0.12.0" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "elastic-array 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)", - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", + "hashmap_core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] name = "trie-root" -version = "0.12.0" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -3064,9 +3075,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum futures-cpupool 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "ab90cde24b3319636588d0c35fe03b1333857621051837ed769faefb4c2162e4" "checksum generic-array 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3c0f28c2f5bfb5960175af447a2da7c18900693738343dc896ffbcabd9839592" "checksum generic-array 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)" = "fceb69994e330afed50c93524be68c42fa898c2d9fd4ee8da03bd7363acd26f2" -"checksum hash-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "07463834729d0ce8d475e7dd6d302e407093ad9a9c02d77eb07fb74b5373829d" +"checksum hash-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ba7fb417e5c470acdd61068c79767d0e65962e70836cf6c9dfd2409f06345ce0" "checksum hash256-std-hasher 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1224388a21c88a80ae7087a2a245ca6d80acc97a9186b75789fb3eeefd0609af" "checksum hashbrown 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3bae29b6653b3412c2e71e9d486db9f9df5d701941d86683005efb9f2d28e3da" +"checksum hashmap_core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "8e04cb7a5051270ef3fa79f8c7604d581ecfa73d520e74f554e45541c4b5881a" "checksum heapsize 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1679e6ea370dee694f91f1dc469bf94cf8f52051d147aec3e1f9497c6fc22461" "checksum heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" "checksum hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" @@ -3111,7 +3123,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" "checksum memchr 2.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2efc7bc57c883d4a4d6e3246905283d8dae951bb3bd32f49d6ef297f546e1c39" "checksum memoffset 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0f9dc261e2b62d7a622bf416ea3c5245cdd5d9a7fcc428c0d06804dfce1775b3" -"checksum memory-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3dd87d4d64f7b86d8804bbb419f8ecb187cb8f40a50e91c72848075c604ba88d" +"checksum memory-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7623b01a4f1b7acb7cf8e3f678f05e15e6ae26cb0b738dfeb5cc186fd6b82ef4" "checksum memory_units 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "71d96e3f3c0b6325d8ccd83c33b28acb183edcb6c67938ba104ec546854b0882" "checksum merlin 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "83c2dda19c01176e8e7148f7bdb88bbdf215a8db0641f89fc40e4b81736aeda5" "checksum mio 0.6.16 (registry+https://github.com/rust-lang/crates.io-index)" = "71646331f2619b1026cc302f87a2b8b648d5c6dd6937846a16cc8ce0f347f432" @@ -3241,8 +3253,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum tokio-udp 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "66268575b80f4a4a710ef83d087fdfeeabdce9b74c797535fbac18a2cb906e92" "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445" "checksum toml 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "758664fc71a3a69038656bee8b6be6477d2a6c315a6b81f7081f591bffa4111f" -"checksum trie-db 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7f23230c31251bdbdba89cd9caa0cbe2aa9e4aa5f92a80e6eb6296bb290e9146" -"checksum trie-root 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e45632ecaf2b8b4a40b5208383cd659b4e66f58ccd40086467a4614b45781430" +"checksum trie-db 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "1ba73747fd3a64ab531274c04cb588dfa9d30d972d62990831e63fbce2cfec59" +"checksum trie-root 0.12.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cfa2e20c4f1418ac2e71ddc418e35e1b56e34022e2146209ffdbf1b2de8b1bd9" "checksum twofish 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712d261e83e727c8e2dbb75dacac67c36e35db36a958ee504f2164fc052434e1" "checksum twox-hash 1.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "555cd4909480122bbbf21e34faac4cb08a171f324775670447ed116726c474af" "checksum typenum 1.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "612d636f949607bdf9b123b4a6f6d966dedf3ff669f7f045890d3a4a73948169" diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index a1fb61f175b4..afc1ed39a8e2 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -119,7 +119,7 @@ construct_service_factory! { name: Some(service.config.name.clone()) }, link_half, - grandpa::NetworkBridge::new(service.network()), + service.network(), service.config.custom.inherent_data_providers.clone(), service.on_exit(), )?);