Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 24 additions & 71 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ use polkadot_subsystem::messages::{
};
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash, BlockNumber};
use polkadot_node_network_protocol::{
ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
ObservedRole, ReputationChange, PeerId, peer_set::PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1, OurView,
};

/// Peer set infos for network initialization.
///
/// To be added to [`NetworkConfiguration::extra_sets`].
pub use polkadot_node_network_protocol::peer_set::peer_sets_info;

use std::collections::{HashMap, hash_map};
use std::iter::ExactSizeIterator;
use std::pin::Pin;
Expand All @@ -54,10 +59,6 @@ mod validator_discovery;
/// We use the same limit to compute the view sent to peers locally.
const MAX_VIEW_HEADS: usize = 5;

/// The protocol name for the validation peer-set.
pub const VALIDATION_PROTOCOL_NAME: &'static str = "/polkadot/validation/1";
/// The protocol name for the collation peer-set.
pub const COLLATION_PROTOCOL_NAME: &'static str = "/polkadot/collation/1";

const MALFORMED_MESSAGE_COST: ReputationChange
= ReputationChange::new(-500, "Malformed Network-bridge message");
Expand All @@ -69,7 +70,7 @@ const MALFORMED_VIEW_COST: ReputationChange
// network bridge log target
const LOG_TARGET: &'static str = "network_bridge";

/// Messages received on the network.
/// Messages from and to the network.
#[derive(Debug, Encode, Decode, Clone)]
pub enum WireMessage<M> {
/// A message from a peer on a specific protocol.
Expand All @@ -80,31 +81,6 @@ pub enum WireMessage<M> {
ViewUpdate(View),
}

/// Information about the extra peers set. Should be used during network configuration
/// to register the protocol with the network service.
pub fn peers_sets_info() -> Vec<sc_network::config::NonDefaultSetConfig> {
vec![
sc_network::config::NonDefaultSetConfig {
notifications_protocol: VALIDATION_PROTOCOL_NAME.into(),
set_config: sc_network::config::SetConfig {
in_peers: 25,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
},
},
sc_network::config::NonDefaultSetConfig {
notifications_protocol: COLLATION_PROTOCOL_NAME.into(),
set_config: sc_network::config::SetConfig {
in_peers: 25,
out_peers: 0,
reserved_nodes: Vec::new(),
non_reserved_mode: sc_network::config::NonReservedPeerMode::Accept,
},
}
]
}

/// An action to be carried out by the network.
#[derive(Debug, PartialEq)]
pub enum NetworkAction {
Expand Down Expand Up @@ -176,20 +152,8 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
cost_benefit,
)
}
NetworkAction::WriteNotification(peer, peer_set, message) => {
match peer_set {
PeerSet::Validation => self.0.write_notification(
peer,
VALIDATION_PROTOCOL_NAME.into(),
message,
),
PeerSet::Collation => self.0.write_notification(
peer,
COLLATION_PROTOCOL_NAME.into(),
message,
),
}
}
NetworkAction::WriteNotification(peer, peer_set, message) =>
self.0.write_notification(peer, peer_set.into_protocol_name(), message)
}

Ok(())
Expand Down Expand Up @@ -258,6 +222,11 @@ struct PeerData {
view: View,
}

