Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,17 @@ impl GenericProto {
Some(sink) => sink
};

let message = message.into();

trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?})",
"External API => Notification({:?}, {:?}, {} bytes)",
target,
protocol_name,
message.len(),
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);

notifs_sink.send_sync_notification(
protocol_name,
message
Expand Down Expand Up @@ -1930,9 +1934,10 @@ impl NetworkBehaviour for GenericProto {
if self.is_open(&source) {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Notification({:?})",
"Handler({:?}) => Notification({:?}, {} bytes)",
source,
protocol_name,
message.len()
);
trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source);
let event = GenericProtoOut::Notification {
Expand All @@ -1945,9 +1950,10 @@ impl NetworkBehaviour for GenericProto {
} else {
trace!(
target: "sub-libp2p",
"Handler({:?}) => Post-close notification({:?})",
"Handler({:?}) => Post-close notification({:?}, {} bytes)",
source,
protocol_name,
message.len()
);
}
}
Expand Down
21 changes: 20 additions & 1 deletion client/network/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ pub struct NotifsHandler {
/// Whether we are the connection dialer or listener.
endpoint: ConnectedPoint,

/// Remote we are connected to.
peer_id: PeerId,

/// State of this handler.
state: State,

Expand Down Expand Up @@ -260,12 +263,13 @@ impl IntoProtocolsHandler for NotifsHandlerProto {
SelectUpgrade::new(in_protocols, self.legacy_protocol.clone())
}

fn into_handler(self, _: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
let num_out_proto = self.out_protocols.len();

NotifsHandler {
in_protocols: self.in_protocols,
out_protocols: self.out_protocols,
peer_id: peer_id.clone(),
endpoint: connected_point.clone(),
when_connection_open: Instant::now(),
state: State::Closed {
Expand Down Expand Up @@ -365,6 +369,8 @@ pub struct NotificationsSink {

#[derive(Debug)]
struct NotificationsSinkInner {
/// Target of the sink.
peer_id: PeerId,
/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
/// Sender to use in synchronous contexts. Uses a synchronous mutex.
Expand All @@ -390,6 +396,11 @@ enum NotificationsSinkMessage {
}

impl NotificationsSink {
/// Returns the [`PeerId`] the sink is connected to.
pub fn peer_id(&self) -> &PeerId {
&self.inner.peer_id
}

/// Sends a notification to the peer.
///
/// If too many messages are already buffered, the notification is silently discarded and the
Expand Down Expand Up @@ -447,6 +458,12 @@ pub struct Ready<'a> {
}

impl<'a> Ready<'a> {
/// Returns the name of the protocol. Matches the one passed to
/// [`NotificationsSink::reserve_notification`].
pub fn protocol_name(&self) -> &Cow<'static, str> {
&self.protocol_name
}

/// Consumes this slots reservation and actually queues the notification.
///
/// Returns an error if the substream has been closed.
Expand Down Expand Up @@ -622,6 +639,7 @@ impl ProtocolsHandler for NotifsHandler {
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
peer_id: self.peer_id.clone(),
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
}),
Expand Down Expand Up @@ -782,6 +800,7 @@ impl ProtocolsHandler for NotifsHandler {
let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
let notifications_sink = NotificationsSink {
inner: Arc::new(NotificationsSinkInner {
peer_id: self.peer_id.clone(),
async_channel: FuturesMutex::new(async_tx),
sync_channel: Mutex::new(sync_tx),
}),
Expand Down
23 changes: 22 additions & 1 deletion client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
// `peers_notifications_sinks` mutex as soon as possible.
let sink = {
let peers_notifications_sinks = self.peers_notifications_sinks.lock();
if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) {
if let Some(sink) = peers_notifications_sinks.get(&(target.clone(), protocol.clone())) {
sink.clone()
} else {
// Notification silently discarded, as documented.
Expand All @@ -648,6 +648,14 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
}

// Sending is communicated to the `NotificationsSink`.
trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
target,
protocol,
message.len()
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target);
sink.send_sync_notification(protocol, message);
}

Expand Down Expand Up @@ -1103,6 +1111,7 @@ impl NotificationSender {
Ok(r) => r,
Err(()) => return Err(NotificationSenderError::Closed),
},
peer_id: self.sink.peer_id(),
notification_size_metric: self.notification_size_metric.clone(),
})
}
Expand All @@ -1113,6 +1122,9 @@ impl NotificationSender {
pub struct NotificationSenderReady<'a> {
ready: Ready<'a>,

/// Target of the notification.
peer_id: &'a PeerId,

/// Field extracted from the [`Metrics`] struct and necessary to report the
/// notifications-related metrics.
notification_size_metric: Option<Histogram>,
Expand All @@ -1127,6 +1139,15 @@ impl<'a> NotificationSenderReady<'a> {
notification_size_metric.observe(notification.len() as f64);
}

trace!(
target: "sub-libp2p",
"External API => Notification({:?}, {:?}, {} bytes)",
self.peer_id,
self.ready.protocol_name(),
notification.len()
);
trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id);

self.ready
.send(notification)
.map_err(|()| NotificationSenderError::Closed)
Expand Down