Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ pub struct GrandpaParams<Block: BlockT, C, N, SC, VR> {
pub fn grandpa_peers_set_config() -> sc_network::config::NonDefaultSetConfig {
sc_network::config::NonDefaultSetConfig {
notifications_protocol: communication::GRANDPA_PROTOCOL_NAME.into(),
// Notifications reach ~256kiB in size at the time of writing on Kusama and Polkadot.
max_notification_size: 1024 * 1024,
set_config: sc_network::config::SetConfig {
in_peers: 25,
out_peers: 25,
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ pub struct NonDefaultSetConfig {
/// > **Note**: This field isn't present for the default set, as this is handled internally
/// > by the networking code.
pub notifications_protocol: Cow<'static, str>,
/// Maximum allowed size of single notifications.
pub max_notification_size: u64,
/// Base configuration.
pub set_config: SetConfig,
}
Expand Down
2 changes: 2 additions & 0 deletions client/network/src/gossip/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: Default::default()
}
],
Expand All @@ -157,6 +158,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
Expand Down
13 changes: 8 additions & 5 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,16 +475,19 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
best_hash,
genesis_hash,
).encode();

GenericProto::new(
protocol_id.clone(),
versions,
build_status_message::<B>(&config, best_number, best_hash, genesis_hash),
peerset,
iter::once((block_announces_protocol, block_announces_handshake))
.chain(iter::once((transactions_protocol, vec![])))
.chain(network_config.extra_sets.iter()
.map(|s| (s.notifications_protocol.clone(), handshake_message.clone()))
),
iter::once((block_announces_protocol, block_announces_handshake, 1024 * 1024))
.chain(iter::once((transactions_protocol, vec![], 1024 * 1024)))
.chain(network_config.extra_sets.iter().map(|s| (
s.notifications_protocol.clone(),
handshake_message.clone(),
s.max_notification_size
))),
)
};

