diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 0db249f90a8b7..5d87d3ad9d7e6 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -765,28 +765,13 @@ impl ProtocolsHandler for NotifsHandler { } // Poll outbound substream. - match &mut self.protocols[protocol_index].state { - State::Open { out_substream: out_substream @ Some(_), .. } => { - match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) { - Poll::Pending | Poll::Ready(Ok(())) => {}, - Poll::Ready(Err(_)) => { - *out_substream = None; - let event = NotifsHandlerOut::CloseDesired { protocol_index }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); - } - }; - } - - State::Closed { .. } | - State::Opening { .. } | - State::Open { out_substream: None, .. } | - State::OpenDesiredByRemote { .. } => {} - } - - if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. } + if let State::Open { notifications_sink_rx, out_substream: out_substream @ Some(_), .. } = &mut self.protocols[protocol_index].state { + let mut flush_needed = false; loop { + let out_substream = out_substream.as_mut() + .expect("We already matched on `Some` pattern above. qed."); // Before we poll the notifications sink receiver, check that the substream // is ready to accept a message. match out_substream.poll_ready_unpin(cx) { @@ -803,17 +788,7 @@ impl ProtocolsHandler for NotifsHandler { match message { NotificationsSinkMessage::Notification { message } => { let _ = out_substream.start_send_unpin(message); - - // Calling `start_send_unpin` only queues the message. Actually - // emitting the message is done with `poll_flush`. In order to - // not introduce too much complexity, this flushing is done earlier - // in the body of this `poll()` method. As such, we schedule a task - // wake-up now in order to guarantee that `poll()` will be called - // again and the flush happening. - // At the time of the writing of this comment, a rewrite of this - // code is being planned. If you find this comment in the wild and - // the rewrite didn't happen, please consider a refactor. - cx.waker().wake_by_ref(); + flush_needed = true; } NotificationsSinkMessage::ForceClose => { return Poll::Ready( @@ -822,6 +797,23 @@ impl ProtocolsHandler for NotifsHandler { } } } + if flush_needed { + match Sink::poll_flush( + Pin::new( + out_substream + .as_mut() + .expect("We already matched on `Some` pattern above; qed.") + ), + cx + ) { + Poll::Pending | Poll::Ready(Ok(())) => {}, + Poll::Ready(Err(_)) => { + *out_substream = None; + let event = NotifsHandlerOut::CloseDesired { protocol_index }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); + } + } + } } }