diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index f84aead47283a..f76b3cc716026 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -650,13 +650,17 @@ impl GenericProto { Some(sink) => sink }; + let message = message.into(); + trace!( target: "sub-libp2p", - "External API => Notification({:?}, {:?})", + "External API => Notification({:?}, {:?}, {} bytes)", target, protocol_name, + message.len(), ); - trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); + trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target); + notifs_sink.send_sync_notification( protocol_name, message @@ -1930,9 +1934,10 @@ impl NetworkBehaviour for GenericProto { if self.is_open(&source) { trace!( target: "sub-libp2p", - "Handler({:?}) => Notification({:?})", + "Handler({:?}) => Notification({:?}, {} bytes)", source, protocol_name, + message.len() ); trace!(target: "sub-libp2p", "External API <= Message({:?}, {:?})", protocol_name, source); let event = GenericProtoOut::Notification { @@ -1945,9 +1950,10 @@ impl NetworkBehaviour for GenericProto { } else { trace!( target: "sub-libp2p", - "Handler({:?}) => Post-close notification({:?})", + "Handler({:?}) => Post-close notification({:?}, {} bytes)", source, protocol_name, + message.len() ); } } diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 42cf02f1b77d7..13d44cd1a09af 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -138,6 +138,9 @@ pub struct NotifsHandler { /// Whether we are the connection dialer or listener. endpoint: ConnectedPoint, + /// Remote we are connected to. + peer_id: PeerId, + /// State of this handler. state: State, @@ -260,12 +263,13 @@ impl IntoProtocolsHandler for NotifsHandlerProto { SelectUpgrade::new(in_protocols, self.legacy_protocol.clone()) } - fn into_handler(self, _: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { + fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler { let num_out_proto = self.out_protocols.len(); NotifsHandler { in_protocols: self.in_protocols, out_protocols: self.out_protocols, + peer_id: peer_id.clone(), endpoint: connected_point.clone(), when_connection_open: Instant::now(), state: State::Closed { @@ -365,6 +369,8 @@ pub struct NotificationsSink { #[derive(Debug)] struct NotificationsSinkInner { + /// Target of the sink. + peer_id: PeerId, /// Sender to use in asynchronous contexts. Uses an asynchronous mutex. async_channel: FuturesMutex>, /// Sender to use in synchronous contexts. Uses a synchronous mutex. @@ -390,6 +396,11 @@ enum NotificationsSinkMessage { } impl NotificationsSink { + /// Returns the [`PeerId`] the sink is connected to. + pub fn peer_id(&self) -> &PeerId { + &self.inner.peer_id + } + /// Sends a notification to the peer. /// /// If too many messages are already buffered, the notification is silently discarded and the @@ -447,6 +458,12 @@ pub struct Ready<'a> { } impl<'a> Ready<'a> { + /// Returns the name of the protocol. Matches the one passed to + /// [`NotificationsSink::reserve_notification`]. + pub fn protocol_name(&self) -> &Cow<'static, str> { + &self.protocol_name + } + /// Consumes this slots reservation and actually queues the notification. /// /// Returns an error if the substream has been closed. @@ -622,6 +639,7 @@ impl ProtocolsHandler for NotifsHandler { let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { inner: Arc::new(NotificationsSinkInner { + peer_id: self.peer_id.clone(), async_channel: FuturesMutex::new(async_tx), sync_channel: Mutex::new(sync_tx), }), @@ -782,6 +800,7 @@ impl ProtocolsHandler for NotifsHandler { let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE); let notifications_sink = NotificationsSink { inner: Arc::new(NotificationsSinkInner { + peer_id: self.peer_id.clone(), async_channel: FuturesMutex::new(async_tx), sync_channel: Mutex::new(sync_tx), }), diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 0a87c37703d89..e5196f97d724d 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -628,7 +628,7 @@ impl NetworkService { // `peers_notifications_sinks` mutex as soon as possible. let sink = { let peers_notifications_sinks = self.peers_notifications_sinks.lock(); - if let Some(sink) = peers_notifications_sinks.get(&(target, protocol.clone())) { + if let Some(sink) = peers_notifications_sinks.get(&(target.clone(), protocol.clone())) { sink.clone() } else { // Notification silently discarded, as documented. @@ -648,6 +648,14 @@ impl NetworkService { } // Sending is communicated to the `NotificationsSink`. + trace!( + target: "sub-libp2p", + "External API => Notification({:?}, {:?}, {} bytes)", + target, + protocol, + message.len() + ); + trace!(target: "sub-libp2p", "Handler({:?}) <= Sync notification", target); sink.send_sync_notification(protocol, message); } @@ -1103,6 +1111,7 @@ impl NotificationSender { Ok(r) => r, Err(()) => return Err(NotificationSenderError::Closed), }, + peer_id: self.sink.peer_id(), notification_size_metric: self.notification_size_metric.clone(), }) } @@ -1113,6 +1122,9 @@ impl NotificationSender { pub struct NotificationSenderReady<'a> { ready: Ready<'a>, + /// Target of the notification. + peer_id: &'a PeerId, + /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notification_size_metric: Option, @@ -1127,6 +1139,15 @@ impl<'a> NotificationSenderReady<'a> { notification_size_metric.observe(notification.len() as f64); } + trace!( + target: "sub-libp2p", + "External API => Notification({:?}, {:?}, {} bytes)", + self.peer_id, + self.ready.protocol_name(), + notification.len() + ); + trace!(target: "sub-libp2p", "Handler({:?}) <= Async notification", self.peer_id); + self.ready .send(notification) .map_err(|()| NotificationSenderError::Closed)