Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions core/finality-grandpa/src/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use log::{debug, trace};
use parity_codec::{Encode, Decode};
use substrate_primitives::{ed25519, Pair};
use substrate_telemetry::{telemetry, CONSENSUS_INFO};
use network::consensus_gossip as network_gossip;
use runtime_primitives::traits::Block as BlockT;
use tokio::timer::Interval;
use crate::{Error, Network, Message, SignedMessage, Commit,
Expand Down Expand Up @@ -268,13 +269,13 @@ pub(crate) fn checked_message_stream<Block: BlockT, S>(
voters: Arc<VoterSet<AuthorityId>>,
)
-> impl Stream<Item=SignedMessage<Block>,Error=Error> where
S: Stream<Item=Vec<u8>,Error=()>
S: Stream<Item=network_gossip::TopicNotification, Error=()>
{
inner
.filter_map(|raw| {
let decoded = GossipMessage::<Block>::decode(&mut &raw[..]);
.filter_map(|notification| {
let decoded = GossipMessage::<Block>::decode(&mut &notification.message[..]);
if decoded.is_none() {
debug!(target: "afg", "Skipping malformed message {:?}", raw);
debug!(target: "afg", "Skipping malformed message {:?}", notification);
}
decoded
})
Expand Down Expand Up @@ -442,14 +443,14 @@ pub(crate) fn checked_commit_stream<Block: BlockT, S>(
voters: Arc<VoterSet<AuthorityId>>,
)
-> impl Stream<Item=(u64, CompactCommit<Block>),Error=Error> where
S: Stream<Item=Vec<u8>,Error=()>
S: Stream<Item=network_gossip::TopicNotification, Error=()>
{
inner
.filter_map(|raw| {
.filter_map(|notification| {
// this could be optimized by decoding piecewise.
let decoded = GossipMessage::<Block>::decode(&mut &raw[..]);
let decoded = GossipMessage::<Block>::decode(&mut &notification.message[..]);
if decoded.is_none() {
trace!(target: "afg", "Skipping malformed commit message {:?}", raw);
trace!(target: "afg", "Skipping malformed commit message {:?}", notification);
}
decoded
})
Expand Down
48 changes: 30 additions & 18 deletions core/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,12 @@ impl From<ClientError> for Error {

/// A stream used by NetworkBridge in its implementation of Network.
pub struct NetworkStream {
inner: Option<mpsc::UnboundedReceiver<Vec<u8>>>,
outer: oneshot::Receiver<mpsc::UnboundedReceiver<Vec<u8>>>
inner: Option<mpsc::UnboundedReceiver<network_gossip::TopicNotification>>,
outer: oneshot::Receiver<mpsc::UnboundedReceiver<network_gossip::TopicNotification>>
}

impl Stream for NetworkStream {
type Item = Vec<u8>;
type Item = network_gossip::TopicNotification;
type Error = ();

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -336,7 +336,7 @@ impl<Block: BlockT> GossipValidator<Block> {
-> network_gossip::ValidationResult<Block::Hash>
{
if self.is_expired(full.round, full.set_id) {
return network_gossip::ValidationResult::Expired;
return network_gossip::ValidationResult::Discard;
}

if let Err(()) = communication::check_message_sig::<Block>(
Expand All @@ -348,11 +348,11 @@ impl<Block: BlockT> GossipValidator<Block> {
) {
debug!(target: "afg", "Bad message signature {}", full.message.id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id);
return network_gossip::ValidationResult::Invalid;
return network_gossip::ValidationResult::Discard;
}

let topic = message_topic::<Block>(full.round, full.set_id);
network_gossip::ValidationResult::Valid(topic)
network_gossip::ValidationResult::ProcessAndKeep(topic)
}

fn validate_commit_message(&self, full: FullCommitMessage<Block>)
Expand All @@ -361,7 +361,7 @@ impl<Block: BlockT> GossipValidator<Block> {
use grandpa::Message as GrandpaMessage;

if self.is_expired(full.round, full.set_id) {
return network_gossip::ValidationResult::Expired;
return network_gossip::ValidationResult::Discard;
}

if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() {
Expand All @@ -371,7 +371,7 @@ impl<Block: BlockT> GossipValidator<Block> {
"auth_data_len" => ?full.message.auth_data.len(),
"precommits_is_empty" => ?full.message.precommits.is_empty(),
);
return network_gossip::ValidationResult::Invalid;
return network_gossip::ValidationResult::Discard;
}

// check signatures on all contained precommits.
Expand All @@ -385,7 +385,7 @@ impl<Block: BlockT> GossipValidator<Block> {
) {
debug!(target: "afg", "Bad commit message signature {}", id);
telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id);
return network_gossip::ValidationResult::Invalid;
return network_gossip::ValidationResult::Discard;
}
}

Expand All @@ -402,19 +402,21 @@ impl<Block: BlockT> GossipValidator<Block> {
"topic" => ?topic,
"block_hash" => ?full.message,
);
network_gossip::ValidationResult::Valid(topic)
network_gossip::ValidationResult::ProcessAndKeep(topic)
}
}

impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<Block> {
fn validate(&self, mut data: &[u8]) -> network_gossip::ValidationResult<Block::Hash> {
impl<Block: BlockT> network_gossip::Validator<Block> for GossipValidator<Block> {
fn validate(&self, _context: &mut network_gossip::ValidatorContext<Block>, _sender: &network::PeerId, mut data: &[u8])
-> network_gossip::ValidationResult<Block::Hash>
{
match GossipMessage::<Block>::decode(&mut data) {
Some(GossipMessage::VoteOrPrecommit(message)) => self.validate_round_message(message),
Some(GossipMessage::Commit(message)) => self.validate_commit_message(message),
None => {
debug!(target: "afg", "Error decoding message");
telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => "");
network_gossip::ValidationResult::Invalid
network_gossip::ValidationResult::Discard
}
}
}
Expand All @@ -438,7 +440,7 @@ impl<Block: BlockT> network_gossip::Validator<Block::Hash> for GossipValidator<B
/// Intended to be a lightweight handle such as an `Arc`.
pub trait Network<Block: BlockT>: Clone {
/// A stream of input messages for a topic.
type In: Stream<Item=Vec<u8>,Error=()>;
type In: Stream<Item=network_gossip::TopicNotification, Error=()>;

/// Get a stream of messages for a specific round. This stream should
/// never logically conclude.
Expand Down Expand Up @@ -475,8 +477,8 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>> NetworkBri
pub fn new(service: Arc<NetworkService<B, S>>) -> Self {
let validator = Arc::new(GossipValidator::new());
let v = validator.clone();
service.with_gossip(move |gossip, _| {
gossip.register_validator(GRANDPA_ENGINE_ID, v);
service.with_gossip(move |gossip, context| {
gossip.register_validator(context, GRANDPA_ENGINE_ID, v);
});
NetworkBridge { service, validator: validator }
}
Expand Down Expand Up @@ -513,7 +515,12 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B

fn send_message(&self, round: u64, set_id: u64, message: Vec<u8>, force: bool) {
let topic = message_topic::<B>(round, set_id);
self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force);
let recipient = if force {
network_gossip::MessageRecipient::BroadcastToAll
} else {
network_gossip::MessageRecipient::BroadcastNew
};
self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, recipient);
}

fn drop_round_messages(&self, round: u64, set_id: u64) {
Expand All @@ -538,7 +545,12 @@ impl<B: BlockT, S: network::specialization::NetworkSpecialization<B>,> Network<B

fn send_commit(&self, _round: u64, set_id: u64, message: Vec<u8>, force: bool) {
let topic = commit_topic::<B>(set_id);
self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, force);
let recipient = if force {
network_gossip::MessageRecipient::BroadcastToAll
} else {
network_gossip::MessageRecipient::BroadcastNew
};
self.service.gossip_consensus_message(topic, GRANDPA_ENGINE_ID, message, recipient);
}

fn announce(&self, round: u64, _set_id: u64, block: B::Hash) {
Expand Down
6 changes: 3 additions & 3 deletions core/finality-grandpa/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ impl MessageRouting {
{
let inner = inner.lock();
let peer = inner.peer(peer_id);
peer.with_gossip(move |gossip, _| {
gossip.register_validator(GRANDPA_ENGINE_ID, v);
peer.with_gossip(move |gossip, context| {
gossip.register_validator(context, GRANDPA_ENGINE_ID, v);
});
}
MessageRouting {
Expand All @@ -175,7 +175,7 @@ fn make_commit_topic(set_id: u64) -> Hash {
}

impl Network<Block> for MessageRouting {
type In = Box<Stream<Item=Vec<u8>,Error=()> + Send>;
type In = Box<Stream<Item=network_gossip::TopicNotification, Error=()> + Send>;

fn messages_for(&self, round: u64, set_id: u64) -> Self::In {
self.validator.note_round(round, set_id);
Expand Down
Loading