diff --git a/bin/node/src/service.rs b/bin/node/src/service.rs index 044cfa2021..056149b205 100644 --- a/bin/node/src/service.rs +++ b/bin/node/src/service.rs @@ -9,11 +9,11 @@ use aleph_primitives::AlephSessionApi; use aleph_runtime::{self, opaque::Block, RuntimeApi, MAX_BLOCK_SIZE}; use finality_aleph::{ run_nonvalidator_node, run_validator_node, AlephBlockImport, AlephConfig, - JustificationNotification, Metrics, MillisecsPerBlock, Protocol, SessionPeriod, + JustificationNotification, Metrics, MillisecsPerBlock, Protocol, ProtocolNaming, SessionPeriod, }; use futures::channel::mpsc; use log::warn; -use sc_client_api::{Backend, HeaderBackend}; +use sc_client_api::{Backend, BlockBackend, HeaderBackend}; use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams}; use sc_consensus_slots::BackoffAuthoringBlocksStrategy; use sc_network::NetworkService; @@ -212,14 +212,35 @@ fn setup( ( RpcHandlers, Arc::Hash>>, + ProtocolNaming, NetworkStarter, ), ServiceError, > { + let genesis_hash = client + .block_hash(0) + .ok() + .flatten() + .expect("we should have a hash"); + let chain_prefix = match config.chain_spec.fork_id() { + Some(fork_id) => format!("/{}/{}", genesis_hash, fork_id), + None => format!("/{}", genesis_hash), + }; + let protocol_naming = ProtocolNaming::new(chain_prefix); + config + .network + .extra_sets + .push(finality_aleph::peers_set_config( + protocol_naming.clone(), + Protocol::Authentication, + )); config .network .extra_sets - .push(finality_aleph::peers_set_config(Protocol::Authentication)); + .push(finality_aleph::peers_set_config( + protocol_naming.clone(), + Protocol::BlockSync, + )); let (network, system_rpc_tx, tx_handler_controller, network_starter) = sc_service::build_network(sc_service::BuildNetworkParams { @@ -262,7 +283,7 @@ fn setup( telemetry: telemetry.as_mut(), })?; - Ok((rpc_handlers, network, network_starter)) + Ok((rpc_handlers, network, protocol_naming, network_starter)) } /// Builds a new service for a full client. @@ -308,7 +329,7 @@ pub fn new_authority( let backoff_authoring_blocks = Some(LimitNonfinalized(aleph_config.max_nonfinalized_blocks())); let prometheus_registry = config.prometheus_registry().cloned(); - let (_rpc_handlers, network, network_starter) = setup( + let (_rpc_handlers, network, protocol_naming, network_starter) = setup( config, backend.clone(), &keystore_container, @@ -383,6 +404,7 @@ pub fn new_authority( backup_saving_path: backup_path, external_addresses: aleph_config.external_addresses(), validator_port: aleph_config.validator_port(), + protocol_naming, }; task_manager.spawn_essential_handle().spawn_blocking( "aleph", @@ -418,7 +440,7 @@ pub fn new_full( .path(), ); - let (_rpc_handlers, network, network_starter) = setup( + let (_rpc_handlers, network, protocol_naming, network_starter) = setup( config, backend.clone(), &keystore_container, @@ -460,6 +482,7 @@ pub fn new_full( backup_saving_path: backup_path, external_addresses: aleph_config.external_addresses(), validator_port: aleph_config.validator_port(), + protocol_naming, }; task_manager.spawn_essential_handle().spawn_blocking( diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 62c630282f..9de90b43de 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -22,7 +22,7 @@ use tokio::time::Duration; use crate::{ abft::{CurrentNetworkData, LegacyNetworkData, CURRENT_VERSION, LEGACY_VERSION}, aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData}, - network::{data::split::Split, protocol_name}, + network::data::split::Split, session::{ first_block_of_session, last_block_of_session, session_id_from_block_num, SessionBoundaries, SessionId, @@ -51,7 +51,7 @@ pub use abft::{Keychain, NodeCount, NodeIndex, Recipient, SignatureSet, SpawnHan pub use aleph_primitives::{AuthorityId, AuthorityPair, AuthoritySignature}; pub use import::AlephBlockImport; pub use justification::{AlephJustification, JustificationNotification}; -pub use network::Protocol; +pub use network::{Protocol, ProtocolNaming}; pub use nodes::{run_nonvalidator_node, run_validator_node}; pub use session::SessionPeriod; @@ -67,11 +67,12 @@ enum Error { } /// Returns a NonDefaultSetConfig for the specified protocol. -pub fn peers_set_config(protocol: Protocol) -> sc_network_common::config::NonDefaultSetConfig { - let name = protocol_name(&protocol); - +pub fn peers_set_config( + naming: ProtocolNaming, + protocol: Protocol, +) -> sc_network_common::config::NonDefaultSetConfig { let mut config = sc_network_common::config::NonDefaultSetConfig::new( - name, + naming.protocol_name(&protocol), // max_notification_size should be larger than the maximum possible honest message size (in bytes). // Max size of alert is UNIT_SIZE * MAX_UNITS_IN_ALERT ~ 100 * 5000 = 50000 bytes // Max size of parents response UNIT_SIZE * N_MEMBERS ~ 100 * N_MEMBERS @@ -79,9 +80,8 @@ pub fn peers_set_config(protocol: Protocol) -> sc_network_common::config::NonDef 1024 * 1024, ); - config.set_config = match protocol { - Protocol::Authentication => sc_network_common::config::SetConfig::default(), - }; + config.set_config = sc_network_common::config::SetConfig::default(); + config.add_fallback_names(naming.fallback_protocol_names(&protocol)); config } @@ -254,6 +254,7 @@ pub struct AlephConfig { pub backup_saving_path: Option, pub external_addresses: Vec, pub validator_port: u16, + pub protocol_naming: ProtocolNaming, } pub trait BlockchainBackend { diff --git a/finality-aleph/src/network/gossip/mod.rs b/finality-aleph/src/network/gossip/mod.rs index 4b8143d9ea..9edd411206 100644 --- a/finality-aleph/src/network/gossip/mod.rs +++ b/finality-aleph/src/network/gossip/mod.rs @@ -46,10 +46,13 @@ pub trait Network: Send + 'static { async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error>; } -/// The Authentication protocol is used for validator discovery. +/// Protocols used by the network. #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] pub enum Protocol { + /// The authentication protocol is used for validator discovery. Authentication, + /// The block synchronization protocol. + BlockSync, } /// Abstraction over a sender to the raw network. diff --git a/finality-aleph/src/network/gossip/service.rs b/finality-aleph/src/network/gossip/service.rs index 876349d2b7..3b73ea7c74 100644 --- a/finality-aleph/src/network/gossip/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -35,13 +35,17 @@ enum Command { pub struct Service { network: N, messages_from_user: mpsc::UnboundedReceiver>, - messages_for_user: mpsc::UnboundedSender<(D, N::PeerId)>, + messages_for_authentication_user: mpsc::UnboundedSender<(D, N::PeerId)>, + messages_for_block_sync_user: mpsc::UnboundedSender<(D, N::PeerId)>, authentication_connected_peers: HashSet, authentication_peer_senders: HashMap>, + block_sync_connected_peers: HashSet, + block_sync_peer_senders: HashMap>, spawn_handle: SpawnTaskHandle, } struct ServiceInterface { + protocol: Protocol, messages_from_service: mpsc::UnboundedReceiver<(D, P)>, messages_for_service: mpsc::UnboundedSender>, } @@ -70,7 +74,7 @@ impl Network for Serv fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> { self.messages_for_service - .unbounded_send(Command::Send(data, peer_id, Protocol::Authentication)) + .unbounded_send(Command::Send(data, peer_id, self.protocol)) .map_err(|_| Error::ServiceStopped) } @@ -80,17 +84,13 @@ impl Network for Serv peer_ids: HashSet, ) -> Result<(), Self::Error> { self.messages_for_service - .unbounded_send(Command::SendToRandom( - data, - peer_ids, - Protocol::Authentication, - )) + .unbounded_send(Command::SendToRandom(data, peer_ids, self.protocol)) .map_err(|_| Error::ServiceStopped) } fn broadcast(&mut self, data: D) -> Result<(), Self::Error> { self.messages_for_service - .unbounded_send(Command::Broadcast(data, Protocol::Authentication)) + .unbounded_send(Command::Broadcast(data, self.protocol)) .map_err(|_| Error::ServiceStopped) } @@ -115,20 +115,32 @@ impl Service { ) -> ( Service, impl Network, + impl Network, ) { - let (messages_for_user, messages_from_service) = mpsc::unbounded(); + let (messages_for_authentication_user, messages_from_authentication_service) = + mpsc::unbounded(); + let (messages_for_block_sync_user, messages_from_block_sync_service) = mpsc::unbounded(); let (messages_for_service, messages_from_user) = mpsc::unbounded(); ( Service { network, messages_from_user, - messages_for_user, + messages_for_authentication_user, + messages_for_block_sync_user, spawn_handle, authentication_connected_peers: HashSet::new(), authentication_peer_senders: HashMap::new(), + block_sync_connected_peers: HashSet::new(), + block_sync_peer_senders: HashMap::new(), + }, + ServiceInterface { + protocol: Protocol::Authentication, + messages_from_service: messages_from_authentication_service, + messages_for_service: messages_for_service.clone(), }, ServiceInterface { - messages_from_service, + protocol: Protocol::BlockSync, + messages_from_service: messages_from_block_sync_service, messages_for_service, }, ) @@ -141,6 +153,7 @@ impl Service { ) -> Option<&mut TracingUnboundedSender> { match protocol { Protocol::Authentication => self.authentication_peer_senders.get_mut(peer), + Protocol::BlockSync => self.block_sync_peer_senders.get_mut(peer), } } @@ -211,6 +224,7 @@ impl Service { fn protocol_peers(&self, protocol: Protocol) -> &HashSet { match protocol { Protocol::Authentication => &self.authentication_connected_peers, + Protocol::BlockSync => &self.block_sync_connected_peers, } } @@ -262,6 +276,12 @@ impl Service { self.authentication_peer_senders.insert(peer.clone(), tx); rx } + Protocol::BlockSync => { + let (tx, rx) = tracing_unbounded("mpsc_notification_stream_block_sync"); + self.block_sync_connected_peers.insert(peer.clone()); + self.block_sync_peer_senders.insert(peer.clone(), tx); + rx + } }; self.spawn_handle.spawn( "aleph/network/peer_sender", @@ -276,6 +296,10 @@ impl Service { self.authentication_connected_peers.remove(&peer); self.authentication_peer_senders.remove(&peer); } + Protocol::BlockSync => { + self.block_sync_connected_peers.remove(&peer); + self.block_sync_peer_senders.remove(&peer); + } } } Messages(peer_id, messages) => { @@ -283,12 +307,22 @@ impl Service { match protocol { Protocol::Authentication => match D::decode(&mut &data[..]) { Ok(data) => self - .messages_for_user + .messages_for_authentication_user .unbounded_send((data, peer_id.clone()))?, Err(e) => { warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) } }, + // This is a bit of a placeholder for now, as we are not yet using this + // protocol. In the future we will not be using the same D as above. + Protocol::BlockSync => match D::decode(&mut &data[..]) { + Ok(data) => self + .messages_for_block_sync_user + .unbounded_send((data, peer_id.clone()))?, + Err(e) => { + warn!(target: "aleph-network", "Error decoding block sync protocol message: {}", e) + } + }, }; } } @@ -303,6 +337,10 @@ impl Service { "authentication connected peers - {:?}; ", self.authentication_connected_peers.len() )); + status.push_str(&format!( + "block sync connected peers - {:?}; ", + self.block_sync_connected_peers.len() + )); info!(target: "aleph-network", "{}", status); } @@ -379,7 +417,7 @@ mod tests { // Prepare service let network = MockRawNetwork::new(event_stream_oneshot_tx); - let (service, gossip_network) = + let (service, gossip_network, _) = Service::new(network.clone(), task_manager.spawn_handle()); let gossip_network = Box::new(gossip_network); diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 35945465ed..7ae7b269e6 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -19,7 +19,7 @@ pub mod tcp; #[cfg(test)] pub use gossip::mock::{MockEvent, MockRawNetwork}; pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; -pub use substrate::protocol_name; +pub use substrate::{ProtocolNaming, SubstrateNetwork}; /// Represents the id of an arbitrary node. pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send { diff --git a/finality-aleph/src/network/substrate.rs b/finality-aleph/src/network/substrate.rs index 123423b9a8..c0823e36f9 100644 --- a/finality-aleph/src/network/substrate.rs +++ b/finality-aleph/src/network/substrate.rs @@ -1,12 +1,12 @@ -use std::{fmt, iter, pin::Pin, sync::Arc}; +use std::{collections::HashMap, fmt, iter, pin::Pin, sync::Arc}; use async_trait::async_trait; use futures::stream::{Stream, StreamExt}; use log::{error, trace}; use sc_consensus::JustificationSyncLink; use sc_network::{ - multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, NetworkService, - NetworkSyncForkRequest, PeerId, + multiaddr::Protocol as MultiaddressProtocol, Event as SubstrateEvent, Multiaddr, + NetworkService, NetworkSyncForkRequest, PeerId, }; use sc_network_common::{ protocol::ProtocolName, @@ -45,24 +45,70 @@ impl RequestBlocks for Arc> { } } -/// Name of the network protocol used by Aleph Zero. This is how messages -/// are subscribed to ensure that we are gossiping and communicating with our -/// own network. -const AUTHENTICATION_PROTOCOL_NAME: &str = "/aleph/1"; +/// Name of the network protocol used by Aleph Zero to disseminate validator +/// authentications. +const AUTHENTICATION_PROTOCOL_NAME: &str = "/auth/0"; -/// Returns the canonical name of the protocol. -pub fn protocol_name(protocol: &Protocol) -> ProtocolName { - use Protocol::*; - match protocol { - Authentication => AUTHENTICATION_PROTOCOL_NAME.into(), - } +/// Legacy name of the network protocol used by Aleph Zero to disseminate validator +/// authentications. Might be removed after some updates. +const LEGACY_AUTHENTICATION_PROTOCOL_NAME: &str = "/aleph/1"; + +/// Name of the network protocol used by Aleph Zero to synchronize the block state. +const BLOCK_SYNC_PROTOCOL_NAME: &str = "/sync/0"; + +/// Convert protocols to their names and vice versa. +#[derive(Clone)] +pub struct ProtocolNaming { + authentication_name: ProtocolName, + authentication_fallback_names: Vec, + block_sync_name: ProtocolName, + protocols_by_name: HashMap, } -/// Attempts to convert the protocol name to a protocol. -fn to_protocol(protocol_name: &str) -> Result { - match protocol_name { - AUTHENTICATION_PROTOCOL_NAME => Ok(Protocol::Authentication), - _ => Err(()), +impl ProtocolNaming { + /// Create a new protocol naming scheme with the given chain prefix. + pub fn new(chain_prefix: String) -> Self { + let authentication_name: ProtocolName = + format!("{}{}", chain_prefix, AUTHENTICATION_PROTOCOL_NAME).into(); + let mut protocols_by_name = HashMap::new(); + protocols_by_name.insert(authentication_name.clone(), Protocol::Authentication); + let authentication_fallback_names: Vec = + vec![LEGACY_AUTHENTICATION_PROTOCOL_NAME.into()]; + for protocol_name in &authentication_fallback_names { + protocols_by_name.insert(protocol_name.clone(), Protocol::Authentication); + } + let block_sync_name: ProtocolName = + format!("{}{}", chain_prefix, BLOCK_SYNC_PROTOCOL_NAME).into(); + protocols_by_name.insert(block_sync_name.clone(), Protocol::BlockSync); + ProtocolNaming { + authentication_name, + authentication_fallback_names, + block_sync_name, + protocols_by_name, + } + } + + /// Returns the canonical name of the protocol. + pub fn protocol_name(&self, protocol: &Protocol) -> ProtocolName { + use Protocol::*; + match protocol { + Authentication => self.authentication_name.clone(), + BlockSync => self.block_sync_name.clone(), + } + } + + /// Returns the fallback names of the protocol. + pub fn fallback_protocol_names(&self, protocol: &Protocol) -> Vec { + use Protocol::*; + match protocol { + Authentication => self.authentication_fallback_names.clone(), + _ => Vec::new(), + } + } + + /// Attempts to convert the protocol name to a protocol. + fn to_protocol(&self, protocol_name: &str) -> Option { + self.protocols_by_name.get(protocol_name).copied() } } @@ -127,6 +173,7 @@ impl NetworkSender for SubstrateNetworkSender { pub struct NetworkEventStream { stream: Pin + Send>>, + naming: ProtocolNaming, network: Arc>, } @@ -139,36 +186,46 @@ impl EventStream for NetworkEventStream { match self.stream.next().await { Some(event) => match event { SyncConnected { remote } => { - let multiaddress = + let multiaddress: Multiaddr = iter::once(MultiaddressProtocol::P2p(remote.into())).collect(); trace!(target: "aleph-network", "Connected event from address {:?}", multiaddress); if let Err(e) = self.network.add_peers_to_reserved_set( - protocol_name(&Protocol::Authentication), + self.naming.protocol_name(&Protocol::Authentication), + iter::once(multiaddress.clone()).collect(), + ) { + error!(target: "aleph-network", "add_reserved failed for authentications: {}", e); + } + if let Err(e) = self.network.add_peers_to_reserved_set( + self.naming.protocol_name(&Protocol::BlockSync), iter::once(multiaddress).collect(), ) { - error!(target: "aleph-network", "add_reserved failed: {}", e); + error!(target: "aleph-network", "add_reserved failed for block sync: {}", e); } continue; } SyncDisconnected { remote } => { trace!(target: "aleph-network", "Disconnected event for peer {:?}", remote); - let addresses = iter::once(remote).collect(); + let addresses: Vec<_> = iter::once(remote).collect(); + self.network.remove_peers_from_reserved_set( + self.naming.protocol_name(&Protocol::Authentication), + addresses.clone(), + ); self.network.remove_peers_from_reserved_set( - protocol_name(&Protocol::Authentication), + self.naming.protocol_name(&Protocol::BlockSync), addresses, ); continue; } NotificationStreamOpened { remote, protocol, .. - } => match to_protocol(protocol.as_ref()) { - Ok(protocol) => return Some(StreamOpened(remote, protocol)), - Err(_) => continue, + } => match self.naming.to_protocol(protocol.as_ref()) { + Some(protocol) => return Some(StreamOpened(remote, protocol)), + None => continue, }, NotificationStreamClosed { remote, protocol } => { - match to_protocol(protocol.as_ref()) { - Ok(protocol) => return Some(StreamClosed(remote, protocol)), - Err(_) => continue, + match self.naming.to_protocol(protocol.as_ref()) { + Some(protocol) => return Some(StreamClosed(remote, protocol)), + None => continue, } } NotificationsReceived { messages, remote } => { @@ -177,12 +234,9 @@ impl EventStream for NetworkEventStream { messages .into_iter() .filter_map(|(protocol, data)| { - match to_protocol(protocol.as_ref()) { - Ok(protocol) => Some((protocol, data)), - // This might end with us returning an empty vec, but it's probably not - // worth it to handle this situation here. - Err(_) => None, - } + self.naming + .to_protocol(protocol.as_ref()) + .map(|protocol| (protocol, data)) }) .collect(), )); @@ -195,7 +249,21 @@ impl EventStream for NetworkEventStream { } } -impl RawNetwork for Arc> { +/// A wrapper around the substrate network that includes information about protocol names. +#[derive(Clone)] +pub struct SubstrateNetwork { + network: Arc>, + naming: ProtocolNaming, +} + +impl SubstrateNetwork { + /// Create a new substrate network wrapper. + pub fn new(network: Arc>, naming: ProtocolNaming) -> Self { + SubstrateNetwork { network, naming } + } +} + +impl RawNetwork for SubstrateNetwork { type SenderError = SenderError; type NetworkSender = SubstrateNetworkSender; type PeerId = PeerId; @@ -203,8 +271,9 @@ impl RawNetwork for Arc> { fn event_stream(&self) -> Self::EventStream { NetworkEventStream { - stream: Box::pin(self.as_ref().event_stream("aleph-network")), - network: self.clone(), + stream: Box::pin(self.network.as_ref().event_stream("aleph-network")), + naming: self.naming.clone(), + network: self.network.clone(), } } @@ -217,7 +286,8 @@ impl RawNetwork for Arc> { // Currently method `notification_sender` does not distinguish whether we are not connected to the peer // or there is no such protocol so we need to have this worthless `SenderError::CannotCreateSender` error here notification_sender: self - .notification_sender(peer_id, protocol_name(&protocol)) + .network + .notification_sender(peer_id, self.naming.protocol_name(&protocol)) .map_err(|_| SenderError::CannotCreateSender(peer_id, protocol))?, peer_id, }) diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index 5301fa25a6..d59f0c3bc7 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -15,7 +15,7 @@ use crate::{ clique::Service, session::{ConnectionManager, ConnectionManagerConfig}, tcp::{new_tcp_network, KEY_TYPE}, - GossipService, + GossipService, SubstrateNetwork, }, nodes::{setup_justification_handler, JustificationParams}, party::{ @@ -62,6 +62,7 @@ where backup_saving_path, external_addresses, validator_port, + protocol_naming, .. } = aleph_config; @@ -92,8 +93,10 @@ where validator_network_service.run(exit).await }); - let (gossip_network_service, gossip_network) = - GossipService::new(network.clone(), spawn_handle.clone()); + let (gossip_network_service, authentication_network, _block_sync_network) = GossipService::new( + SubstrateNetwork::new(network.clone(), protocol_naming), + spawn_handle.clone(), + ); let gossip_network_task = async move { gossip_network_service.run().await }; let block_requester = network.clone(); @@ -122,7 +125,7 @@ where let (connection_manager_service, connection_manager) = ConnectionManager::new( network_identity, validator_network, - gossip_network, + authentication_network, ConnectionManagerConfig::with_session_period(&session_period, &millisecs_per_block), ); diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 5d6733f1b5..78cbbb009e 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -95,7 +95,7 @@ async fn prepare_one_session_test_data() -> TestData { let network = MockRawNetwork::new(event_stream_tx); let validator_network = MockCliqueNetwork::new(); - let (gossip_service, gossip_network) = + let (gossip_service, gossip_network, _) = GossipService::new(network.clone(), task_manager.spawn_handle()); let (connection_manager_service, session_manager) = ConnectionManager::new(