From 4a36485fe6c59c121eafff25b9265e7c155502b2 Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Fri, 19 Mar 2021 20:54:34 +0100 Subject: [PATCH 1/3] Flush stream after we try to send data - always. --- .../src/protocol/generic_proto/handler.rs | 45 +++++++------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 0db249f90a8b7..f6c38095f24fb 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -765,28 +765,13 @@ impl ProtocolsHandler for NotifsHandler { } // Poll outbound substream. - match &mut self.protocols[protocol_index].state { - State::Open { out_substream: out_substream @ Some(_), .. } => { - match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { - Poll::Pending | Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(_)) => { - *out_substream = None; - let event = NotifsHandlerOut::CloseDesired { protocol_index }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); - } - }; - } - - State::Closed { .. } | - State::Opening { .. } | - State::Open { out_substream: None, .. } | - State::OpenDesiredByRemote { .. } => {} - } - - if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. } + if let State::Open { notifications_sink_rx, out_substream: out_substream @ Some(_), .. } = &mut self.protocols[protocol_index].state { + let mut flush_needed = false; loop { + let out_substream = out_substream.as_mut() + .expect("We already matched on `Some` pattern above. qed."); // Before we poll the notifications sink receiver, check that the substream // is ready to accept a message. match out_substream.poll_ready_unpin(cx) { @@ -803,17 +788,7 @@ impl ProtocolsHandler for NotifsHandler { match message { NotificationsSinkMessage::Notification { message } => { let _ = out_substream.start_send_unpin(message); - - // Calling `start_send_unpin` only queues the message. Actually - // emitting the message is done with `poll_flush`. In order to - // not introduce too much complexity, this flushing is done earlier - // in the body of this `poll()` method. As such, we schedule a task - // wake-up now in order to guarantee that `poll()` will be called - // again and the flush happening. - // At the time of the writing of this comment, a rewrite of this - // code is being planned. If you find this comment in the wild and - // the rewrite didn't happen, please consider a refactor. - cx.waker().wake_by_ref(); + flush_needed = true; } NotificationsSinkMessage::ForceClose => { return Poll::Ready( @@ -822,6 +797,16 @@ impl ProtocolsHandler for NotifsHandler { } } } + if flush_needed { + match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { + Poll::Pending | Poll::Ready(Ok(())) => {}, + Poll::Ready(Err(_)) => { + *out_substream = None; + let event = NotifsHandlerOut::CloseDesired { protocol_index }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); + } + } + } } } From 4125ee663838c60f0b025446cb7c7819a8e47a22 Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Fri, 19 Mar 2021 21:19:43 +0100 Subject: [PATCH 2/3] Update client/network/src/protocol/generic_proto/handler.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/network/src/protocol/generic_proto/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index f6c38095f24fb..2cc13b18db3b2 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -798,7 +798,7 @@ impl ProtocolsHandler for NotifsHandler { } } if flush_needed { - match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { + match Sink::poll_flush(Pin::new(out_substream.as_mut().expect("We already matched on `Some` pattern above; qed.")), cx) { Poll::Pending | Poll::Ready(Ok(())) => {}, Poll::Ready(Err(_)) => { *out_substream = None; From 3c0afdb25d3dbf1d56f6f0706b5012380c1f04b3 Mon Sep 17 00:00:00 2001 From: Robert Klotzner Date: Fri, 19 Mar 2021 21:47:08 +0100 Subject: [PATCH 3/3] Fix linewidth. --- client/network/src/protocol/generic_proto/handler.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 2cc13b18db3b2..5d87d3ad9d7e6 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -798,7 +798,14 @@ impl ProtocolsHandler for NotifsHandler { } } if flush_needed { - match Sink::poll_flush(Pin::new(out_substream.as_mut().expect("We already matched on `Some` pattern above; qed.")), cx) { + match Sink::poll_flush( + Pin::new( + out_substream + .as_mut() + .expect("We already matched on `Some` pattern above; qed.") + ), + cx + ) { Poll::Pending | Poll::Ready(Ok(())) => {}, Poll::Ready(Err(_)) => { *out_substream = None;