Skip to content

Commit d2cb5cd

Browse files
committed
drop gossipsub stale messages
1 parent 8fb6989 commit d2cb5cd

File tree

1 file changed

+30
-13
lines changed
  • beacon_node/lighthouse_network/src/gossipsub

1 file changed

+30
-13
lines changed

beacon_node/lighthouse_network/src/gossipsub/handler.rs

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,6 @@ impl EnabledHandler {
247247
});
248248
}
249249

250-
// We may need to inform the behviour if we have a dropped a message. This gets set if that
251-
// is the case.
252-
let mut dropped_message = None;
253-
254250
// process outbound stream
255251
loop {
256252
match std::mem::replace(
@@ -271,10 +267,11 @@ impl EnabledHandler {
271267
} => {
272268
if Pin::new(timeout).poll(cx).is_ready() {
273269
// Inform the behaviour and end the poll.
274-
dropped_message = Some(HandlerEvent::MessageDropped(message));
275270
self.outbound_substream =
276271
Some(OutboundSubstreamState::WaitingOutput(substream));
277-
break;
272+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
273+
HandlerEvent::MessageDropped(message),
274+
));
278275
}
279276
}
280277
_ => {} // All other messages are not time-bound.
@@ -348,13 +345,7 @@ impl EnabledHandler {
348345
}
349346
}
350347

351-
// If there was a timeout in sending a message, inform the behaviour before restarting the
352-
// poll
353-
if let Some(handler_event) = dropped_message {
354-
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(handler_event));
355-
}
356-
357-
// Handle inbound messages
348+
// Handle inbound messages.
358349
loop {
359350
match std::mem::replace(
360351
&mut self.inbound_substream,
@@ -419,6 +410,32 @@ impl EnabledHandler {
419410
}
420411
}
421412

413+
// Drop the next message in queue if it's stale.
414+
let mut peakable = self.send_queue.clone().peekable();
415+
if let Poll::Ready(Some(mut message)) = peakable.poll_next_unpin(cx) {
416+
match message {
417+
RpcOut::Publish {
418+
message: _,
419+
ref mut timeout,
420+
}
421+
| RpcOut::Forward {
422+
message: _,
423+
ref mut timeout,
424+
} => {
425+
if Pin::new(timeout).poll(cx).is_ready() {
426+
// Drop the message.
427+
let dropped = futures::ready!(self.send_queue.poll_next_unpin(cx))
428+
.expect("There should be a message");
429+
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
430+
HandlerEvent::MessageDropped(dropped),
431+
));
432+
}
433+
}
434+
// the next message in queue is not time bound.
435+
_ => {}
436+
}
437+
}
438+
422439
Poll::Pending
423440
}
424441
}

0 commit comments

Comments
 (0)