Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ mod periodic;
pub(crate) mod tests;

pub use sp_finality_grandpa::GRANDPA_ENGINE_ID;
pub const GRANDPA_PROTOCOL_NAME: &[u8] = b"/paritytech/grandpa/1";

// cost scalars for reporting peers.
mod cost {
Expand Down Expand Up @@ -185,7 +186,12 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
);

let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new(service.clone(), GRANDPA_ENGINE_ID, validator.clone());
let gossip_engine = GossipEngine::new(
service.clone(),
GRANDPA_ENGINE_ID,
GRANDPA_PROTOCOL_NAME,
validator.clone()
);

{
// register all previous votes with the gossip service so that they're
Expand Down
4 changes: 2 additions & 2 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;
use sp_keyring::Ed25519Keyring;
use parity_scale_codec::Encode;
use sp_runtime::{ConsensusEngineId, traits::NumberFor};
use std::{pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, pin::Pin, task::{Context, Poll}};
use crate::environment::SharedVoterSetState;
use sp_finality_grandpa::{AuthorityList, GRANDPA_ENGINE_ID};
use super::gossip::{self, GossipValidator};
Expand Down Expand Up @@ -61,7 +61,7 @@ impl sc_network_gossip::Network<Block> for TestNetwork {
let _ = self.sender.unbounded_send(Event::WriteNotification(who, message));
}

fn register_notifications_protocol(&self, _: ConsensusEngineId) {}
fn register_notifications_protocol(&self, _: ConsensusEngineId, _: Cow<'static, [u8]>) {}

fn announce(&self, block: Hash, _associated_data: Vec<u8>) {
let _ = self.sender.unbounded_send(Event::Announce(block));
Expand Down
5 changes: 4 additions & 1 deletion client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,10 @@ pub fn setup_disabled_grandpa<B, E, Block: BlockT, RA, N>(
// We register the GRANDPA protocol so that we don't consider it an anomaly
// to receive GRANDPA messages on the network. We don't process the
// messages.
network.register_notifications_protocol(communication::GRANDPA_ENGINE_ID);
network.register_notifications_protocol(
communication::GRANDPA_ENGINE_ID,
From::from(communication::GRANDPA_PROTOCOL_NAME),
);

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions client/network-gossip/src/bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::{prelude::*, channel::mpsc};
use libp2p::PeerId;
use parking_lot::Mutex;
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{pin::Pin, sync::Arc, task::{Context, Poll}};
use std::{borrow::Cow, pin::Pin, sync::Arc, task::{Context, Poll}};

/// Wraps around an implementation of the `Network` crate and provides gossiping capabilities on
/// top of it.
Expand All @@ -48,6 +48,7 @@ impl<B: BlockT> GossipEngine<B> {
pub fn new<N: Network<B> + Send + Clone + 'static>(
mut network: N,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>,
validator: Arc<dyn Validator<B>>,
) -> Self where B: 'static {
let mut state_machine = ConsensusGossip::new();
Expand All @@ -56,7 +57,7 @@ impl<B: BlockT> GossipEngine<B> {
// might miss events.
let network_event_stream = network.event_stream();

network.register_notifications_protocol(engine_id);
network.register_notifications_protocol(engine_id, protocol_name.into());
state_machine.register_validator(&mut network, engine_id, validator);

let inner = Arc::new(Mutex::new(GossipEngineInner {
Expand Down
8 changes: 5 additions & 3 deletions client/network-gossip/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub use self::validator::{DiscardAll, MessageIntent, Validator, ValidatorContext
use futures::prelude::*;
use sc_network::{specialization::NetworkSpecialization, Event, ExHashT, NetworkService, PeerId, ReputationChange};
use sp_runtime::{traits::Block as BlockT, ConsensusEngineId};
use std::{pin::Pin, sync::Arc};
use std::{borrow::Cow, pin::Pin, sync::Arc};

mod bridge;
mod state_machine;
Expand All @@ -86,7 +86,8 @@ pub trait Network<B: BlockT> {
/// See the documentation of [`NetworkService:register_notifications_protocol`] for more information.
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>,
);

/// Notify everyone we're connected to that we have the given block.
Expand Down Expand Up @@ -116,8 +117,9 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Network<B> for Arc<Netw
fn register_notifications_protocol(
&self,
engine_id: ConsensusEngineId,
protocol_name: Cow<'static, [u8]>,
) {
NetworkService::register_notifications_protocol(self, engine_id)
NetworkService::register_notifications_protocol(self, engine_id, protocol_name)
}

fn announce(&self, block: B::Hash, associated_data: Vec<u8>) {
Expand Down
3 changes: 2 additions & 1 deletion client/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sc-block-builder = { version = "0.8", path = "../block-builder" }
sc-client = { version = "0.8", path = "../" }
sc-client-api = { version = "2.0.0", path = "../api" }
sc-peerset = { version = "2.0.0", path = "../peerset" }
pin-project = "0.4.6"
serde = { version = "1.0.101", features = ["derive"] }
serde_json = "1.0.41"
slog = { version = "2.5.2", features = ["nested-values"] }
Expand All @@ -51,7 +52,7 @@ sp-runtime = { version = "2.0.0", path = "../../primitives/runtime" }
substrate-test-client = { version = "2.0.0", optional = true, path = "../../test-utils/client" }
substrate-test-runtime-client = { version = "2.0.0", optional = true, path = "../../test-utils/runtime/client" }
thiserror = "1"
unsigned-varint = { version = "0.3.0", features = ["futures-codec"] }
unsigned-varint = { version = "0.3.1", features = ["futures", "futures-codec"] }
void = "1.0.2"
zeroize = "1.0.0"

Expand Down
70 changes: 36 additions & 34 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

use crate::{DiscoveryNetBehaviour, config::ProtocolId};
use legacy_proto::{LegacyProto, LegacyProtoOut};
use crate::utils::interval;
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use generic_proto::{GenericProto, GenericProtoOut};
use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, nodes::listeners::ListenerId};
use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
Expand All @@ -36,13 +36,14 @@ use sp_runtime::traits::{
};
use sp_arithmetic::traits::SaturatedConversion;
use message::{BlockAnnounce, BlockAttributes, Direction, FromBlock, Message, RequestId};
use message::generic::{Message as GenericMessage, ConsensusMessage};
use message::generic::Message as GenericMessage;
use light_dispatch::{LightDispatch, LightDispatchNetwork, RequestData};
use specialization::NetworkSpecialization;
use sync::{ChainSync, SyncState};
use crate::service::{TransactionPool, ExHashT};
use crate::config::{BoxFinalityProofRequestBuilder, Roles};
use rustc_hex::ToHex;
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::fmt::Write;
Expand All @@ -64,7 +65,7 @@ pub mod api {
}
}

mod legacy_proto;
mod generic_proto;
mod util;

pub mod block_requests;
Expand Down Expand Up @@ -158,9 +159,11 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// When asked for a proof of finality, we use this struct to build one.
finality_proof_provider: Option<Arc<dyn FinalityProofProvider<B>>>,
/// Handles opening the unique substream and sending and receiving raw messages.
behaviour: LegacyProto,
/// List of notification protocols that have been registered.
registered_notif_protocols: HashSet<ConsensusEngineId>,
behaviour: GenericProto,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
/// For each protocol name, the legacy gossiping engine ID.
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
}

#[derive(Default)]
Expand Down Expand Up @@ -207,7 +210,7 @@ pub struct PeerInfo<B: BlockT> {
}

struct LightDispatchIn<'a> {
behaviour: &'a mut LegacyProto,
behaviour: &'a mut GenericProto,
peerset: sc_peerset::PeersetHandle,
}

Expand Down Expand Up @@ -347,15 +350,15 @@ pub trait Context<B: BlockT> {

/// Protocol context.
struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> {
behaviour: &'a mut LegacyProto,
behaviour: &'a mut GenericProto,
context_data: &'a mut ContextData<B, H>,
peerset_handle: &'a sc_peerset::PeersetHandle,
}

impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> {
fn new(
context_data: &'a mut ContextData<B, H>,
behaviour: &'a mut LegacyProto,
behaviour: &'a mut GenericProto,
peerset_handle: &'a sc_peerset::PeersetHandle,
) -> Self {
ProtocolContext { context_data, peerset_handle, behaviour }
Expand Down Expand Up @@ -442,7 +445,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {

let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let behaviour = LegacyProto::new(protocol_id, versions, peerset);
let behaviour = GenericProto::new(protocol_id, versions, peerset);

let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
Expand All @@ -463,7 +466,8 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
finality_proof_provider,
peerset_handle: peerset_handle.clone(),
behaviour,
registered_notif_protocols: HashSet::new(),
protocol_name_by_engine: HashMap::new(),
protocol_engine_by_name: HashMap::new(),
};

Ok((protocol, peerset_handle))
Expand Down Expand Up @@ -646,7 +650,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::RemoteReadChildRequest(request) =>
self.on_remote_read_child_request(who, request),
GenericMessage::Consensus(msg) =>
return if self.registered_notif_protocols.contains(&msg.engine_id) {
return if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
CustomMessageOutcome::NotificationsReceived {
remote: who.clone(),
messages: vec![(msg.engine_id, From::from(msg.data))],
Expand All @@ -659,7 +663,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
let messages = messages
.into_iter()
.filter_map(|msg| {
if self.registered_notif_protocols.contains(&msg.engine_id) {
if self.protocol_name_by_engine.contains_key(&msg.engine_id) {
Some((msg.engine_id, From::from(msg.data)))
} else {
warn!(target: "sync", "Received message on non-registered protocol: {:?}", msg.engine_id);
Expand Down Expand Up @@ -1060,7 +1064,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
// Notify all the notification protocols as open.
CustomMessageOutcome::NotificationStreamOpened {
remote: who,
protocols: self.registered_notif_protocols.iter().cloned().collect(),
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
roles: info.roles,
}
}
Expand All @@ -1075,18 +1079,15 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
engine_id: ConsensusEngineId,
message: impl Into<Vec<u8>>
) {
if !self.registered_notif_protocols.contains(&engine_id) {
if let Some(protocol_name) = self.protocol_name_by_engine.get(&engine_id) {
self.behaviour.write_notification(&target, engine_id, protocol_name.clone(), message);
} else {
error!(
target: "sub-libp2p",
"Sending a notification with a protocol that wasn't registered: {:?}",
engine_id
);
}

self.send_message(&target, GenericMessage::Consensus(ConsensusMessage {
engine_id,
data: message.into(),
}));
}

/// Registers a new notifications protocol.
Expand All @@ -1096,9 +1097,14 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
pub fn register_notifications_protocol(
&mut self,
engine_id: ConsensusEngineId,
protocol_name: impl Into<Cow<'static, [u8]>>,
) -> Vec<event::Event> {
if !self.registered_notif_protocols.insert(engine_id) {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", engine_id);
let protocol_name = protocol_name.into();
if self.protocol_name_by_engine.insert(engine_id, protocol_name.clone()).is_some() {
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", protocol_name);
} else {
self.behaviour.register_notif_protocol(protocol_name.clone(), engine_id, Vec::new());
self.protocol_engine_by_name.insert(protocol_name, engine_id);
}

// Registering a protocol while we already have open connections isn't great, but for now
Expand Down Expand Up @@ -1833,7 +1839,7 @@ pub enum CustomMessageOutcome<B: BlockT> {
}

fn send_request<B: BlockT, H: ExHashT>(
behaviour: &mut LegacyProto,
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
peers: &mut HashMap<PeerId, Peer<B, H>>,
who: &PeerId,
Expand All @@ -1854,7 +1860,7 @@ fn send_request<B: BlockT, H: ExHashT>(
}

fn send_message<B: BlockT>(
behaviour: &mut LegacyProto,
behaviour: &mut GenericProto,
stats: &mut HashMap<&'static str, PacketStats>,
who: &PeerId,
message: Message<B>,
Expand All @@ -1868,7 +1874,7 @@ fn send_message<B: BlockT>(

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
Protocol<B, S, H> {
type ProtocolsHandler = <LegacyProto as NetworkBehaviour>::ProtocolsHandler;
type ProtocolsHandler = <GenericProto as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = CustomMessageOutcome<B>;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
Expand Down Expand Up @@ -1954,25 +1960,21 @@ Protocol<B, S, H> {
};

let outcome = match event {
LegacyProtoOut::CustomProtocolOpen { peer_id, version, .. } => {
debug_assert!(
version <= CURRENT_VERSION as u8
&& version >= MIN_VERSION as u8
);
GenericProtoOut::CustomProtocolOpen { peer_id, .. } => {
self.on_peer_connected(peer_id.clone());
CustomMessageOutcome::None
}
LegacyProtoOut::CustomProtocolClosed { peer_id, .. } => {
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id.clone());
// Notify all the notification protocols as closed.
CustomMessageOutcome::NotificationStreamClosed {
remote: peer_id,
protocols: self.registered_notif_protocols.iter().cloned().collect(),
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
}
},
LegacyProtoOut::CustomMessage { peer_id, message } =>
GenericProtoOut::CustomMessage { peer_id, message } =>
self.on_custom_message(peer_id, message),
LegacyProtoOut::Clogged { peer_id, messages } => {
GenericProtoOut::Clogged { peer_id, messages } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
for msg in messages.into_iter().take(5) {
let message: Option<Message<B>> = Decode::decode(&mut &msg[..]).ok();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
//! Implementation of libp2p's `NetworkBehaviour` trait that opens a single substream with the
//! remote and then allows any communication with them.
//!
//! The `Protocol` struct uses `LegacyProto` in order to open substreams with the rest of the
//! The `Protocol` struct uses `GenericProto` in order to open substreams with the rest of the
//! network, then performs the Substrate protocol handling on top.

pub use self::behaviour::{LegacyProto, LegacyProtoOut};
pub use self::behaviour::{GenericProto, GenericProtoOut};

mod behaviour;
mod handler;
Expand Down
Loading