Expand Down
6 changes: 3 additions & 3 deletions client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub struct GenericProto {
/// Notification protocols. Entries are only ever added and not removed.
/// Contains, for each protocol, the protocol name and the message to send as part of the
/// initial handshake.
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>,
notif_protocols: Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>,

/// Receiver for instructions about who to connect to or disconnect from.
peerset: sc_peerset::Peerset,
Expand Down Expand Up @@ -374,10 +374,10 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>, u64)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.map(|(n, hs, sz)| (n, Arc::new(RwLock::new(hs)), sz))
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());
Expand Down
31 changes: 19 additions & 12 deletions client/network/src/protocol/generic_proto/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
pub struct NotifsHandlerProto {
/// Name of protocols, prototypes for upgrades for inbound substreams, and the message we
/// send or respond with in the handshake.
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>)>,
protocols: Vec<(Cow<'static, str>, NotificationsIn, Arc<RwLock<Vec<u8>>>, u64)>,

/// Configuration for the legacy protocol upgrade.
legacy_protocol: RegisteredProtocol,
Expand Down Expand Up @@ -161,6 +161,9 @@ struct Protocol {
/// Handshake to send when opening a substream or receiving an open request.
handshake: Arc<RwLock<Vec<u8>>>,

/// Maximum allowed size of individual notifications.
max_notification_size: u64,

/// Current state of the substreams for this protocol.
state: State,
}
Expand Down Expand Up @@ -226,22 +229,23 @@ impl IntoProtocolsHandler for NotifsHandlerProto {

fn inbound_protocol(&self) -> SelectUpgrade<UpgradeCollec<NotificationsIn>, RegisteredProtocol> {
let protocols = self.protocols.iter()
.map(|(_, p, _)| p.clone())
.map(|(_, p, _, _)| p.clone())
.collect::<UpgradeCollec<_>>();

SelectUpgrade::new(protocols, self.legacy_protocol.clone())
}

fn into_handler(self, peer_id: &PeerId, connected_point: &ConnectedPoint) -> Self::Handler {
NotifsHandler {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake)| {
protocols: self.protocols.into_iter().map(|(name, in_upgrade, handshake, max_size)| {
Protocol {
name,
in_upgrade,
handshake,
state: State::Closed {
pending_opening: false,
}
},
max_notification_size: max_size,
}
}).collect(),
peer_id: peer_id.clone(),
Expand Down Expand Up @@ -467,18 +471,19 @@ pub enum NotifsHandlerError {
impl NotifsHandlerProto {
/// Builds a new handler.
///
/// `list` is a list of notification protocols names, and the message to send as part of the
/// handshake. At the moment, the message is always the same whether we open a substream
/// ourselves or respond to handshake from the remote.
/// `list` is a list of notification protocols names, the message to send as part of the
/// handshake, and the maximum allowed size of a notification. At the moment, the message
/// is always the same whether we open a substream ourselves or respond to handshake from
/// the remote.
pub fn new(
legacy_protocol: RegisteredProtocol,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>)>>,
list: impl Into<Vec<(Cow<'static, str>, Arc<RwLock<Vec<u8>>>, u64)>>,
) -> Self {
let protocols = list
.into()
.into_iter()
.map(|(proto_name, msg)| {
(proto_name.clone(), NotificationsIn::new(proto_name), msg)
.map(|(proto_name, msg, max_notif_size)| {
(proto_name.clone(), NotificationsIn::new(proto_name, max_notif_size), msg, max_notif_size)
})
.collect();

Expand Down Expand Up @@ -624,7 +629,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
protocol_info.handshake.read().clone()
protocol_info.handshake.read().clone(),
protocol_info.max_notification_size
);

self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
Expand All @@ -643,7 +649,8 @@ impl ProtocolsHandler for NotifsHandler {
if !*pending_opening {
let proto = NotificationsOut::new(
protocol_info.name.clone(),
handshake_message.clone()
handshake_message.clone(),
protocol_info.max_notification_size,
);

self.events_queue.push_back(ProtocolsHandlerEvent::OutboundSubstreamRequest {
Expand Down
26 changes: 21 additions & 5 deletions client/network/src/protocol/generic_proto/upgrade/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use futures::prelude::*;
use asynchronous_codec::Framed;
use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade};
use log::error;
use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}};
use std::{borrow::Cow, convert::{Infallible, TryFrom as _}, io, iter, mem, pin::Pin, task::{Context, Poll}};
use unsigned_varint::codec::UviBytes;

/// Maximum allowed size of the two handshake messages, in bytes.
Expand All @@ -53,6 +53,8 @@ const MAX_HANDSHAKE_SIZE: usize = 1024;
pub struct NotificationsIn {
/// Protocol name to use when negotiating the substream.
protocol_name: Cow<'static, str>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}

/// Upgrade that opens a substream, waits for the remote to accept by sending back a status
Expand All @@ -63,6 +65,8 @@ pub struct NotificationsOut {
protocol_name: Cow<'static, str>,
/// Message to send when we start the handshake.
initial_message: Vec<u8>,
/// Maximum allowed size for a single notification.
max_notification_size: u64,
}

/// A substream for incoming notification messages.
Expand Down Expand Up @@ -102,9 +106,10 @@ pub struct NotificationsOutSubstream<TSubstream> {

impl NotificationsIn {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>) -> Self {
pub fn new(protocol_name: impl Into<Cow<'static, str>>, max_notification_size: u64) -> Self {
NotificationsIn {
protocol_name: protocol_name.into(),
max_notification_size,
}
}
}
Expand Down Expand Up @@ -148,8 +153,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut initial_message).await?;
}

let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that try_from? Why not simply make max_notification_size an usize to begin with?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size of something that is transmitted on the network should never be a usize, as we want this size limit to be the same for all participants.
However a buffer length, however, is correctly a usize.


let substream = NotificationsInSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
handshake: NotificationsInSubstreamHandshake::NotSent,
};

Expand Down Expand Up @@ -287,7 +295,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin,

impl NotificationsOut {
/// Builds a new potential upgrade.
pub fn new(protocol_name: impl Into<Cow<'static, str>>, initial_message: impl Into<Vec<u8>>) -> Self {
pub fn new(
protocol_name: impl Into<Cow<'static, str>>,
initial_message: impl Into<Vec<u8>>,
max_notification_size: u64,
) -> Self {
let initial_message = initial_message.into();
if initial_message.len() > MAX_HANDSHAKE_SIZE {
error!(target: "sub-libp2p", "Outbound networking handshake is above allowed protocol limit");
Expand All @@ -296,6 +308,7 @@ impl NotificationsOut {
NotificationsOut {
protocol_name: protocol_name.into(),
initial_message,
max_notification_size,
}
}
}
Expand Down Expand Up @@ -342,8 +355,11 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static,
socket.read_exact(&mut handshake).await?;
}

let mut codec = UviBytes::default();
codec.set_max_len(usize::try_from(self.max_notification_size).unwrap_or(usize::max_value()));

Ok((handshake, NotificationsOutSubstream {
socket: Framed::new(socket, UviBytes::default()),
socket: Framed::new(socket, codec),
}))
})
}
Expand Down
56 changes: 42 additions & 14 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,9 @@ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
use std::{
borrow::Cow,
collections::{HashMap, HashSet},
convert::TryFrom as _,
fs,
iter,
marker::PhantomData,
num:: NonZeroUsize,
pin::Pin,
Expand Down Expand Up @@ -283,6 +285,46 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
config
};

let (transport, bandwidth) = {
let (config_mem, config_wasm) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None),
TransportConfig::Normal { wasm_external_transport, .. } =>
(false, wasm_external_transport)
};

// The yamux buffer size limit is configured to be equal to the maximum frame size
// of all protocols. 10 bytes are added to each limit for the length prefix that
// is not included in the upper layer protocols limit but is still present in the
// yamux buffer.
let yamux_maximum_buffer_size = {
let requests_max = params.network_config
.request_response_protocols.iter()
.map(|cfg| usize::try_from(cfg.max_request_size).unwrap_or(usize::max_value()));
let responses_max = params.network_config
.request_response_protocols.iter()
.map(|cfg| usize::try_from(cfg.max_response_size).unwrap_or(usize::max_value()));
let notifs_max = params.network_config
.extra_sets.iter()
.map(|cfg| usize::try_from(cfg.max_notification_size).unwrap_or(usize::max_value()));

// A "default" max is added to cover all the other protocols: ping, identify,
// kademlia.
let default_max = 1024 * 1024;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am seeing this number (1024 * 1024) a lot, maybe we can define it somewhere as default value? Either just a constant or maybe even make a NotificationSize newtype which has a Default instance?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has a different meaning every time, and each 1024 * 1024 should ideally be tweaked individually. Here it's the maximum message size for identify/ping/kademlia. In the grandpa crate, it's the maximum message size of grandpa.

iter::once(default_max)
.chain(requests_max).chain(responses_max).chain(notifs_max)
.max().expect("iterator known to always yield at least one element; qed")
.saturating_add(10)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand correctly, that the addition here accounts for the additional bytes needed for the length delimiter added via UviBytes? If so, why 10?

@tomaka tomaka Jan 18, 2021

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed.
10 is the maximum number of bytes required to encode a variable-length u64: https://docs.rs/unsigned-varint/0.6.0/unsigned_varint/encode/fn.u64_buffer.html
I'm taking the assumption that we'll never send a message larger than u64::max_size().

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Would you mind documenting that?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe even checked_add? Not that I think we will ever exhaust an u64 though :-)

};

transport::build_transport(
local_identity,
config_mem,
config_wasm,
params.network_config.yamux_window_size,
yamux_maximum_buffer_size
)
};

let behaviour = {
let result = Behaviour::new(
protocol,
Expand All @@ -305,20 +347,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
}
};

let (transport, bandwidth) = {
let (config_mem, config_wasm) = match params.network_config.transport {
TransportConfig::MemoryOnly => (true, None),
TransportConfig::Normal { wasm_external_transport, .. } =>
(false, wasm_external_transport)
};

transport::build_transport(
local_identity,
config_mem,
config_wasm,
params.network_config.yamux_window_size
)
};
let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone())
.connection_limits(ConnectionLimits::default()
.with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32))
Expand Down
4 changes: 4 additions & 0 deletions client/network/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: Default::default()
}
],
Expand All @@ -156,6 +157,7 @@ fn build_nodes_one_proto()
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr,
Expand Down Expand Up @@ -311,6 +313,7 @@ fn lots_of_incoming_peers_works() {
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
in_peers: u32::max_value(),
.. Default::default()
Expand All @@ -335,6 +338,7 @@ fn lots_of_incoming_peers_works() {
extra_sets: vec![
config::NonDefaultSetConfig {
notifications_protocol: PROTOCOL_NAME,
max_notification_size: 1024 * 1024,
set_config: config::SetConfig {
reserved_nodes: vec![config::MultiaddrWithPeerId {
multiaddr: listen_addr.clone(),
Expand Down
Loading