diff --git a/client/network/src/protocol/generic_proto/handler.rs b/client/network/src/protocol/generic_proto/handler.rs index 0db249f90a8b7..5029e1c6525ee 100644 --- a/client/network/src/protocol/generic_proto/handler.rs +++ b/client/network/src/protocol/generic_proto/handler.rs @@ -712,10 +712,32 @@ impl ProtocolsHandler for NotifsHandler { ) -> Poll< ProtocolsHandlerEvent > { + + // Only fill up queue more when rather empty, otherwise impose back pressure: + // + // NOTE: Compared to an approach of just returning events from `poll_work`, instead of + // pushing to `events_queue`, we impose back pressure evenly to all substreams. + if 2 * self.events_queue.len() < self.events_queue.capacity() { + self.poll_work(cx) + } + if let Some(ev) = self.events_queue.pop_front() { - return Poll::Ready(ev); + Poll::Ready(ev) + } else { + Poll::Pending } + } +} +impl NotifsHandler { + /// Do the actual work of `poll`. + /// + /// Instead of just returning events, we push to `events_queue`. We do this, to avoid busy + /// substreams starving others. In particular we had problems of `poll_flush` not getting + /// called in a timely manner. + fn poll_work( &mut self, cx: &mut Context) + { + // Employ some back pressure when queue is getting full: for protocol_index in 0..self.protocols.len() { // Poll inbound substreams. // Inbound substreams being closed is always tolerated, except for the @@ -733,7 +755,7 @@ impl ProtocolsHandler for NotifsHandler { protocol_index, message, }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(event)) + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); }, Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None, @@ -748,9 +770,9 @@ impl ProtocolsHandler for NotifsHandler { self.protocols[protocol_index].state = State::Closed { pending_opening: *pending_opening, }; - return Poll::Ready(ProtocolsHandlerEvent::Custom( + self.events_queue.push_back(ProtocolsHandlerEvent::Custom( NotifsHandlerOut::CloseDesired { protocol_index } - )) + )); }, } } @@ -772,7 +794,7 @@ impl ProtocolsHandler for NotifsHandler { Poll::Ready(Err(_)) => { *out_substream = None; let event = NotifsHandlerOut::CloseDesired { protocol_index }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(event)); + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(event)); } }; } @@ -816,7 +838,7 @@ impl ProtocolsHandler for NotifsHandler { cx.waker().wake_by_ref(); } NotificationsSinkMessage::ForceClose => { - return Poll::Ready( + self.events_queue.push_back( ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged) ); } @@ -824,7 +846,5 @@ impl ProtocolsHandler for NotifsHandler { } } } - - Poll::Pending } }