From 5ad08ffae90316e1c7a0db6386a92dfc7ed4f220 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 26 Mar 2020 11:55:46 +0100 Subject: [PATCH 1/3] Fix tried to send handshake twice --- .../protocol/generic_proto/handler/group.rs | 6 ++++ .../generic_proto/handler/notif_in.rs | 28 +++++++++++++------ .../generic_proto/upgrade/notifications.rs | 15 +++++----- 3 files changed, 32 insertions(+), 17 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index d6d9919d3e14d..20684be1d1e77 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -302,6 +302,9 @@ impl ProtocolsHandler for NotifsHandler { fn inject_event(&mut self, message: NotifsHandlerIn) { match message { NotifsHandlerIn::Enable => { + if let EnabledState::Enabled = self.enabled { + error!("enabling already-enabled handler"); + } self.enabled = EnabledState::Enabled; self.legacy.inject_event(LegacyProtoHandlerIn::Enable); for (handler, _) in &mut self.out_handlers { @@ -314,6 +317,9 @@ impl ProtocolsHandler for NotifsHandler { } }, NotifsHandlerIn::Disable => { + if let EnabledState::Disabled = self.enabled { + error!("disabling already-disabled handler"); + } self.legacy.inject_event(LegacyProtoHandlerIn::Disable); // The notifications protocols start in the disabled state. If we were in the // "Initial" state, then we shouldn't disable the notifications protocols again. diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 4e16fb1af419f..7558d1d361fd7 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -36,7 +36,7 @@ use libp2p::swarm::{ }; use log::{error, warn}; use smallvec::SmallVec; -use std::{borrow::Cow, fmt, pin::Pin, str, task::{Context, Poll}}; +use std::{borrow::Cow, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -156,16 +156,19 @@ impl ProtocolsHandler for NotifsInHandler { &mut self, (msg, proto): >::Output ) { + // If a substream already exists, we drop it and replace it with the new incoming one. if self.substream.is_some() { - warn!( - target: "sub-libp2p", - "Received duplicate inbound notifications substream for {:?}", - str::from_utf8(self.in_protocol.protocol_name()), - ); - return; + self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); } + // Note that we drop the existing substream, which will send an equivalent to a TCP "RST" + // to the remote and force-close the substream. It might seem like an unclean way to get + // rid of a substream. However, keep in mind that it is invalid for the remote to open + // multiple such substreams, and therefore sending a "RST" is the correct thing to do. + // Also note that we have already closed our writing side during the initial handshake, + // and we can't close "more" than that anyway. self.substream = Some(proto); + self.events_queue.push(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(msg))); self.pending_accept_refuses = self.pending_accept_refuses .checked_add(1) @@ -235,8 +238,15 @@ impl ProtocolsHandler for NotifsInHandler { match self.substream.as_mut().map(|s| Stream::poll_next(Pin::new(s), cx)) { None | Some(Poll::Pending) => {}, - Some(Poll::Ready(Some(Ok(msg)))) => - return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg))), + Some(Poll::Ready(Some(Ok(msg)))) => { + if self.pending_accept_refuses != 0 { + warn!( + target: "sub-libp2p", + "Bad state in inbound-only handler: notif before accepting substream" + ); + } + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(msg))) + }, Some(Poll::Ready(None)) | Some(Poll::Ready(Some(Err(_)))) => { self.substream = None; return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index 68898a08fe1af..b4ab2c3e0ee35 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -164,12 +164,9 @@ where TSubstream: AsyncRead + AsyncWrite, { /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) { - match self.handshake { - NotificationsInSubstreamHandshake::NotSent => {} - _ => { - error!(target: "sub-libp2p", "Tried to send handshake twice"); - return; - } + if matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) { + error!(target: "sub-libp2p", "Tried to send handshake twice"); + return; } self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into()); @@ -189,8 +186,10 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin, match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { NotificationsInSubstreamHandshake::Sent => return Stream::poll_next(this.socket.as_mut(), cx), - NotificationsInSubstreamHandshake::NotSent => - return Poll::Pending, + NotificationsInSubstreamHandshake::NotSent => { + *this.handshake = NotificationsInSubstreamHandshake::NotSent; + return Poll::Pending + }, NotificationsInSubstreamHandshake::PendingSend(msg) => match Sink::poll_ready(this.socket.as_mut(), cx) { Poll::Ready(_) => { From 44ebdad0478cdf0682e2263f025aa4e76e6b102e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 26 Mar 2020 15:27:32 +0100 Subject: [PATCH 2/3] Fix wrong boolean --- .../network/src/protocol/generic_proto/upgrade/notifications.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index b4ab2c3e0ee35..b6ae1425f1161 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -164,7 +164,7 @@ where TSubstream: AsyncRead + AsyncWrite, { /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) { - if matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) { + if !matches!(self.handshake, NotificationsInSubstreamHandshake::NotSent) { error!(target: "sub-libp2p", "Tried to send handshake twice"); return; } From 387a143e57b14b4c525fac623bf1f526da3c7c09 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 27 Mar 2020 12:30:36 +0100 Subject: [PATCH 3/3] Change to debug --- client/network/src/protocol/generic_proto/handler/group.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 20684be1d1e77..0804096b80ca4 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -65,7 +65,7 @@ use libp2p::swarm::{ SubstreamProtocol, NegotiatedSubstream, }; -use log::error; +use log::{debug, error}; use sp_runtime::ConsensusEngineId; use std::{borrow::Cow, error, io, task::{Context, Poll}}; @@ -303,7 +303,7 @@ impl ProtocolsHandler for NotifsHandler { match message { NotifsHandlerIn::Enable => { if let EnabledState::Enabled = self.enabled { - error!("enabling already-enabled handler"); + debug!("enabling already-enabled handler"); } self.enabled = EnabledState::Enabled; self.legacy.inject_event(LegacyProtoHandlerIn::Enable); @@ -318,7 +318,7 @@ impl ProtocolsHandler for NotifsHandler { }, NotifsHandlerIn::Disable => { if let EnabledState::Disabled = self.enabled { - error!("disabling already-disabled handler"); + debug!("disabling already-disabled handler"); } self.legacy.inject_event(LegacyProtoHandlerIn::Disable); // The notifications protocols start in the disabled state. If we were in the