Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions client/network/src/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use log::{debug, error, trace, warn};
use rand::distributions::{Distribution as _, Uniform};
use smallvec::SmallVec;
use sr_primitives::ConsensusEngineId;
use std::{borrow::Cow, collections::hash_map::Entry, cmp, error, marker::PhantomData, mem, pin::Pin};
use std::time::{Duration, Instant};
use tokio_io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -386,20 +387,49 @@ impl<TSubstream> GenericProto<TSubstream> {
pub fn send_packet(
&mut self,
target: &PeerId,
proto_name: Option<Cow<'static, [u8]>>,
message: impl Into<Vec<u8>>,
) {
if !self.is_open(target) {
return;
}

trace!(target: "sub-libp2p", "External API => Packet for {:?} with protocol {:?}", target, proto_name);
trace!(target: "sub-libp2p", "External API => Packet for {:?}", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);

self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: target.clone(),
event: NotifsHandlerIn::Send {
message: message.into(),
proto_name: proto_name.map(Into::into),
},
});
}

/// Sends a notification to a peer.
///
/// Has no effect if the custom protocol is not open with the given peer.
///
/// Also note that even we have a valid open substream, it may in fact be already closed
/// without us knowing, in which case the packet will not be received.
pub fn write_notif(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_notification please

&mut self,
target: &PeerId,
proto_name: Cow<'static, [u8]>,
engine_id: ConsensusEngineId,
message: impl Into<Vec<u8>>,
) {
if !self.is_open(target) {
return;
}

trace!(target: "sub-libp2p", "External API => Notification for {:?} with protocol {:?}", target, proto_name);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);

self.events.push(NetworkBehaviourAction::SendEvent {
peer_id: target.clone(),
event: NotifsHandlerIn::SendNotif {
message: message.into(),
proto_name,
engine_id,
},
});
}
Expand Down
43 changes: 30 additions & 13 deletions client/network/src/generic_proto/handler/group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::generic_proto::{
handler::notif_out::{NotifsOutHandlerProto, NotifsOutHandler, NotifsOutHandlerIn, NotifsOutHandlerOut},
upgrade::{NotificationsIn, NotificationsOut, RegisteredProtocol, SelectUpgrade, UpgradeCollec},
};
use crate::protocol::message::generic::ConsensusMessage;
use bytes::BytesMut;
use codec::Encode as _;
use futures::prelude::*;
use libp2p::core::{ConnectedPoint, PeerId};
use libp2p::core::either::{EitherError, EitherOutput};
Expand All @@ -33,6 +35,7 @@ use libp2p::swarm::{
SubstreamProtocol,
};
use log::error;
use sr_primitives::ConsensusEngineId;
use std::{borrow::Cow, error, io};
use tokio_io::{AsyncRead, AsyncWrite};

Expand Down Expand Up @@ -119,13 +122,22 @@ pub enum NotifsHandlerIn {
/// The node should stop using custom protocols.
Disable,

/// Sends a message through a custom protocol substream.
/// Sends a message through the custom protocol substream.
Send {
/// Name of the protocol for the message, or `None` to force the legacy protocol.
/// The message to send.
message: Vec<u8>,
},

/// Sends a notifications message.
SendNotif {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SendNotification

/// Name of the protocol for the message.
///
/// If `Some`, must match one of the registered protocols. For backwards-compatibility
/// reasons, if the remote doesn't support this protocol, we use the legacy substream.
proto_name: Option<Cow<'static, [u8]>>,
/// Must match one of the registered protocols. For backwards-compatibility reasons, if
/// the remote doesn't support this protocol, we use the legacy substream.
proto_name: Cow<'static, [u8]>,

/// For legacy reasons, the name to use if we send the message on the legacy substream.
engine_id: ConsensusEngineId,

/// The message to send.
message: Vec<u8>,
Expand Down Expand Up @@ -257,17 +269,22 @@ where TSubstream: AsyncRead + AsyncWrite + Send + 'static {
self.in_handlers[num].inject_event(NotifsInHandlerIn::Refuse);
}
},
NotifsHandlerIn::Send { proto_name, message } => {
if let Some(proto_name) = proto_name {
for handler in &mut self.out_handlers {
if handler.is_open() && handler.protocol_name() == &proto_name[..] {
handler.inject_event(NotifsOutHandlerIn::Send(message));
return;
}
NotifsHandlerIn::Send { message } =>
self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message }),
NotifsHandlerIn::SendNotif { message, engine_id, proto_name } => {
for handler in &mut self.out_handlers {
if handler.is_open() && handler.protocol_name() == &proto_name[..] {
handler.inject_event(NotifsOutHandlerIn::Send(message));
return;
}
}

self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message });
let message = ConsensusMessage {
engine_id,
data: message,
};

