-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Add a DirectedGossip struct #6803
Changes from 11 commits
6384b85
b5b0bca
02dff05
67e531d
bf5fbb0
c11bcfb
da88d2b
126c3a7
ffbe0bb
3a79f62
a4071a1
9ebc8f8
b1b1d62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,245 @@ | ||||||
| // This file is part of Substrate. | ||||||
|
|
||||||
| // Copyright (C) 2017-2020 Parity Technologies (UK) Ltd. | ||||||
| // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 | ||||||
|
|
||||||
| // This program is free software: you can redistribute it and/or modify | ||||||
| // it under the terms of the GNU General Public License as published by | ||||||
| // the Free Software Foundation, either version 3 of the License, or | ||||||
| // (at your option) any later version. | ||||||
|
|
||||||
| // This program is distributed in the hope that it will be useful, | ||||||
| // but WITHOUT ANY WARRANTY; without even the implied warranty of | ||||||
| // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||||||
| // GNU General Public License for more details. | ||||||
|
|
||||||
| // You should have received a copy of the GNU General Public License | ||||||
| // along with this program. If not, see <https://www.gnu.org/licenses/>. | ||||||
|
|
||||||
| //! Helper for sending rate-limited gossip messages. | ||||||
tomaka marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| //! | ||||||
| //! # Context | ||||||
| //! | ||||||
| //! The [`NetworkService`] struct provides a way to send notifications to a certain peer through | ||||||
| //! the [`NetworkService::notification_sender`] method. This method is quite low level and isn't | ||||||
| //! expected to be used directly. | ||||||
| //! | ||||||
| //! The [`QueueSender`] struct provided by this module is built on top of | ||||||
| //! [`NetworkService::notification_sender`] and provides a cleaner way to send notifications. | ||||||
| //! | ||||||
| //! # Behaviour | ||||||
| //! | ||||||
| //! An instance of [`QueueSender`] is specific to a certain combination of `PeerId` and | ||||||
| //! protocol name. It maintains a buffer of messages waiting to be sent out. The user of this API | ||||||
| //! is able to manipulate that queue, adding or removing obsolete messages. | ||||||
| //! | ||||||
| //! Creating a [`QueueSender`] also returns a opaque `Future` whose responsibility it to | ||||||
| //! drain that queue and actually send the messages. If the substream with the given combination | ||||||
| //! of peer and protocol is closed, the queue is silently discarded. It is the role of the user | ||||||
| //! to track which peers we are connected to. | ||||||
| //! | ||||||
| //! In normal situations, messages sent through a [`QueueSender`] will arrive in the same | ||||||
| //! order as they have been sent. | ||||||
| //! It is possible, in the situation of disconnects and reconnects, that messages arrive in a | ||||||
| //! different order. See also https://github.com/paritytech/substrate/issues/6756. | ||||||
| //! However, if multiple instances of [`QueueSender`] exist for the same peer and protocol, or | ||||||
| //! if some other code uses the [`NetworkService`] to send notifications to this combination or | ||||||
| //! peer and protocol, then the notifications will be interleaved in an unpredictable way. | ||||||
| //! | ||||||
| use crate::{ExHashT, NetworkService}; | ||||||
|
|
||||||
| use async_std::sync::{Condvar, Mutex, MutexGuard}; | ||||||
| use futures::prelude::*; | ||||||
| use libp2p::PeerId; | ||||||
| use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; | ||||||
| use std::{ | ||||||
| collections::VecDeque, | ||||||
| fmt, | ||||||
| sync::{atomic, Arc}, | ||||||
| time::Duration, | ||||||
| }; | ||||||
|
|
||||||
| #[cfg(test)] | ||||||
| mod tests; | ||||||
|
|
||||||
| /// Notifications sender for a specific combination of network service, peer, and protocol. | ||||||
| pub struct QueueSender<M> { | ||||||
|
||||||
| pub struct QueueSender<M> { | |
| pub struct QueuedSender<M> { |
Not sure whether this is a typo or intentional. In case it is the former I would prefer QueuedSender.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why offer two ways to queue_or_discard? Is writing self.lock_queue().await.push_or_discard(message); as a user not fine as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This API could return a Result<(), M> or similar as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a convenient shortcut.
mxinden marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here. It would be good to get back the message if the queue is full
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But what would you do with the message that is returned? Put it in another queue?
The only sane things you can do when the queue is full is either discard the message or force-close the connection.
Additionally, what if the connection with the remote is closed? Are we supposed to return back the message as well? If so, then it's very problematic because we can't detect this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The only sane things you can do when the queue is full is either discard the message or force-close the connection.
Isn't retain another option? We could decide to do that based on the type of the message we are trying to send.
And is there no sane thing you can do? Why can't you wait for space to appear in the queue or something like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way I see retain is that you'd call it all the time, even if there is space in the buffer. As far as I can tell, when a message is obsolete, there is no point in leaving it in the queue anyway.
Why can't you wait for space to appear in the queue or something like that?
The entire reason for this API to exist is to remove the need for any waiting. See also this paragraph.
Ultimately there has to be a code somewhere that holds some sort of HashMap<PeerId, DirectedGossip>. If it needs to send a message to one of the peers and its buffer is full, then it shouldn't wait for this peer and instead continue its processing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have a way to wait, although I agree that the API for that should be used sparingly so you don't degrade to the performance of the slowest peer. There are cases where we don't want to drop messages, for instance when responding to a validator's request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are cases where we don't want to drop messages
I believe that everything that would fall in this category should be covered by request-response protocols.
Rather than adding a wait, I could restore the push_unbounded method that I've removed after a review.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly that this uses the
unstablefeature to havefutures-timer? If so, should we not do that across the entire crate for all usage offutures-timer?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it is for
Convar. It is unfortunate that we have to depend on theunstablefeature, but I couldn't find any crate other thanasync-stdthat provides an asynchronousCondvar.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is something we want to avoid, one can use a channel as a condvar.
Problem is that this can't be used in the
Dropimplementations and thus cleanup would depend on the 10 sec timeout.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not always need to. When the
Senderis dropped, the receiver will notice and the task can terminate. Instead oflet _ = rx.next().await;one would writeif rx.next().await.is_some() { ... }. In the Drop impl of QueueGuard one can useSender::try_send.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it a bit weird to use a channel, which involves an additional
Arc,MutexandVec, just to wake up a task, rather than aWaker.But I have now also tried using a
Waker, and the implementation is considerably more tricky and difficult to read because of potential race conditions and having to introduce manual polling within anasyncfunction and having to implement your ownWaker.Before going on, I'd like to understand what is wrong with the
Condvarsolution, as aCondvaris exactly the tool that is designed for this specific job.