diff --git a/Cargo.lock b/Cargo.lock index 7247b949ad31..c88b4dc89f36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4799,6 +4799,7 @@ name = "polkadot-network-bridge" version = "0.1.0" dependencies = [ "assert_matches", + "async-trait", "futures 0.3.5", "futures-timer 3.0.2", "log 0.4.11", @@ -4808,6 +4809,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-primitives", + "sc-authority-discovery", "sc-network", "sp-core", "sp-keyring", @@ -5168,6 +5170,7 @@ dependencies = [ "sp-api", "sp-application-crypto", "sp-arithmetic", + "sp-authority-discovery", "sp-core", "sp-inherents", "sp-runtime", @@ -5334,6 +5337,7 @@ dependencies = [ "hex-literal 0.2.1", "libsecp256k1", "log 0.3.9", + "pallet-authority-discovery", "pallet-authorship", "pallet-babe", "pallet-balances", diff --git a/node/core/runtime-api/src/lib.rs b/node/core/runtime-api/src/lib.rs index 59a6684ab9fe..d73f4910b28a 100644 --- a/node/core/runtime-api/src/lib.rs +++ b/node/core/runtime-api/src/lib.rs @@ -120,6 +120,7 @@ fn make_runtime_api_request( Request::CandidatePendingAvailability(para, sender) => query!(candidate_pending_availability(para), sender), Request::CandidateEvents(sender) => query!(candidate_events(), sender), + Request::ValidatorDiscovery(ids, sender) => query!(validator_discovery(ids), sender), } } @@ -169,7 +170,7 @@ mod tests { use polkadot_primitives::v1::{ ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, PersistedValidationData, Id as ParaId, OccupiedCoreAssumption, ValidationData, SessionIndex, ValidationCode, - CommittedCandidateReceipt, CandidateEvent, + CommittedCandidateReceipt, CandidateEvent, AuthorityDiscoveryId, }; use polkadot_node_subsystem_test_helpers as test_helpers; use sp_core::testing::TaskExecutor; @@ -258,6 +259,10 @@ mod tests { fn candidate_events(&self) -> Vec { self.candidate_events.clone() } + + fn validator_discovery(ids: Vec) -> Vec> { + vec![None; ids.len()] + } } } diff --git 
a/node/network/bridge/Cargo.toml b/node/network/bridge/Cargo.toml index 555f158b782e..fbf93fd37e55 100644 --- a/node/network/bridge/Cargo.toml +++ b/node/network/bridge/Cargo.toml @@ -5,12 +5,14 @@ authors = ["Parity Technologies "] edition = "2018" [dependencies] +async-trait = "0.1" futures = "0.3.5" log = "0.4.8" futures-timer = "3.0.2" streamunordered = "0.5.1" polkadot-primitives = { path = "../../../primitives" } parity-scale-codec = "1.3.4" +sc-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } diff --git a/node/network/bridge/src/lib.rs b/node/network/bridge/src/lib.rs index 0ed6e11ce3b7..d15e71c6e671 100644 --- a/node/network/bridge/src/lib.rs +++ b/node/network/bridge/src/lib.rs @@ -20,7 +20,7 @@ use parity_scale_codec::{Encode, Decode}; use futures::prelude::*; use futures::future::BoxFuture; use futures::stream::BoxStream; -use futures::channel::oneshot; +use futures::channel::{mpsc, oneshot}; use sc_network::Event as NetworkEvent; use sp_runtime::ConsensusEngineId; @@ -34,16 +34,19 @@ use polkadot_subsystem::messages::{ BitfieldDistributionMessage, PoVDistributionMessage, StatementDistributionMessage, CollatorProtocolMessage, }; -use polkadot_primitives::v1::{Block, Hash, ValidatorId}; +use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; use polkadot_node_network_protocol::{ ObservedRole, ReputationChange, PeerId, PeerSet, View, NetworkBridgeEvent, v1 as protocol_v1 }; -use std::collections::hash_map::{HashMap, Entry as HEntry}; +use std::collections::{HashMap, hash_map}; use std::iter::ExactSizeIterator; use std::pin::Pin; use std::sync::Arc; + +mod validator_discovery; + /// The maximum amount of heads a peer is allowed to have in their view at 
any time. /// /// We use the same limit to compute the view sent to peers locally. @@ -188,29 +191,41 @@ impl Network for Arc> { } /// The network bridge subsystem. -pub struct NetworkBridge(N); +pub struct NetworkBridge { + network_service: N, + authority_discovery_service: AD, +} -impl NetworkBridge { - /// Create a new network bridge subsystem with underlying network service. +impl NetworkBridge { + /// Create a new network bridge subsystem with underlying network service and authority discovery service. /// /// This assumes that the network service has had the notifications protocol for the network /// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info). - pub fn new(net_service: N) -> Self { - NetworkBridge(net_service) + pub fn new(network_service: N, authority_discovery_service: AD) -> Self { + NetworkBridge { + network_service, + authority_discovery_service, + } } } -impl Subsystem for NetworkBridge +impl Subsystem for NetworkBridge where - Net: Network, + Net: Network + validator_discovery::Network, + AD: validator_discovery::AuthorityDiscovery, Context: SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { // Swallow error because failure is fatal to the node and we log with more precision // within `run_network`. 
+ let Self { network_service, authority_discovery_service } = self; SpawnedSubsystem { name: "network-bridge-subsystem", - future: run_network(self.0, ctx).map(|_| ()).boxed(), + future: run_network( + network_service, + authority_discovery_service, + ctx, + ).map(|_| ()).boxed(), } } } @@ -224,7 +239,11 @@ struct PeerData { enum Action { SendValidationMessage(Vec, protocol_v1::ValidationProtocol), SendCollationMessage(Vec, protocol_v1::CollationProtocol), - ConnectToValidators(PeerSet, Vec, oneshot::Sender>), + ConnectToValidators { + validator_ids: Vec, + connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + revoke: oneshot::Receiver<()>, + }, ReportPeer(PeerId, ReputationChange), ActiveLeaves(ActiveLeavesUpdate), @@ -254,8 +273,11 @@ fn action_from_overseer_message( => Action::SendValidationMessage(peers, msg), NetworkBridgeMessage::SendCollationMessage(peers, msg) => Action::SendCollationMessage(peers, msg), - NetworkBridgeMessage::ConnectToValidators(peer_set, validators, res) - => Action::ConnectToValidators(peer_set, validators, res), + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + connected, + revoke, + } => Action::ConnectToValidators { validator_ids, connected, revoke }, }, Ok(FromOverseer::Signal(OverseerSignal::BlockFinalized(_))) => Action::Nop, @@ -538,11 +560,16 @@ async fn dispatch_collation_events_to_all( ctx.send_messages(events.into_iter().flat_map(messages_for)).await } -async fn run_network( - mut net: N, +async fn run_network( + mut network_service: N, + mut authority_discovery_service: AD, mut ctx: impl SubsystemContext, -) -> SubsystemResult<()> { - let mut event_stream = net.event_stream().fuse(); +) -> SubsystemResult<()> +where + N: Network + validator_discovery::Network, + AD: validator_discovery::AuthorityDiscovery, +{ + let mut event_stream = network_service.event_stream().fuse(); // Most recent heads are at the back. 
let mut live_heads: Vec = Vec::with_capacity(MAX_VIEW_HEADS); @@ -551,7 +578,10 @@ async fn run_network( let mut validation_peers: HashMap = HashMap::new(); let mut collation_peers: HashMap = HashMap::new(); + let mut validator_discovery = validator_discovery::Service::::new(); + loop { + let action = { let subsystem_next = ctx.recv().fuse(); let mut net_event_next = event_stream.next().fuse(); @@ -568,31 +598,43 @@ async fn run_network( Action::Abort => return Ok(()), Action::SendValidationMessage(peers, msg) => send_message( - &mut net, + &mut network_service, peers, PeerSet::Validation, WireMessage::ProtocolMessage(msg), ).await?, Action::SendCollationMessage(peers, msg) => send_message( - &mut net, + &mut network_service, peers, PeerSet::Collation, WireMessage::ProtocolMessage(msg), ).await?, - Action::ConnectToValidators(_peer_set, _validators, _res) => { - // TODO: https://github.com/paritytech/polkadot/issues/1461 - } + Action::ConnectToValidators { + validator_ids, + connected, + revoke, + } => { + let (ns, ads) = validator_discovery.on_request( + validator_ids, + connected, + revoke, + network_service, + authority_discovery_service, + ).await; + network_service = ns; + authority_discovery_service = ads; + }, - Action::ReportPeer(peer, rep) => net.report_peer(peer, rep).await?, + Action::ReportPeer(peer, rep) => network_service.report_peer(peer, rep).await?, Action::ActiveLeaves(ActiveLeavesUpdate { activated, deactivated }) => { live_heads.extend(activated); live_heads.retain(|h| !deactivated.contains(h)); update_view( - &mut net, + &mut network_service, &mut ctx, &live_heads, &mut local_view, @@ -607,9 +649,11 @@ async fn run_network( PeerSet::Collation => &mut collation_peers, }; + validator_discovery.on_peer_connected(&peer, &mut authority_discovery_service).await; + match peer_map.entry(peer.clone()) { - HEntry::Occupied(_) => continue, - HEntry::Vacant(vacant) => { + hash_map::Entry::Occupied(_) => continue, + hash_map::Entry::Vacant(vacant) => { 
vacant.insert(PeerData { view: View(Vec::new()), }); @@ -650,6 +694,8 @@ async fn run_network( PeerSet::Collation => &mut collation_peers, }; + validator_discovery.on_peer_disconnected(&peer, &mut authority_discovery_service).await; + if peer_map.remove(&peer).is_some() { let res = match peer_set { PeerSet::Validation => dispatch_validation_event_to_all( @@ -677,7 +723,7 @@ async fn run_network( peer.clone(), &mut validation_peers, v_messages, - &mut net, + &mut network_service, ).await?; if let Err(e) = dispatch_validation_events_to_all( @@ -697,7 +743,7 @@ async fn run_network( peer.clone(), &mut collation_peers, c_messages, - &mut net, + &mut network_service, ).await?; if let Err(e) = dispatch_collation_events_to_all( @@ -716,6 +762,7 @@ async fn run_network( } } + #[cfg(test)] mod tests { use super::*; @@ -723,6 +770,8 @@ mod tests { use futures::executor; use std::sync::Arc; + use std::collections::HashSet; + use async_trait::async_trait; use parking_lot::Mutex; use assert_matches::assert_matches; @@ -730,6 +779,7 @@ mod tests { use polkadot_node_subsystem_test_helpers::{ SingleItemSink, SingleItemStream, TestSubsystemContextHandle, }; + use sc_network::Multiaddr; use sp_keyring::Sr25519Keyring; // The subsystem's view of the network - only supports a single call to `event_stream`. @@ -738,6 +788,8 @@ mod tests { action_tx: mpsc::UnboundedSender, } + struct TestAuthorityDiscovery; + // The test's view of the network. This receives updates from the subsystem in the form // of `NetworkAction`s. 
struct TestNetworkHandle { @@ -748,6 +800,7 @@ mod tests { fn new_test_network() -> ( TestNetwork, TestNetworkHandle, + TestAuthorityDiscovery, ) { let (net_tx, net_rx) = polkadot_node_subsystem_test_helpers::single_item_sink(); let (action_tx, action_rx) = mpsc::unbounded(); @@ -761,6 +814,7 @@ mod tests { action_rx, net_tx, }, + TestAuthorityDiscovery, ) } @@ -786,6 +840,23 @@ mod tests { } } + impl validator_discovery::Network for TestNetwork { + fn set_priority_group(&self, _group_id: String, _multiaddresses: HashSet) -> Result<(), String> { + Ok(()) + } + } + + #[async_trait] + impl validator_discovery::AuthorityDiscovery for TestAuthorityDiscovery { + async fn get_addresses_by_authority_id(&mut self, _authority: AuthorityDiscoveryId) -> Option> { + None + } + + async fn get_authority_id_by_peer_id(&mut self, _peer_id: PeerId) -> Option { + None + } + } + impl TestNetworkHandle { // Get the next network action. async fn next_network_action(&mut self) -> NetworkAction { @@ -842,11 +913,12 @@ mod tests { fn test_harness>(test: impl FnOnce(TestHarness) -> T) { let pool = sp_core::testing::TaskExecutor::new(); - let (network, network_handle) = new_test_network(); + let (network, network_handle, discovery) = new_test_network(); let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); let network_bridge = run_network( network, + discovery, context, ) .map_err(|_| panic!("subsystem execution failed")) diff --git a/node/network/bridge/src/validator_discovery.rs b/node/network/bridge/src/validator_discovery.rs new file mode 100644 index 000000000000..26e63f2745d9 --- /dev/null +++ b/node/network/bridge/src/validator_discovery.rs @@ -0,0 +1,594 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. 
+ +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! A validator discovery service for the Network Bridge. + +use core::marker::PhantomData; +use std::collections::{HashSet, HashMap, hash_map}; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::channel::{mpsc, oneshot}; + +use sc_network::Multiaddr; +use sc_authority_discovery::Service as AuthorityDiscoveryService; +use polkadot_node_network_protocol::PeerId; +use polkadot_primitives::v1::{AuthorityDiscoveryId, Block, Hash}; + +const PRIORITY_GROUP: &'static str = "parachain_validators"; + +/// An abstraction over networking for the purposes of validator discovery service. +pub trait Network: Send + 'static { + /// Ask the network to connect to these nodes and not disconnect from them until removed from the priority group. + fn set_priority_group(&self, group_id: String, multiaddresses: HashSet) -> Result<(), String>; + // TODO (ordian): we might want to add `add_to_priority_group` and `remove_from_priority_group` + // https://github.com/paritytech/polkadot/issues/1763 +} + +/// An abstraction over the authority discovery service. +#[async_trait] +pub trait AuthorityDiscovery: Send + 'static { + /// Get the addresses for the given [`AuthorityId`] from the local address cache. 
+ async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option>; + /// Get the [`AuthorityId`] for the given [`PeerId`] from the local address cache. + async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option; +} + +impl Network for Arc> { + fn set_priority_group(&self, group_id: String, multiaddresses: HashSet) -> Result<(), String> { + sc_network::NetworkService::set_priority_group(&**self, group_id, multiaddresses) + } +} + +#[async_trait] +impl AuthorityDiscovery for AuthorityDiscoveryService { + async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option> { + AuthorityDiscoveryService::get_addresses_by_authority_id(self, authority).await + } + + async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option { + AuthorityDiscoveryService::get_authority_id_by_peer_id(self, peer_id).await + } +} + + +/// This struct tracks the state for one `ConnectToValidators` request. +struct NonRevokedConnectionRequestState { + requested: Vec, + pending: HashSet, + sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + revoke: oneshot::Receiver<()>, +} + +impl NonRevokedConnectionRequestState { + /// Create a new instance of `ConnectToValidatorsState`. + pub fn new( + requested: Vec, + pending: HashSet, + sender: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + revoke: oneshot::Receiver<()>, + ) -> Self { + Self { + requested, + pending, + sender, + revoke, + } + } + + pub fn on_authority_connected(&mut self, authority: &AuthorityDiscoveryId, peer_id: &PeerId) { + if self.pending.remove(authority) { + // an error may happen if the request was revoked or + // the channel's buffer is full, ignoring it is fine + let _ = self.sender.try_send((authority.clone(), peer_id.clone())); + } + } + + /// Returns `true` if the request is revoked. 
+ pub fn is_revoked(&mut self) -> bool { + self.revoke + .try_recv() + .map_or(true, |r| r.is_some()) + } + + pub fn requested(&self) -> &[AuthorityDiscoveryId] { + self.requested.as_ref() + } +} + + +pub(super) struct Service { + // we assume one PeerId per AuthorityId is enough + connected_validators: HashMap, + // the `u64` counts the number of pending non-revoked requests for this validator + // note: the validators in this map are not necessarily present + // in the `connected_validators` map. + // Invariant: the value > 0 for non-revoked requests. + requested_validators: HashMap, + // keep for the network priority_group updates + validator_multiaddresses: HashSet, + non_revoked_discovery_requests: Vec, + // PhantomData used to make the struct generic instead of having generic methods + network: PhantomData, + authority_discovery: PhantomData, +} + +impl Service { + pub fn new() -> Self { + Self { + connected_validators: HashMap::new(), + requested_validators: HashMap::new(), + validator_multiaddresses: HashSet::new(), + non_revoked_discovery_requests: Vec::new(), + network: PhantomData, + authority_discovery: PhantomData, + } + } + + /// On a new connection request, a priority group update will be issued. + /// It will ask the network to connect to the validators and not disconnect + /// from them at least until all the pending requests containing them are revoked. + /// + /// This method will also clean up all previously revoked requests. + // it takes `network_service` and `authority_discovery_service` by value + // and returns them as a workaround for the Future: Send requirement imposed by async fn impl. 
+ pub async fn on_request( + &mut self, + validator_ids: Vec, + mut connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + revoke: oneshot::Receiver<()>, + network_service: N, + mut authority_discovery_service: AD, + ) -> (N, AD) { + const MAX_ADDR_PER_PEER: usize = 3; + + let already_connected = validator_ids.iter() + .cloned() + .filter_map(|id| { + let counter = self.requested_validators.entry(id.clone()).or_default(); + // if the counter overflows, there is something really wrong going on + *counter += 1; + + self.connected_validators + .get(&id) + .map(|peer| (id, peer.clone())) + }); + + + let on_revoke = |map: &mut HashMap, id: AuthorityDiscoveryId| -> Option { + match map.entry(id) { + hash_map::Entry::Occupied(mut entry) => { + *entry.get_mut() -= 1; + if *entry.get() == 0 { + return Some(entry.remove_entry().0); + } + } + hash_map::Entry::Vacant(_) => { + // should be unreachable + } + } + None + }; + + // try to send already connected peers + for (id, peer) in already_connected { + match connected.try_send((id, peer)) { + Err(e) if e.is_disconnected() => { + // the request is already revoked + for peer_id in validator_ids { + on_revoke(&mut self.requested_validators, peer_id); + } + return (network_service, authority_discovery_service); + } + Err(_) => { + // the channel's buffer is full + // ignore the error, the receiver will miss out some peers + // but that's fine + break; + } + Ok(()) => continue, + } + } + + // collect multiaddress of validators + for authority in validator_ids.iter().cloned() { + let result = authority_discovery_service.get_addresses_by_authority_id(authority).await; + if let Some(addresses) = result { + // We might have several `PeerId`s per `AuthorityId` + // depending on the number of sentry nodes, + // so we limit the max number of sentries per node to connect to. 
+ // They are going to be removed soon though: + // https://github.com/paritytech/substrate/issues/6845 + for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { + self.validator_multiaddresses.insert(addr); + } + } + } + + // clean up revoked requests + let mut revoked_indices = Vec::new(); + let mut revoked_validators = Vec::new(); + for (i, maybe_revoked) in self.non_revoked_discovery_requests.iter_mut().enumerate() { + if maybe_revoked.is_revoked() { + for id in maybe_revoked.requested() { + if let Some(id) = on_revoke(&mut self.requested_validators, id.clone()) { + revoked_validators.push(id); + } + } + revoked_indices.push(i); + } + } + + // clean up revoked requests states + for to_revoke in revoked_indices.into_iter().rev() { + drop(self.non_revoked_discovery_requests.swap_remove(to_revoke)); + } + + // multiaddresses to remove + for id in revoked_validators.into_iter() { + let result = authority_discovery_service.get_addresses_by_authority_id(id).await; + if let Some(addresses) = result { + for addr in addresses.into_iter().take(MAX_ADDR_PER_PEER) { + self.validator_multiaddresses.remove(&addr); + } + } + } + + // ask the network to connect to these nodes and not disconnect + // from them until removed from the priority group + // TODO (ordian): this clones the whole set of multaddresses + // TODO (ordian): use add_to_priority_group for incremental updates? 
+ if let Err(e) = network_service.set_priority_group( + PRIORITY_GROUP.to_owned(), + self.validator_multiaddresses.clone(), + ) { + log::warn!(target: super::TARGET, "AuthorityDiscoveryService returned an invalid multiaddress: {}", e); + } + + let pending = validator_ids.iter() + .cloned() + .filter(|id| !self.connected_validators.contains_key(id)) + .collect::>(); + + self.non_revoked_discovery_requests.push(NonRevokedConnectionRequestState::new( + validator_ids, + pending, + connected, + revoke, + )); + + (network_service, authority_discovery_service) + } + + pub async fn on_peer_connected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) { + // check if it's an authority we've been waiting for + let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; + if let Some(authority) = maybe_authority { + for request in self.non_revoked_discovery_requests.iter_mut() { + request.on_authority_connected(&authority, peer_id); + } + self.connected_validators.insert(authority, peer_id.clone()); + } + } + + pub async fn on_peer_disconnected(&mut self, peer_id: &PeerId, authority_discovery_service: &mut AD) { + let maybe_authority = authority_discovery_service.get_authority_id_by_peer_id(peer_id.clone()).await; + if let Some(authority) = maybe_authority { + self.connected_validators.remove(&authority); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::stream::StreamExt as _; + + use sp_keyring::Sr25519Keyring; + + + fn new_service() -> Service { + Service::new() + } + + fn new_network() -> (TestNetwork, TestAuthorityDiscovery) { + (TestNetwork::default(), TestAuthorityDiscovery::new()) + } + + #[derive(Default)] + struct TestNetwork { + // Mutex is used because of &self signature of set_priority_group + priority_group: std::sync::Mutex>, + } + + struct TestAuthorityDiscovery { + by_authority_id: HashMap, + by_peer_id: HashMap, + } + + impl TestAuthorityDiscovery { + fn new() -> Self { + let 
peer_ids = known_peer_ids(); + let authorities = known_authorities(); + let multiaddr = known_multiaddr(); + Self { + by_authority_id: authorities.iter() + .cloned() + .zip(multiaddr.into_iter()) + .collect(), + by_peer_id: peer_ids.into_iter() + .zip(authorities.into_iter()) + .collect(), + } + } + } + + impl Network for TestNetwork { + fn set_priority_group(&self, _group_id: String, multiaddresses: HashSet) -> Result<(), String> { + let mut group = self.priority_group.lock().unwrap(); + *group = multiaddresses; + Ok(()) + } + } + + #[async_trait] + impl AuthorityDiscovery for TestAuthorityDiscovery { + async fn get_addresses_by_authority_id(&mut self, authority: AuthorityDiscoveryId) -> Option> { + self.by_authority_id.get(&authority).cloned().map(|addr| vec![addr]) + } + + async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option { + self.by_peer_id.get(&peer_id).cloned() + } + } + + fn known_authorities() -> Vec { + [ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + ].iter().map(|k| k.public().into()).collect() + } + + fn known_peer_ids() -> Vec { + (0..3).map(|_| PeerId::random()).collect() + } + + fn known_multiaddr() -> Vec { + vec![ + "/ip4/127.0.0.1/tcp/1234".parse().unwrap(), + "/ip4/127.0.0.1/tcp/1235".parse().unwrap(), + "/ip4/127.0.0.1/tcp/1236".parse().unwrap(), + ] + } + + #[test] + fn request_is_revoked_on_send() { + let (revoke_tx, revoke_rx) = oneshot::channel(); + let (sender, _receiver) = mpsc::channel(0); + + let mut request = NonRevokedConnectionRequestState::new( + Vec::new(), + HashSet::new(), + sender, + revoke_rx, + ); + + assert!(!request.is_revoked()); + + revoke_tx.send(()).unwrap(); + + assert!(request.is_revoked()); + } + + #[test] + fn request_is_revoked_when_the_sender_is_dropped() { + let (revoke_tx, revoke_rx) = oneshot::channel(); + let (sender, _receiver) = mpsc::channel(0); + + let mut request = NonRevokedConnectionRequestState::new( + Vec::new(), + HashSet::new(), + sender, + 
revoke_rx, + ); + + assert!(!request.is_revoked()); + + drop(revoke_tx); + + assert!(request.is_revoked()); + } + + #[test] + fn requests_are_fulfilled_immediately_for_already_connected_peers() { + let mut service = new_service(); + + let (ns, mut ads) = new_network(); + + let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); + let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); + + futures::executor::block_on(async move { + let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; + let (sender, mut receiver) = mpsc::channel(2); + let (_revoke_tx, revoke_rx) = oneshot::channel(); + + service.on_peer_connected(&peer_ids[0], &mut ads).await; + + let _ = service.on_request( + req1, + sender, + revoke_rx, + ns, + ads, + ).await; + + + // the results should be immediately available + let reply1 = receiver.next().await.unwrap(); + assert_eq!(reply1.0, authority_ids[0]); + assert_eq!(reply1.1, peer_ids[0]); + }); + } + + #[test] + fn requests_are_fulfilled_on_peer_connection() { + let mut service = new_service(); + + let (ns, ads) = new_network(); + + let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); + let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); + + futures::executor::block_on(async move { + let req1 = vec![authority_ids[0].clone(), authority_ids[1].clone()]; + let (sender, mut receiver) = mpsc::channel(2); + let (_revoke_tx, revoke_rx) = oneshot::channel(); + + let (_, mut ads) = service.on_request( + req1, + sender, + revoke_rx, + ns, + ads, + ).await; + + + service.on_peer_connected(&peer_ids[0], &mut ads).await; + let reply1 = receiver.next().await.unwrap(); + assert_eq!(reply1.0, authority_ids[0]); + assert_eq!(reply1.1, peer_ids[0]); + + service.on_peer_connected(&peer_ids[1], &mut ads).await; + let reply2 = receiver.next().await.unwrap(); + assert_eq!(reply2.0, authority_ids[1]); + assert_eq!(reply2.1, peer_ids[1]); + }); + } + + // Test cleanup works. 
+ #[test] + fn requests_are_removed_on_revoke() { + let mut service = new_service(); + + let (ns, mut ads) = new_network(); + + let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); + let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); + + futures::executor::block_on(async move { + let (sender, mut receiver) = mpsc::channel(1); + let (revoke_tx, revoke_rx) = oneshot::channel(); + + service.on_peer_connected(&peer_ids[0], &mut ads).await; + service.on_peer_connected(&peer_ids[1], &mut ads).await; + + let (ns, ads) = service.on_request( + vec![authority_ids[0].clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + let _ = receiver.next().await.unwrap(); + // revoke the request + revoke_tx.send(()).unwrap(); + + let (sender, mut receiver) = mpsc::channel(1); + let (_revoke_tx, revoke_rx) = oneshot::channel(); + + let _ = service.on_request( + vec![authority_ids[1].clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + let reply = receiver.next().await.unwrap(); + assert_eq!(reply.0, authority_ids[1]); + assert_eq!(reply.1, peer_ids[1]); + assert_eq!(service.non_revoked_discovery_requests.len(), 1); + }); + } + + // More complex test with overlapping revoked requests + #[test] + fn revoking_requests_with_overlapping_validator_sets() { + let mut service = new_service(); + + let (ns, mut ads) = new_network(); + + let peer_ids: Vec<_> = ads.by_peer_id.keys().cloned().collect(); + let authority_ids: Vec<_> = ads.by_peer_id.values().cloned().collect(); + + futures::executor::block_on(async move { + let (sender, mut receiver) = mpsc::channel(1); + let (revoke_tx, revoke_rx) = oneshot::channel(); + + service.on_peer_connected(&peer_ids[0], &mut ads).await; + service.on_peer_connected(&peer_ids[1], &mut ads).await; + + let (ns, ads) = service.on_request( + vec![authority_ids[0].clone(), authority_ids[2].clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + let _ = receiver.next().await.unwrap(); + // revoke the first request + 
revoke_tx.send(()).unwrap(); + + let (sender, mut receiver) = mpsc::channel(1); + let (revoke_tx, revoke_rx) = oneshot::channel(); + + let (ns, ads) = service.on_request( + vec![authority_ids[0].clone(), authority_ids[1].clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + let _ = receiver.next().await.unwrap(); + assert_eq!(service.non_revoked_discovery_requests.len(), 1); + assert_eq!(ns.priority_group.lock().unwrap().len(), 2); + + // revoke the second request + revoke_tx.send(()).unwrap(); + + let (sender, mut receiver) = mpsc::channel(1); + let (_revoke_tx, revoke_rx) = oneshot::channel(); + + let (ns, _) = service.on_request( + vec![authority_ids[0].clone()], + sender, + revoke_rx, + ns, + ads, + ).await; + + let _ = receiver.next().await.unwrap(); + assert_eq!(service.non_revoked_discovery_requests.len(), 1); + assert_eq!(ns.priority_group.lock().unwrap().len(), 1); + }); + } +} diff --git a/node/network/collator-protocol/src/collator_side.rs b/node/network/collator-protocol/src/collator_side.rs index 11e50be723e3..2f2e6464f413 100644 --- a/node/network/collator-protocol/src/collator_side.rs +++ b/node/network/collator-protocol/src/collator_side.rs @@ -16,13 +16,17 @@ use std::collections::HashMap; +use super::{TARGET, Result}; + use futures::channel::oneshot; -use log::{trace, warn}; +use futures::stream::StreamExt as _; +use futures::task::Poll; +use log::warn; + use polkadot_primitives::v1::{ CollatorId, CoreIndex, CoreState, Hash, Id as ParaId, CandidateReceipt, PoV, ValidatorId, }; -use super::{TARGET, Result}; use polkadot_subsystem::{ FromOverseer, OverseerSignal, SubsystemContext, messages::{ @@ -31,9 +35,10 @@ use polkadot_subsystem::{ }, }; use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, PeerId, PeerSet, NetworkBridgeEvent, RequestId, + v1 as protocol_v1, View, PeerId, NetworkBridgeEvent, RequestId, }; use polkadot_node_subsystem_util::{ + validator_discovery, request_validators_ctx, request_validator_groups_ctx, 
metrics::{self, prometheus}, @@ -119,6 +124,9 @@ struct State { /// go out of scope with their respective deactivated leafs. known_validators: HashMap, + /// Use to await for the next validator connection and revoke the request. + last_connection_request: Option, + /// Metrics. metrics: Metrics, } @@ -188,7 +196,7 @@ where state.our_validators_groups.insert(relay_parent, our_validators.clone()); // Issue a discovery request for the validators of the current group and the next group. - connect_to_validators(ctx, state, our_validators).await?; + connect_to_validators(ctx, relay_parent, state, our_validators).await?; state.collations.insert(relay_parent, (receipt, pov)); @@ -289,26 +297,28 @@ where Ok(()) } -/// Issue a connection request to a set of validators. +/// Issue a connection request to a set of validators and +/// revoke the previous connection request. async fn connect_to_validators( ctx: &mut Context, + relay_parent: Hash, state: &mut State, validators: Vec, ) -> Result<()> where Context: SubsystemContext { - let (tx, rx) = oneshot::channel(); - - ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators(PeerSet::Collation, validators, tx), - )).await?; + if let Some(request) = state.last_connection_request.take() { + request.revoke(); + } - let mut validators_ids = rx.await?; + let request = validator_discovery::connect_to_validators( + ctx, + relay_parent, + validators, + ).await?; - for id in validators_ids.drain(..) { - state.known_validators.insert(id.1, id.0); - } + state.last_connection_request = Some(request); Ok(()) } @@ -533,11 +543,10 @@ where Ok(()) } -/// A peer is connected. +/// A validator is connected. /// -/// We first want to check if this is a validator we are expecting to talk to -/// and if so `Declare` that we are a collator with a given `CollatorId`. -async fn handle_peer_connected( +/// `Declare` that we are a collator with a given `CollatorId`. 
+async fn handle_validator_connected( ctx: &mut Context, state: &mut State, peer_id: PeerId, @@ -545,12 +554,6 @@ async fn handle_peer_connected( where Context: SubsystemContext { - if !state.known_validators.contains_key(&peer_id) { - trace!(target: TARGET, "An unknown peer has connected {:?}", peer_id); - - return Ok(()) - } - state.peer_views.entry(peer_id.clone()).or_default(); declare(ctx, state, vec![peer_id]).await?; @@ -570,8 +573,8 @@ where use NetworkBridgeEvent::*; match bridge_message { - PeerConnected(peer_id, _observed_role) => { - handle_peer_connected(ctx, state, peer_id).await?; + PeerConnected(_peer_id, _observed_role) => { + // validators first connection is handled by `handle_validator_connected` } PeerViewChange(peer_id, view) => { handle_peer_view_change(ctx, state, peer_id, view).await?; @@ -602,8 +605,8 @@ async fn handle_our_view_change( let removed = old_view.difference(&view).collect::>(); for removed in removed.into_iter() { - state.collations.remove(&removed); - if let Some(group) = state.our_validators_groups.remove(&removed) { + state.collations.remove(removed); + if let Some(group) = state.our_validators_groups.remove(removed) { state.known_validators.retain(|_, v| !group.contains(v)); } } @@ -631,25 +634,43 @@ where state.our_id = our_id; loop { - match ctx.recv().await? 
{ - Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?, - Signal(ActiveLeaves(_update)) => {} - Signal(BlockFinalized(_)) => {} - Signal(Conclude) => break, + if let Some(mut request) = state.last_connection_request.take() { + while let Poll::Ready(Some((validator_id, peer_id))) = futures::poll!(request.next()) { + state.known_validators.insert(peer_id.clone(), validator_id); + if let Err(err) = handle_validator_connected(&mut ctx, &mut state, peer_id).await { + warn!( + target: TARGET, + "Failed to declare our collator id: {:?}", + err, + ); + } + } + // put it back + state.last_connection_request = Some(request); } - } - Ok(()) + while let Poll::Ready(msg) = futures::poll!(ctx.recv()) { + match msg? { + Communication { msg } => process_msg(&mut ctx, &mut state, msg).await?, + Signal(ActiveLeaves(_update)) => {} + Signal(BlockFinalized(_)) => {} + Signal(Conclude) => return Ok(()), + } + } + + futures::pending!() + } } #[cfg(test)] mod tests { use super::*; - use log::trace; use std::time::Duration; - use futures::{executor, future, Future}; + use assert_matches::assert_matches; + use futures::{executor, future, Future}; + use log::trace; use smallvec::smallvec; use sp_core::crypto::Pair; @@ -657,12 +678,11 @@ mod tests { use polkadot_primitives::v1::{ BlockData, CandidateDescriptor, CollatorPair, ScheduledCore, - ValidatorIndex, GroupRotationInfo, + ValidatorIndex, GroupRotationInfo, AuthorityDiscoveryId, }; use polkadot_subsystem::ActiveLeavesUpdate; use polkadot_node_subsystem_util::TimeoutExt; use polkadot_subsystem_testhelpers as test_helpers; - use polkadot_node_network_protocol::ObservedRole; #[derive(Default)] struct TestCandidateBuilder { @@ -691,6 +711,7 @@ mod tests { chain_ids: Vec, validators: Vec, validator_public: Vec, + validator_authority_id: Vec, validator_peer_id: Vec, validator_groups: (Vec>, GroupRotationInfo), relay_parent: Hash, @@ -702,6 +723,10 @@ mod tests { val_ids.iter().map(|v| v.public().into()).collect() } + fn 
validator_authority_id(val_ids: &[Sr25519Keyring]) -> Vec { + val_ids.iter().map(|v| v.public().into()).collect() + } + impl Default for TestState { fn default() -> Self { let chain_a = ParaId::from(1); @@ -718,6 +743,7 @@ mod tests { ]; let validator_public = validator_pubkeys(&validators); + let validator_authority_id = validator_authority_id(&validators); let validator_peer_id = std::iter::repeat_with(|| PeerId::random()) .take(validator_public.len()) @@ -750,6 +776,7 @@ mod tests { chain_ids, validators, validator_public, + validator_authority_id, validator_peer_id, validator_groups, relay_parent, @@ -925,62 +952,82 @@ mod tests { } ); - // We now should connect to our validator group. + // obtain the validator_id to authority_id mapping assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridge( - NetworkBridgeMessage::ConnectToValidators( - peer_set, - validators, - tx, - ) - ) => { - assert_eq!(peer_set, PeerSet::Collation); + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators, tx), + )) => { + assert_eq!(relay_parent, current); assert_eq!(validators.len(), 4); assert!(validators.contains(&test_state.validator_public[2])); assert!(validators.contains(&test_state.validator_public[0])); assert!(validators.contains(&test_state.validator_public[4])); assert!(validators.contains(&test_state.validator_public[1])); - tx.send(vec![ - (test_state.validator_public[2].clone(), test_state.validator_peer_id[2].clone()), - (test_state.validator_public[0].clone(), test_state.validator_peer_id[0].clone()), - (test_state.validator_public[4].clone(), test_state.validator_peer_id[4].clone()), - (test_state.validator_public[1].clone(), test_state.validator_peer_id[1].clone()), - ]).unwrap(); + let result = vec![ + Some(test_state.validator_authority_id[2].clone()), + Some(test_state.validator_authority_id[0].clone()), + Some(test_state.validator_authority_id[4].clone()), + 
Some(test_state.validator_authority_id[1].clone()), + ]; + tx.send(Ok(result)).unwrap(); } ); - // Validator 2 connects. - overseer_send( - &mut virtual_overseer, - CollatorProtocolMessage::NetworkBridgeUpdateV1( - NetworkBridgeEvent::PeerConnected( - test_state.validator_peer_id[2].clone(), - ObservedRole::Authority, - ) - ), - ).await; - - // We declare to the connected validator that we are a collator. + // We now should connect to our validator group. assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - to, - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + mut connected, + .. + } ) => { - assert_eq!(to, vec![test_state.validator_peer_id[2].clone()]); - assert_matches!( - wire_message, - protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { - assert_eq!(collator_id, test_state.our_collator_pair.public()); - } - ); + assert_eq!(validator_ids.len(), 4); + assert!(validator_ids.contains(&test_state.validator_authority_id[2])); + assert!(validator_ids.contains(&test_state.validator_authority_id[0])); + assert!(validator_ids.contains(&test_state.validator_authority_id[4])); + assert!(validator_ids.contains(&test_state.validator_authority_id[1])); + + let result = vec![ + (test_state.validator_authority_id[2].clone(), test_state.validator_peer_id[2].clone()), + (test_state.validator_authority_id[0].clone(), test_state.validator_peer_id[0].clone()), + (test_state.validator_authority_id[4].clone(), test_state.validator_peer_id[4].clone()), + (test_state.validator_authority_id[1].clone(), test_state.validator_peer_id[1].clone()), + ]; + + for (id, peer_id) in result.into_iter() { + connected.try_send((id, peer_id)).unwrap(); + } } ); + // We declare to the connected validators that we are a collator. 
+ // We need to catch all `Declare` messages to the validators we've + // previously connected to. + for i in vec![2, 0, 4, 1].into_iter() { + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridge( + NetworkBridgeMessage::SendCollationMessage( + to, + protocol_v1::CollationProtocol::CollatorProtocol(wire_message), + ) + ) => { + assert_eq!(to, vec![test_state.validator_peer_id[i].clone()]); + assert_matches!( + wire_message, + protocol_v1::CollatorProtocolMessage::Declare(collator_id) => { + assert_eq!(collator_id, test_state.our_collator_pair.public()); + } + ); + } + ); + } + // Send info about peer's view. overseer_send( &mut virtual_overseer, diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index 647eb1ea46bc..ef659051e621 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -59,6 +59,16 @@ enum Error { Prometheus(prometheus::PrometheusError), } +impl From for Error { + fn from(me: util::validator_discovery::Error) -> Self { + match me { + util::validator_discovery::Error::Subsystem(s) => Error::Subsystem(s), + util::validator_discovery::Error::RuntimeApi(ra) => Error::RuntimeApi(ra), + util::validator_discovery::Error::Oneshot(c) => Error::Oneshot(c), + } + } +} + type Result = std::result::Result; enum ProtocolSide { diff --git a/node/subsystem-util/src/lib.rs b/node/subsystem-util/src/lib.rs index f796c6bc33d4..74797e67d8a3 100644 --- a/node/subsystem-util/src/lib.rs +++ b/node/subsystem-util/src/lib.rs @@ -57,6 +57,8 @@ use std::{ }; use streamunordered::{StreamUnordered, StreamYield}; +pub mod validator_discovery; + /// These reexports are required so that external crates can use the `delegated_subsystem` macro properly. 
pub mod reexports { pub use sp_core::traits::SpawnNamed; diff --git a/node/subsystem-util/src/validator_discovery.rs b/node/subsystem-util/src/validator_discovery.rs new file mode 100644 index 000000000000..4fb964945b01 --- /dev/null +++ b/node/subsystem-util/src/validator_discovery.rs @@ -0,0 +1,157 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Utility function to make it easier to connect to validators. + +use std::collections::HashMap; +use std::pin::Pin; + +use futures::{ + channel::{mpsc, oneshot}, + task::{Poll, self}, + stream, +}; + +use polkadot_node_subsystem::{ + errors::RuntimeApiError, SubsystemError, + messages::{AllMessages, RuntimeApiMessage, RuntimeApiRequest, NetworkBridgeMessage}, + SubsystemContext, +}; +use polkadot_primitives::v1::{Hash, ValidatorId, AuthorityDiscoveryId}; +use sc_network::PeerId; + +/// Error when making a request to connect to validators. +#[derive(Debug, derive_more::From)] +pub enum Error { + /// Attempted to send or receive on a oneshot channel which had been canceled + #[from] + Oneshot(oneshot::Canceled), + /// A subsystem error. + #[from] + Subsystem(SubsystemError), + /// An error in the Runtime API. + #[from] + RuntimeApi(RuntimeApiError), +} + +/// Utility function to make it easier to connect to validators. 
+pub async fn connect_to_validators( + ctx: &mut Context, + relay_parent: Hash, + validators: Vec, +) -> Result { + // ValidatorId -> AuthorityDiscoveryId + let (tx, rx) = oneshot::channel(); + + ctx.send_message(AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::ValidatorDiscovery(validators.clone(), tx), + ) + )).await?; + + let maybe_authorities = rx.await??; + let authorities: Vec<_> = maybe_authorities.iter() + .cloned() + .filter_map(|id| id) + .collect(); + + let validator_map = validators.into_iter() + .zip(maybe_authorities.into_iter()) + .filter_map(|(k, v)| v.map(|v| (v, k))) + .collect::>(); + + let (connections, revoke) = connect_to_authorities(ctx, authorities).await?; + + Ok(ConnectionRequest { + validator_map, + connections, + revoke, + }) +} + +async fn connect_to_authorities( + ctx: &mut Context, + validator_ids: Vec, +) -> Result<(mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, oneshot::Sender<()>), Error> { + const PEERS_CAPACITY: usize = 8; + + let (revoke_tx, revoke) = oneshot::channel(); + let (connected, connected_rx) = mpsc::channel(PEERS_CAPACITY); + + ctx.send_message(AllMessages::NetworkBridge( + NetworkBridgeMessage::ConnectToValidators { + validator_ids, + connected, + revoke, + } + )).await?; + + Ok((connected_rx, revoke_tx)) +} + +/// A pending connection request to validators. +/// This struct implements `Stream` to allow for asynchronous +/// discovery of validator addresses. +/// +/// NOTE: you should call `revoke` on this struct +/// when you're no longer interested in the requested validators. 
+#[must_use = "dropping a request will result in its immediate revocation"] +pub struct ConnectionRequest { + validator_map: HashMap, + #[must_use = "streams do nothing unless polled"] + connections: mpsc::Receiver<(AuthorityDiscoveryId, PeerId)>, + #[must_use = "a request should be revoked at some point"] + revoke: oneshot::Sender<()>, +} + +impl stream::Stream for ConnectionRequest { + type Item = (ValidatorId, PeerId); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll> { + if self.validator_map.is_empty() { + return Poll::Ready(None); + } + match Pin::new(&mut self.connections).poll_next(cx) { + Poll::Ready(Some((id, peer_id))) => { + if let Some(validator_id) = self.validator_map.remove(&id) { + return Poll::Ready(Some((validator_id, peer_id))); + } else { + // unknown authority_id + // should be unreachable + } + } + _ => {}, + } + Poll::Pending + } +} + +impl ConnectionRequest { + /// By revoking the request the caller allows the network to + /// free some peer slots thus freeing the resources. + /// It doesn't necessarily lead to peers disconnection though. + /// The revocation is enacted in the next connection request. + /// + /// This can be done either by calling this function or dropping the request. 
+ pub fn revoke(self) { + if let Err(_) = self.revoke.send(()) { + log::warn!( + "Failed to revoke a validator connection request", + ); + } + } +} diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index c97bd0087f62..35f92eca8aef 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -25,14 +25,14 @@ use futures::channel::{mpsc, oneshot}; use polkadot_node_network_protocol::{ - v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, PeerSet, + v1 as protocol_v1, NetworkBridgeEvent, ReputationChange, PeerId, }; use polkadot_node_primitives::{ CollationGenerationConfig, MisbehaviorReport, SignedFullStatement, ValidationResult, }; use polkadot_primitives::v1::{ - AvailableData, BackedCandidate, BlockNumber, CandidateDescriptor, CandidateEvent, - CandidateReceipt, CollatorId, CommittedCandidateReceipt, + AuthorityDiscoveryId, AvailableData, BackedCandidate, BlockNumber, CandidateDescriptor, + CandidateEvent, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState, ErasureChunk, GroupRotationInfo, Hash, Id as ParaId, OccupiedCoreAssumption, PersistedValidationData, PoV, SessionIndex, SignedAvailabilityBitfield, TransientValidationData, ValidationCode, ValidatorId, ValidationData, ValidatorIndex, @@ -196,11 +196,25 @@ pub enum NetworkBridgeMessage { /// Send a message to one or more peers on the collation peer-set. SendCollationMessage(Vec, protocol_v1::CollationProtocol), - /// Connect to peers who represent the given `ValidatorId`s at the given relay-parent. + /// Connect to peers who represent the given `validator_ids`. /// - /// Also accepts a response channel by which the issuer can learn the `PeerId`s of those - /// validators. - ConnectToValidators(PeerSet, Vec, oneshot::Sender>), + /// Also ask the network to stay connected to these peers at least + /// until the request is revoked. + ConnectToValidators { + /// Ids of the validators to connect to. 
+ validator_ids: Vec, + /// Response sender by which the issuer can learn the `PeerId`s of + /// the validators as they are connected. + /// The response is sent immediately for already connected peers. + connected: mpsc::Sender<(AuthorityDiscoveryId, PeerId)>, + /// By revoking the request the caller allows the network to + /// free some peer slots thus freeing the resources. + /// It doesn't necessarily lead to peers disconnection though. + /// The revocation is enacted in the next connection request. + /// + /// This can be done by sending to the channel or dropping the sender. + revoke: oneshot::Receiver<()>, + }, } impl NetworkBridgeMessage { @@ -210,7 +224,7 @@ impl NetworkBridgeMessage { Self::ReportPeer(_, _) => None, Self::SendValidationMessage(_, _) => None, Self::SendCollationMessage(_, _) => None, - Self::ConnectToValidators(_, _, _) => None, + Self::ConnectToValidators { .. } => None, } } } @@ -389,6 +403,11 @@ pub enum RuntimeApiRequest { /// Get all events concerning candidates (backing, inclusion, time-out) in the parent of /// the block in whose state this request is executed. CandidateEvents(RuntimeApiSender>), + /// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s. + /// Currently this request is limited to validators in the current session. + /// + /// Returns `None` for validators not found in the current session. + ValidatorDiscovery(Vec, RuntimeApiSender>>), } /// A message to the Runtime API subsystem. 
diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index bd3ad5b77823..ecba47aa9928 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -15,6 +15,7 @@ sp-version = { git = "https://github.com/paritytech/substrate", branch = "master sp-std = { package = "sp-std", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-arithmetic = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +sp-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } runtime_primitives = { package = "sp-runtime", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } polkadot-parachain = { path = "../parachain", default-features = false } polkadot-core-primitives = { path = "../core-primitives", default-features = false } diff --git a/primitives/src/v1.rs b/primitives/src/v1.rs index ded8cbc6fa77..b7f0b8427392 100644 --- a/primitives/src/v1.rs +++ b/primitives/src/v1.rs @@ -52,6 +52,7 @@ pub use crate::v0::{ pub use crate::v0::{ValidatorPair, CollatorPair}; pub use sp_staking::SessionIndex; +pub use sp_authority_discovery::AuthorityId as AuthorityDiscoveryId; /// Unique identifier for the Inclusion Inherent pub const INCLUSION_INHERENT_IDENTIFIER: InherentIdentifier = *b"inclusn0"; @@ -686,6 +687,13 @@ sp_api::decl_runtime_apis! { // initialization. #[skip_initialize_block] fn candidate_events() -> Vec>; + + /// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s. + /// Currently this request is limited to validators in the current session. + /// + /// We assume that every validator runs authority discovery, + /// which would allow us to establish point-to-point connection to given validators. 
+ fn validator_discovery(validators: Vec) -> Vec>; } } diff --git a/roadmap/implementers-guide/src/node/utility/network-bridge.md b/roadmap/implementers-guide/src/node/utility/network-bridge.md index ef04090629f1..5ace56b2a9c8 100644 --- a/roadmap/implementers-guide/src/node/utility/network-bridge.md +++ b/roadmap/implementers-guide/src/node/utility/network-bridge.md @@ -92,10 +92,12 @@ Map the message onto the corresponding [Event Handler](#event-handlers) based on ### ConnectToValidators +> TODO: Currently, this request is limited to the validators in the current session. + - Determine the DHT keys to use for each validator based on the relay-chain state and Runtime API. - Recover the Peer IDs of the validators from the DHT. There may be more than one peer ID per validator. -- Accumulate all `(ValidatorId, PeerId)` pairs and send on the response channel. -- Feed all Peer IDs to peer set manager the underlying network provides, indicating the expected peer-set. +- Send all `(ValidatorId, PeerId)` pairs on the response channel. +- Feed all Peer IDs to peer set manager the underlying network provides. ## Event Handlers diff --git a/roadmap/implementers-guide/src/types/overseer-protocol.md b/roadmap/implementers-guide/src/types/overseer-protocol.md index 9d92e326a254..e32a17f80423 100644 --- a/roadmap/implementers-guide/src/types/overseer-protocol.md +++ b/roadmap/implementers-guide/src/types/overseer-protocol.md @@ -212,11 +212,25 @@ enum NetworkBridgeMessage { SendValidationMessage([PeerId], ValidationProtocolV1), /// Send a message to one or more peers on the collation peerset. SendCollationMessage([PeerId], ValidationProtocolV1), - /// Connect to peers who represent the given `ValidatorId`s at the given relay-parent. + /// Connect to peers who represent the given `validator_ids`. /// - /// Also accepts a response channel by which the issuer can learn the `PeerId`s of those - /// validators. 
- ConnectToValidators(PeerSet, [ValidatorId], ResponseChannel<[(ValidatorId, PeerId)]>>), + /// Also ask the network to stay connected to these peers at least + /// until the request is revoked. + ConnectToValidators { + /// Ids of the validators to connect to. + validator_ids: Vec, + /// Response sender by which the issuer can learn the `PeerId`s of + /// the validators as they are connected. + /// The response is sent immediately for already connected peers. + connected: ResponseStream<(AuthorityDiscoveryId, PeerId)>, + /// By revoking the request the caller allows the network to + /// free some peer slots thus freeing the resources. + /// It doesn't necessarily lead to peers disconnection though. + /// The revocation is enacted in the next connection request. + /// + /// This can be done by sending to the channel or dropping the sender. + revoke: ReceiverChannel<()>, + }, } ``` diff --git a/runtime/kusama/src/lib.rs b/runtime/kusama/src/lib.rs index 08b07212ca7f..f9e1af928d9f 100644 --- a/runtime/kusama/src/lib.rs +++ b/runtime/kusama/src/lib.rs @@ -1137,6 +1137,10 @@ sp_api::impl_runtime_apis! 
{ fn candidate_events() -> Vec> { Vec::new() } + + fn validator_discovery(_: Vec) -> Vec> { + Vec::new() + } } impl fg_primitives::GrandpaApi for Runtime { diff --git a/runtime/parachains/Cargo.toml b/runtime/parachains/Cargo.toml index a2aef4d65cb1..2beef27f9c9b 100644 --- a/runtime/parachains/Cargo.toml +++ b/runtime/parachains/Cargo.toml @@ -21,6 +21,7 @@ sp-session = { git = "https://github.com/paritytech/substrate", branch = "master sp-staking = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } +pallet-authority-discovery = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } pallet-authorship = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } pallet-balances = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } pallet-session = { git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } diff --git a/runtime/parachains/src/runtime_api_impl/v1.rs b/runtime/parachains/src/runtime_api_impl/v1.rs index 716d3eed9cb1..2c85a9e91792 100644 --- a/runtime/parachains/src/runtime_api_impl/v1.rs +++ b/runtime/parachains/src/runtime_api_impl/v1.rs @@ -22,7 +22,7 @@ use primitives::v1::{ ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, ValidationData, Id as ParaId, OccupiedCoreAssumption, SessionIndex, ValidationCode, CommittedCandidateReceipt, ScheduledCore, OccupiedCore, CoreOccupied, CoreIndex, - GroupIndex, CandidateEvent, PersistedValidationData, + GroupIndex, CandidateEvent, PersistedValidationData, AuthorityDiscoveryId, }; use sp_runtime::traits::Zero; use frame_support::debug; @@ -266,3 +266,27 @@ where }) .collect() } + +/// Get the `AuthorityDiscoveryId`s corresponding to the given `ValidatorId`s. 
+/// Currently this request is limited to validators in the current session. +/// +/// We assume that every validator runs authority discovery, +/// which would allow us to establish point-to-point connection to given validators. +// FIXME: handle previous sessions: +// https://github.com/paritytech/polkadot/issues/1461 +pub fn validator_discovery(validators: Vec) -> Vec> +where + T: initializer::Trait + pallet_authority_discovery::Trait, +{ + // FIXME: the mapping might be invalid if a session change happens in between the calls + // use SessionInfo from https://github.com/paritytech/polkadot/pull/1691 + let current_validators = >::validators(); + let authorities = >::authorities(); + // We assume the same ordering in authorities as in validators so we can do an index search + validators.iter().map(|id| { + // FIXME: linear search is slow O(n^2) + // use SessionInfo from https://github.com/paritytech/polkadot/pull/1691 + let validator_index = current_validators.iter().position(|v| v == id); + validator_index.and_then(|i| authorities.get(i).cloned()) + }).collect() +} diff --git a/runtime/polkadot/src/lib.rs b/runtime/polkadot/src/lib.rs index cd77d8f1abce..f19fd0fd16dd 100644 --- a/runtime/polkadot/src/lib.rs +++ b/runtime/polkadot/src/lib.rs @@ -1133,6 +1133,10 @@ sp_api::impl_runtime_apis! { fn candidate_events() -> Vec> { Vec::new() } + + fn validator_discovery(_: Vec) -> Vec> { + Vec::new() + } } impl fg_primitives::GrandpaApi for Runtime { diff --git a/runtime/rococo-v1/src/lib.rs b/runtime/rococo-v1/src/lib.rs index a026c91c6fc3..688c9fa1d2d2 100644 --- a/runtime/rococo-v1/src/lib.rs +++ b/runtime/rococo-v1/src/lib.rs @@ -212,6 +212,9 @@ sp_api::impl_runtime_apis! { } }) } + fn validator_discovery(validators: Vec) -> Vec> { + runtime_api_impl::validator_discovery::(validators) + } } impl fg_primitives::GrandpaApi for Runtime { @@ -338,6 +341,7 @@ impl_opaque_keys! 
{ pub babe: Babe, pub im_online: ImOnline, pub parachain_validator: Initializer, + pub authority_discovery: AuthorityDiscovery, } } diff --git a/runtime/westend/src/lib.rs b/runtime/westend/src/lib.rs index 6eb91f5faf9e..70e183650b34 100644 --- a/runtime/westend/src/lib.rs +++ b/runtime/westend/src/lib.rs @@ -849,6 +849,10 @@ sp_api::impl_runtime_apis! { fn candidate_events() -> Vec> { Vec::new() } + + fn validator_discovery(_: Vec) -> Vec> { + Vec::new() + } } impl fg_primitives::GrandpaApi for Runtime {