self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { message: message.encode() });
},
}
}
Expand Down
34 changes: 23 additions & 11 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ pub struct Protocol<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
handshaking_peers: HashMap<PeerId, HandshakingPeer>,
/// For each legacy gossiping engine ID, the corresponding new protocol name.
protocol_name_by_engine: HashMap<ConsensusEngineId, Cow<'static, [u8]>>,
/// For each protocol name, the legacy gossiping engine ID.
protocol_engine_by_name: HashMap<Cow<'static, [u8]>, ConsensusEngineId>,
/// Used to report reputation changes.
peerset_handle: peerset::PeersetHandle,
transaction_pool: Arc<dyn TransactionPool<H, B>>,
Expand Down Expand Up @@ -192,7 +194,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
block,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}

fn send_read_request(
Expand All @@ -208,7 +210,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
keys,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}

fn send_read_child_request(
Expand All @@ -226,7 +228,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
keys,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}

fn send_call_request(
Expand All @@ -244,7 +246,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
data,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}

fn send_changes_request(
Expand All @@ -268,7 +270,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
key,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}

fn send_body_request(
Expand All @@ -290,7 +292,7 @@ impl<'a, B: BlockT> LightDispatchNetwork<B> for LightDispatchIn<'a> {
max,
});

self.behaviour.send_packet(who, None, message.encode())
self.behaviour.send_packet(who, message.encode())
}
}

Expand Down Expand Up @@ -337,6 +339,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context<B> for ProtocolContext<'a, B,
}

fn send_consensus(&mut self, who: PeerId, messages: Vec<ConsensusMessage>) {
panic!(); // TODO: shouldn't be reached

if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 4) {
let mut batch = Vec::new();
let len = messages.len();
Expand Down Expand Up @@ -442,6 +446,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
sync,
specialization,
protocol_name_by_engine: HashMap::new(),
protocol_engine_by_name: HashMap::new(),
handshaking_peers: HashMap::new(),
transaction_pool,
finality_proof_provider,
Expand Down Expand Up @@ -626,6 +631,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
GenericMessage::Consensus(msg) => {
let outcome = if let Some(proto_name) = self.protocol_name_by_engine.get(&msg.engine_id) {
// TODO: what if not open? check if open?
panic!("notif message!");
CustomMessageOutcome::NotifMessages {
remote: who.clone(),
messages: vec![(proto_name.clone(), msg.data.clone())],
Expand All @@ -641,6 +647,7 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
.iter()
.filter_map(|msg| {
if let Some(proto_name) = self.protocol_name_by_engine.get(&msg.engine_id) {
panic!("notif message!");
// TODO: what if not open? check if open?
Some((proto_name.clone(), msg.data.clone()))
} else {
Expand Down Expand Up @@ -1055,7 +1062,12 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
proto_name: impl Into<Cow<'static, [u8]>>,
message: impl Into<Vec<u8>>
) {
self.behaviour.send_packet(&target, Some(proto_name.into()), message)
let proto_name = proto_name.into();
if let Some(engine_id) = self.protocol_engine_by_name.get(&proto_name) {
self.behaviour.write_notif(&target, proto_name, *engine_id, message);
} else {
error!(target: "sub-libp2p", "Sending a notification with a protocol that wasn't registered");
}
}

/// Registers a new notifications protocol.
Expand All @@ -1070,9 +1082,10 @@ impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Protocol<B, S, H> {
) {
let proto_name = proto_name.into();
if self.protocol_name_by_engine.insert(engine_id, proto_name.clone()).is_some() {
error!("Notifications protocol already registered: {:?}", proto_name);
error!(target: "sub-libp2p", "Notifications protocol already registered: {:?}", proto_name);
} else {
self.behaviour.register_notif_protocol(proto_name, handshake);
self.behaviour.register_notif_protocol(proto_name.clone(), handshake);
self.protocol_engine_by_name.insert(proto_name, engine_id);
}
}

Expand Down Expand Up @@ -1748,7 +1761,7 @@ fn send_message<B: BlockT>(
let mut stats = stats.entry(message.id()).or_default();
stats.bytes_out += encoded.len() as u64;
stats.count_out += 1;
behaviour.send_packet(who, None, encoded);
behaviour.send_packet(who, encoded);
}

impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviour for
Expand Down Expand Up @@ -1857,7 +1870,6 @@ Protocol<B, S, H> {
}
},
GenericProtoOut::CustomMessage { peer_id, message } =>
// TODO: NotifMessages
self.on_custom_message(peer_id, message),
GenericProtoOut::Clogged { peer_id, messages } => {
debug!(target: "sync", "{} clogging messages:", messages.len());
Expand Down