52 changes: 22 additions & 30 deletions client/network/src/protocol/generic_proto/handler.rs
@@ -765,28 +765,13 @@ impl ProtocolsHandler for NotifsHandler {
 			}

 			// Poll outbound substream.
-			match &mut self.protocols[protocol_index].state {
-				State::Open { out_substream: out_substream @ Some(_), .. } => {
-					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
-						Poll::Pending | Poll::Ready(Ok(())) => {},
-						Poll::Ready(Err(_)) => {
-							*out_substream = None;
-							let event = NotifsHandlerOut::CloseDesired { protocol_index };
-							return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
-						}
-					};
-				}
-
-				State::Closed { .. } |
-				State::Opening { .. } |
-				State::Open { out_substream: None, .. } |
-				State::OpenDesiredByRemote { .. } => {}
-			}
-
-			if let State::Open { notifications_sink_rx, out_substream: Some(out_substream), .. }
+			if let State::Open { notifications_sink_rx, out_substream: out_substream @ Some(_), .. }
 				= &mut self.protocols[protocol_index].state
 			{
+				let mut flush_needed = false;
Contributor:
Since flush_needed is a local, if a flush is needed but poll_flush returns Poll::Pending, it looks like the handler may not call poll_flush() again when it is next polled? I would think that not wanting to track such state is why the current code always calls poll_flush() for each open substream, as in the removed block above. Please correct me if I'm mistaken.
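A minimal sketch of the concern, outside the actual handler (the function names, the `Vec<u8>` message type, and the simplified signatures are illustrative assumptions, not the Substrate code):

```rust
use std::task::Context;
use futures::{Sink, SinkExt};

/// Flag-based flushing (this PR): `flush_needed` lives on the stack, so a
/// flush that returned `Pending` is only retried if a *new* message arrives
/// on a later call and sets the flag again.
fn poll_flag_based<S: Sink<Vec<u8>> + Unpin>(
	out_substream: &mut S,
	cx: &mut Context<'_>,
	queued_message: Option<Vec<u8>>,
) {
	let mut flush_needed = false;
	if let Some(message) = queued_message {
		let _ = out_substream.start_send_unpin(message);
		flush_needed = true;
	}
	if flush_needed {
		// If this returns `Pending`, nothing remembers that a flush is still
		// outstanding: `flush_needed` starts out `false` again on the next call.
		let _ = out_substream.poll_flush_unpin(cx);
	}
}

/// Unconditional flushing (the removed block above): every `poll` flushes
/// every open substream, so no cross-call state needs to be tracked.
fn poll_unconditional<S: Sink<Vec<u8>> + Unpin>(out_substream: &mut S, cx: &mut Context<'_>) {
	let _ = out_substream.poll_flush_unpin(cx);
}
```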

Member:
Good point!

The problem we encountered is that flush wasn't called at all; we assume this was probably because other substreams were busier, or similar.

As I'm thinking more about this, the current implementation isn't really robust. It could happen that a stream with a lower protocol_index starves the other streams. There is already this events_queue. We should insert every event in there and ensure that we poll all incoming and outgoing streams. At the end of this loop we should check whether there is something in the events_queue and return that.
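A rough sketch of that idea (the types here are simplified stand-ins; only `events_queue` and the overall shape follow the comment, nothing else is from the real handler):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll};

// Simplified stand-ins for the real handler types.
enum HandlerEvent {
	Notification(Vec<u8>),
	CloseDesired { protocol_index: usize },
}

struct Substream;

impl Substream {
	fn poll_event(&mut self, _cx: &mut Context<'_>) -> Poll<HandlerEvent> {
		Poll::Pending
	}
}

struct Handler {
	substreams: Vec<Substream>,
	events_queue: VecDeque<HandlerEvent>,
}

impl Handler {
	fn poll(&mut self, cx: &mut Context<'_>) -> Poll<HandlerEvent> {
		// Poll *every* substream before returning, so a busy stream with a
		// low protocol_index cannot starve the others; queue whatever they
		// produce instead of returning early.
		for substream in &mut self.substreams {
			if let Poll::Ready(event) = substream.poll_event(cx) {
				self.events_queue.push_back(event);
			}
		}
		// Only at the end, hand out one queued event per call.
		if let Some(event) = self.events_queue.pop_front() {
			if !self.events_queue.is_empty() {
				// More events are queued; make sure we get polled again.
				cx.waker().wake_by_ref();
			}
			return Poll::Ready(event);
		}
		Poll::Pending
	}
}
```

The `wake_by_ref` when events remain ensures the task is polled again to drain the rest of the queue.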

Member Author:
Good catch @romanb ! Thanks!

Member Author:
> As I'm thinking more about this, the current implementation isn't really robust. It could happen that a stream with a lower protocol_index starves the other streams. There is already this events_queue. We should insert every event in there and ensure that we poll all incoming and outgoing streams. At the end of this loop we should check whether there is something in the events_queue and return that.

Problem: We lose back pressure. events_queue could grow indefinitely in that case.

Member:
Why do we lose the back pressure? The event queue would be checked before we poll for new events, wouldn't it?

Member Author:
Yeah, I wanted to avoid checking both before and after. If we only check afterwards, we get that back-pressure issue; if we only checked first, we would return Pending although we are actually Ready. Either way, @tomaka seems skeptical that we solve anything here.
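The tradeoff in code form, as a sketch (the function names, `Event`, and `poll_streams` are hypothetical, not from this PR):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll};

struct Event;

fn poll_streams(_cx: &mut Context<'_>, _events_queue: &mut VecDeque<Event>) {
	// ...poll every substream, pushing whatever it produces into the queue...
}

// Check the queue only *after* polling: never returns Pending while an event
// is ready, but the substreams are drained eagerly on every call even when
// the consumer is slow, so the queue can grow without bound (back pressure
// is lost).
fn poll_check_after(cx: &mut Context<'_>, queue: &mut VecDeque<Event>) -> Poll<Event> {
	poll_streams(cx, queue);
	match queue.pop_front() {
		Some(event) => Poll::Ready(event),
		None => Poll::Pending,
	}
}

// Check the queue only *before* polling: bounded, but on the very call that
// fills the queue we return Pending although an event is actually ready.
fn poll_check_before(cx: &mut Context<'_>, queue: &mut VecDeque<Event>) -> Poll<Event> {
	if let Some(event) = queue.pop_front() {
		return Poll::Ready(event);
	}
	poll_streams(cx, queue);
	Poll::Pending
}
```

Checking both before and after would avoid the spurious Pending, at the cost of the double check the author wanted to avoid.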

 				loop {
+					let out_substream = out_substream.as_mut()
+						.expect("We already matched on `Some` pattern above. qed.");
 					// Before we poll the notifications sink receiver, check that the substream
 					// is ready to accept a message.
 					match out_substream.poll_ready_unpin(cx) {
@@ -803,17 +788,7 @@ impl ProtocolsHandler for NotifsHandler {
 							match message {
 								NotificationsSinkMessage::Notification { message } => {
 									let _ = out_substream.start_send_unpin(message);
-
-									// Calling `start_send_unpin` only queues the message. Actually
-									// emitting the message is done with `poll_flush`. In order to
-									// not introduce too much complexity, this flushing is done earlier
-									// in the body of this `poll()` method. As such, we schedule a task
-									// wake-up now in order to guarantee that `poll()` will be called
-									// again and the flush happening.
-									// At the time of the writing of this comment, a rewrite of this
-									// code is being planned. If you find this comment in the wild and
-									// the rewrite didn't happen, please consider a refactor.
-									cx.waker().wake_by_ref();
+									flush_needed = true;
 								}
 								NotificationsSinkMessage::ForceClose => {
 									return Poll::Ready(
@@ -822,6 +797,23 @@ impl ProtocolsHandler for NotifsHandler {
 								}
 							}
 					}
 				}
+				if flush_needed {
+					match Sink::poll_flush(
+						Pin::new(
+							out_substream
+								.as_mut()
+								.expect("We already matched on `Some` pattern above; qed.")
+						),
+						cx
+					) {
+						Poll::Pending | Poll::Ready(Ok(())) => {},
+						Poll::Ready(Err(_)) => {
+							*out_substream = None;
+							let event = NotifsHandlerOut::CloseDesired { protocol_index };
+							return Poll::Ready(ProtocolsHandlerEvent::Custom(event));
+						}
+					}
+				}
 			}