/// Internal type combining all actions a `NetworkBridge` might perform.
///
/// Both messages coming from the network (`NetworkEvent`) and messages coming from other
/// subsystems (`FromOverseer`) will be converted to `Action` in `run_network` before being
/// processed.
#[derive(Debug)]
enum Action {
SendValidationMessage(Vec<PeerId>, protocol_v1::ValidationProtocol),
Expand Down Expand Up @@ -321,26 +290,16 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
Some(NetworkEvent::SyncDisconnected { .. }) => Action::Nop,
Some(NetworkEvent::NotificationStreamOpened { remote, protocol, role }) => {
let role = role.into();
match protocol {
x if x == VALIDATION_PROTOCOL_NAME
=> Action::PeerConnected(PeerSet::Validation, remote, role),
x if x == COLLATION_PROTOCOL_NAME
=> Action::PeerConnected(PeerSet::Collation, remote, role),
_ => Action::Nop,
}
PeerSet::try_from_protocol_name(&protocol)
.map_or(Action::Nop, |peer_set| Action::PeerConnected(peer_set, remote, role))
}
Some(NetworkEvent::NotificationStreamClosed { remote, protocol }) => {
match protocol {
x if x == VALIDATION_PROTOCOL_NAME
=> Action::PeerDisconnected(PeerSet::Validation, remote),
x if x == COLLATION_PROTOCOL_NAME
=> Action::PeerDisconnected(PeerSet::Collation, remote),
_ => Action::Nop,
}
PeerSet::try_from_protocol_name(&protocol)
.map_or(Action::Nop, |peer_set| Action::PeerDisconnected(peer_set, remote))
}
Some(NetworkEvent::NotificationsReceived { remote, messages }) => {
let v_messages: Result<Vec<_>, _> = messages.iter()
.filter(|(protocol, _)| protocol == &VALIDATION_PROTOCOL_NAME)
.filter(|(protocol, _)| protocol == &PeerSet::Validation.into_protocol_name())
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand All @@ -350,7 +309,7 @@ fn action_from_network_message(event: Option<NetworkEvent>) -> Action {
};

let c_messages: Result<Vec<_>, _> = messages.iter()
.filter(|(protocol, _)| protocol == &COLLATION_PROTOCOL_NAME)
.filter(|(protocol, _)| protocol == &PeerSet::Collation.into_protocol_name())
.map(|(_, msg_bytes)| WireMessage::decode(&mut msg_bytes.as_ref()))
.collect();

Expand Down Expand Up @@ -584,6 +543,7 @@ async fn dispatch_collation_events_to_all<I>(
ctx.send_messages(events.into_iter().flat_map(messages_for)).await
}

/// Main driver, processing network events and messages from other subsystems.
#[tracing::instrument(skip(network_service, authority_discovery_service, ctx), fields(subsystem = LOG_TARGET))]
async fn run_network<N, AD>(
mut network_service: N,
Expand Down Expand Up @@ -824,13 +784,6 @@ mod tests {
)
}

fn peer_set_protocol(peer_set: PeerSet) -> std::borrow::Cow<'static, str> {
match peer_set {
PeerSet::Validation => VALIDATION_PROTOCOL_NAME.into(),
PeerSet::Collation => COLLATION_PROTOCOL_NAME.into(),
}
}

impl Network for TestNetwork {
fn event_stream(&mut self) -> BoxStream<'static, NetworkEvent> {
self.net_events.lock()
Expand Down Expand Up @@ -887,22 +840,22 @@ mod tests {
async fn connect_peer(&mut self, peer: PeerId, peer_set: PeerSet, role: ObservedRole) {
self.send_network_event(NetworkEvent::NotificationStreamOpened {
remote: peer,
protocol: peer_set_protocol(peer_set),
protocol: peer_set.into_protocol_name(),
role: role.into(),
}).await;
}

async fn disconnect_peer(&mut self, peer: PeerId, peer_set: PeerSet) {
self.send_network_event(NetworkEvent::NotificationStreamClosed {
remote: peer,
protocol: peer_set_protocol(peer_set),
protocol: peer_set.into_protocol_name(),
}).await;
}

async fn peer_message(&mut self, peer: PeerId, peer_set: PeerSet, message: Vec<u8>) {
self.send_network_event(NetworkEvent::NotificationsReceived {
remote: peer,
messages: vec![(peer_set_protocol(peer_set), message.into())],
messages: vec![(peer_set.into_protocol_name(), message.into())],
}).await;
}

Expand Down
9 changes: 5 additions & 4 deletions node/network/bridge/src/validator_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use sc_network::multiaddr::{Multiaddr, Protocol};
use sc_authority_discovery::Service as AuthorityDiscoveryService;
use polkadot_node_network_protocol::PeerId;
use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash};
use polkadot_node_network_protocol::peer_set::PeerSet;

const LOG_TARGET: &str = "validator_discovery";

Expand Down Expand Up @@ -276,24 +277,24 @@ impl<N: Network, AD: AuthorityDiscovery> Service<N, AD> {
// ask the network to connect to these nodes and not disconnect
// from them until removed from the set
if let Err(e) = network_service.add_peers_to_reserved_set(
super::COLLATION_PROTOCOL_NAME.into(),
PeerSet::Collation.into_protocol_name(),
multiaddr_to_add.clone(),
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
if let Err(e) = network_service.add_peers_to_reserved_set(
super::VALIDATION_PROTOCOL_NAME.into(),
PeerSet::Validation.into_protocol_name(),
multiaddr_to_add,
).await {
tracing::warn!(target: LOG_TARGET, err = ?e, "AuthorityDiscoveryService returned an invalid multiaddress");
}
// the addresses are known to be valid
let _ = network_service.remove_peers_from_reserved_set(
super::COLLATION_PROTOCOL_NAME.into(),
PeerSet::Collation.into_protocol_name(),
multiaddr_to_remove.clone()
).await;
let _ = network_service.remove_peers_from_reserved_set(
super::VALIDATION_PROTOCOL_NAME.into(),
PeerSet::Validation.into_protocol_name(),
multiaddr_to_remove
).await;

Expand Down
1 change: 1 addition & 0 deletions node/network/protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "1.3.5", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
13 changes: 4 additions & 9 deletions node/network/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub use polkadot_node_jaeger::JaegerSpan;
#[doc(hidden)]
pub use std::sync::Arc;


/// Peer-sets and protocols used for parachains.
pub mod peer_set;

/// A unique identifier of a request.
pub type RequestId = u64;

Expand All @@ -47,15 +51,6 @@ impl fmt::Display for WrongVariant {

impl std::error::Error for WrongVariant {}

/// The peer-sets that the network manages. Different subsystems will use different peer-sets.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PeerSet {
/// The validation peer-set is responsible for all messages related to candidate validation and communication among validators.
Validation,
/// The collation peer-set is used for validator<>collator communication.
Collation,
}

/// The advertised role of a node.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ObservedRole {
Expand Down
Loading