diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs
index 800f683aa0aef..1bbb9f38796c2 100644
--- a/client/authority-discovery/src/lib.rs
+++ b/client/authority-discovery/src/lib.rs
@@ -18,6 +18,7 @@
#![warn(missing_docs)]
#![recursion_limit = "1024"]
+
//! Substrate authority discovery.
//!
//! This crate enables Substrate authorities to discover and directly connect to
@@ -31,7 +32,7 @@ pub use crate::{
worker::{NetworkProvider, Role, Worker},
};
-use std::{sync::Arc, time::Duration};
+use std::{collections::HashSet, sync::Arc, time::Duration};
use futures::{
channel::{mpsc, oneshot},
@@ -58,11 +59,13 @@ pub struct WorkerConfig {
///
/// By default this is set to 1 hour.
pub max_publish_interval: Duration,
+
/// Interval at which the keystore is queried. If the keys have changed, unconditionally
/// re-publish its addresses on the DHT.
///
/// By default this is set to 1 minute.
pub keystore_refresh_interval: Duration,
+
/// The maximum interval in which the node will query the DHT for new entries.
///
/// By default this is set to 10 minutes.
@@ -156,7 +159,7 @@ where
/// Message sent from the [`Service`] to the [`Worker`].
pub(crate) enum ServicetoWorkerMsg {
/// See [`Service::get_addresses_by_authority_id`].
- GetAddressesByAuthorityId(AuthorityId, oneshot::Sender<Option<Vec<Multiaddr>>>),
- /// See [`Service::get_authority_id_by_peer_id`].
- GetAuthorityIdByPeerId(PeerId, oneshot::Sender<Option<AuthorityId>>),
+ GetAddressesByAuthorityId(AuthorityId, oneshot::Sender<Option<HashSet<Multiaddr>>>),
+ /// See [`Service::get_authority_ids_by_peer_id`].
+ GetAuthorityIdsByPeerId(PeerId, oneshot::Sender<Option<HashSet<AuthorityId>>>),
}
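For reference, the `WorkerConfig` intervals documented in the hunk above are plain `Duration` fields, so a caller can adjust a single one and keep the documented defaults for the rest. A minimal sketch, assuming `WorkerConfig` implements `Default` (the "By default this is set to ..." wording suggests it does); the chosen value is purely illustrative:

```rust
use sc_authority_discovery::WorkerConfig;
use std::time::Duration;

fn main() {
	// Query the DHT for new entries at least every 5 minutes instead of the
	// documented 10 minute default; all other fields keep their defaults.
	let config = WorkerConfig {
		max_query_interval: Duration::from_secs(5 * 60),
		..Default::default()
	};
	assert_eq!(config.max_query_interval, Duration::from_secs(300));
}
```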
diff --git a/client/authority-discovery/src/service.rs b/client/authority-discovery/src/service.rs
index 2e5ae66e4dd4a..9b59a4ec8647f 100644
--- a/client/authority-discovery/src/service.rs
+++ b/client/authority-discovery/src/service.rs
@@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
-use std::fmt::Debug;
+use std::{collections::HashSet, fmt::Debug};
use crate::ServicetoWorkerMsg;
@@ -62,7 +62,7 @@ impl Service {
pub async fn get_addresses_by_authority_id(
&mut self,
authority: AuthorityId,
- ) -> Option<Vec<Multiaddr>> {
+ ) -> Option<HashSet<Multiaddr>> {
let (tx, rx) = oneshot::channel();
self.to_worker
@@ -78,11 +78,14 @@ impl Service {
///
/// Returns `None` if no entry was present or connection to the
/// [`crate::Worker`] failed.
- pub async fn get_authority_id_by_peer_id(&mut self, peer_id: PeerId) -> Option<AuthorityId> {
+ pub async fn get_authority_ids_by_peer_id(
+ &mut self,
+ peer_id: PeerId,
+ ) -> Option<HashSet<AuthorityId>> {
let (tx, rx) = oneshot::channel();
self.to_worker
- .send(ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, tx))
+ .send(ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, tx))
.await
.ok()?;
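Since `get_authority_ids_by_peer_id` now yields a set of ids instead of a single one, call sites that used to compare against one `AuthorityId` have to test membership instead. A hedged sketch of a hypothetical caller (the function and its parameters are illustrative, not part of this crate):

```rust
// Hypothetical downstream check: does `peer_id` map to `expected` in the
// current or a recent session, according to the authority-discovery cache?
async fn is_expected_authority(
	service: &mut sc_authority_discovery::Service,
	peer_id: sc_network::PeerId,
	expected: &sp_authority_discovery::AuthorityId,
) -> bool {
	service
		.get_authority_ids_by_peer_id(peer_id)
		.await
		// `None`: no cache entry, or the connection to the worker failed.
		.map_or(false, |ids| ids.contains(expected))
}
```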
diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs
index 3784b4c834266..cef91445064ca 100644
--- a/client/authority-discovery/src/tests.rs
+++ b/client/authority-discovery/src/tests.rs
@@ -29,7 +29,7 @@ use libp2p::core::{
multiaddr::{Multiaddr, Protocol},
PeerId,
};
-use std::sync::Arc;
+use std::{collections::HashSet, sync::Arc};
use sp_authority_discovery::AuthorityId;
use sp_core::crypto::key_types;
@@ -73,12 +73,12 @@ fn get_addresses_and_authority_id() {
pool.run_until(async {
assert_eq!(
- Some(vec![remote_addr]),
+ Some(HashSet::from([remote_addr])),
service.get_addresses_by_authority_id(remote_authority_id.clone()).await,
);
assert_eq!(
- Some(remote_authority_id),
- service.get_authority_id_by_peer_id(remote_peer_id).await,
+ Some(HashSet::from([remote_authority_id])),
+ service.get_authority_ids_by_peer_id(remote_peer_id).await,
);
});
}
diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs
index a689d0bafd262..00021ecbdcb83 100644
--- a/client/authority-discovery/src/worker.rs
+++ b/client/authority-discovery/src/worker.rs
@@ -259,9 +259,9 @@ where
self.addr_cache.get_addresses_by_authority_id(&authority).map(Clone::clone),
);
},
- ServicetoWorkerMsg::GetAuthorityIdByPeerId(peer_id, sender) => {
+ ServicetoWorkerMsg::GetAuthorityIdsByPeerId(peer_id, sender) => {
let _ = sender
- .send(self.addr_cache.get_authority_id_by_peer_id(&peer_id).map(Clone::clone));
+ .send(self.addr_cache.get_authority_ids_by_peer_id(&peer_id).map(Clone::clone));
},
}
}
@@ -374,7 +374,7 @@ where
.map_err(|e| Error::CallingRuntime(e.into()))?
.into_iter()
.filter(|id| !local_keys.contains(id.as_ref()))
- .collect();
+ .collect::<Vec<_>>();
self.addr_cache.retain_ids(&authorities);
@@ -548,7 +548,7 @@ where
if let Some(metrics) = &self.metrics {
metrics
.known_authorities_count
- .set(self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX));
+ .set(self.addr_cache.num_authority_ids().try_into().unwrap_or(std::u64::MAX));
}
}
Ok(())
diff --git a/client/authority-discovery/src/worker/addr_cache.rs b/client/authority-discovery/src/worker/addr_cache.rs
index e770297f6f3be..d4ba156d5fa19 100644
--- a/client/authority-discovery/src/worker/addr_cache.rs
+++ b/client/authority-discovery/src/worker/addr_cache.rs
@@ -17,79 +17,94 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use libp2p::core::multiaddr::{Multiaddr, Protocol};
-use std::collections::HashMap;
use sc_network::PeerId;
use sp_authority_discovery::AuthorityId;
+use std::collections::{hash_map::Entry, HashMap, HashSet};
-/// Cache for [`AuthorityId`] -> [`Vec<Multiaddr>`] and [`PeerId`] -> [`AuthorityId`] mappings.
+/// Cache for [`AuthorityId`] -> [`HashSet<Multiaddr>`] and [`PeerId`] -> [`HashSet<AuthorityId>`]
+/// mappings.
pub(super) struct AddrCache {
- // The addresses found in `authority_id_to_addresses` are guaranteed to always match
- // the peerids found in `peer_id_to_authority_id`. In other words, these two hashmaps
- // are similar to a bi-directional map.
- authority_id_to_addresses: HashMap<AuthorityId, Vec<Multiaddr>>,
- peer_id_to_authority_id: HashMap<PeerId, AuthorityId>,
+ /// The addresses found in `authority_id_to_addresses` are guaranteed to always match
+ /// the peerids found in `peer_id_to_authority_ids`. In other words, these two hashmaps
+ /// are similar to a bi-directional map.
+ ///
+ /// Since we may store the mapping across several sessions, a single
+ /// `PeerId` might correspond to multiple `AuthorityId`s. However,
+ /// it's not expected that a single `AuthorityId` can have multiple `PeerId`s.
+ authority_id_to_addresses: HashMap<AuthorityId, HashSet<Multiaddr>>,
+ peer_id_to_authority_ids: HashMap<PeerId, HashSet<AuthorityId>>,
}
impl AddrCache {
pub fn new() -> Self {
AddrCache {
authority_id_to_addresses: HashMap::new(),
- peer_id_to_authority_id: HashMap::new(),
+ peer_id_to_authority_ids: HashMap::new(),
}
}
/// Inserts the given [`AuthorityId`] and [`Vec<Multiaddr>`] pair for future lookups by
/// [`AuthorityId`] or [`PeerId`].
- pub fn insert(&mut self, authority_id: AuthorityId, mut addresses: Vec<Multiaddr>) {
- addresses.sort_unstable_by(|a, b| a.as_ref().cmp(b.as_ref()));
+ pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec<Multiaddr>) {
+ let addresses = addresses.into_iter().collect::<HashSet<_>>();
+ let peer_ids = addresses_to_peer_ids(&addresses);
+
+ if peer_ids.is_empty() {
+ log::debug!(
+ target: super::LOG_TARGET,
+ "Authority({:?}) provides no addresses or addresses without peer ids. Adresses: {:?}",
+ authority_id,
+ addresses,
+ );
- // Insert into `self.peer_id_to_authority_id`.
- let peer_ids = addresses
- .iter()
- .map(|a| peer_id_from_multiaddr(a))
- .filter_map(|peer_id| peer_id);
- for peer_id in peer_ids.clone() {
- let former_auth =
- match self.peer_id_to_authority_id.insert(peer_id, authority_id.clone()) {
- Some(a) if a != authority_id => a,
- _ => continue,
- };
-
- // PeerId was associated to a different authority id before.
- // Remove corresponding authority from `self.authority_id_to_addresses`.
- let former_auth_addrs = match self.authority_id_to_addresses.get_mut(&former_auth) {
- Some(a) => a,
- None => {
- debug_assert!(false);
- continue
- },
- };
- former_auth_addrs.retain(|a| peer_id_from_multiaddr(a).map_or(true, |p| p != peer_id));
+ return
+ } else if peer_ids.len() > 1 {
+ log::warn!(
+ target: super::LOG_TARGET,
+ "Authority({:?}) can be reached through multiple peer ids: {:?}",
+ authority_id,
+ peer_ids
+ );
}
- // Insert into `self.authority_id_to_addresses`.
- for former_addr in self
- .authority_id_to_addresses
- .insert(authority_id.clone(), addresses.clone())
- .unwrap_or_default()
- {
- // Must remove from `self.peer_id_to_authority_id` any PeerId formerly associated
- // to that authority but that can't be found in its new addresses.
-
- let peer_id = match peer_id_from_multiaddr(&former_addr) {
- Some(p) => p,
- None => continue,
- };
+ let old_addresses = self.authority_id_to_addresses.insert(authority_id.clone(), addresses);
+ let old_peer_ids = addresses_to_peer_ids(&old_addresses.unwrap_or_default());
- if !peer_ids.clone().any(|p| p == peer_id) {
- self.peer_id_to_authority_id.remove(&peer_id);
+ // Add the new peer ids
+ peer_ids.difference(&old_peer_ids).for_each(|new_peer_id| {
+ self.peer_id_to_authority_ids
+ .entry(*new_peer_id)
+ .or_default()
+ .insert(authority_id.clone());
+ });
+
+ // Remove the old peer ids
+ self.remove_authority_id_from_peer_ids(&authority_id, old_peer_ids.difference(&peer_ids));
+ }
+
+ /// Remove the given `authority_id` from the `peer_id` to `authority_ids` mapping.
+ ///
+ /// If a `peer_id` doesn't have any `authority_id` assigned anymore, it is removed.
+ fn remove_authority_id_from_peer_ids<'a>(
+ &mut self,
+ authority_id: &AuthorityId,
+ peer_ids: impl Iterator<Item = &'a PeerId>,
+ ) {
+ peer_ids.for_each(|peer_id| {
+ if let Entry::Occupied(mut e) = self.peer_id_to_authority_ids.entry(*peer_id) {
+ e.get_mut().remove(authority_id);
+
+ // If there are no more entries, remove the peer id.
+ if e.get().is_empty() {
+ e.remove();
+ }
}
- }
+ })
}
/// Returns the number of authority IDs in the cache.
- pub fn num_ids(&self) -> usize {
+ pub fn num_authority_ids(&self) -> usize {
self.authority_id_to_addresses.len()
}
@@ -97,18 +112,21 @@ impl AddrCache {
pub fn get_addresses_by_authority_id(
&self,
authority_id: &AuthorityId,
- ) -> Option<&Vec<Multiaddr>> {
- self.authority_id_to_addresses.get(&authority_id)
+ ) -> Option<&HashSet<Multiaddr>> {
+ self.authority_id_to_addresses.get(authority_id)
}
- /// Returns the [`AuthorityId`] for the given [`PeerId`].
- pub fn get_authority_id_by_peer_id(&self, peer_id: &PeerId) -> Option<&AuthorityId> {
- self.peer_id_to_authority_id.get(peer_id)
+ /// Returns the [`AuthorityId`]s for the given [`PeerId`].
+ ///
+ /// As the authority id can change between sessions, one [`PeerId`] can be mapped to
+ /// multiple authority ids.
+ pub fn get_authority_ids_by_peer_id(&self, peer_id: &PeerId) -> Option<&HashSet<AuthorityId>> {
+ self.peer_id_to_authority_ids.get(peer_id)
}
/// Removes all [`PeerId`]s and [`Multiaddr`]s from the cache that are not related to the given
/// [`AuthorityId`]s.
- pub fn retain_ids(&mut self, authority_ids: &Vec<AuthorityId>) {
+ pub fn retain_ids(&mut self, authority_ids: &[AuthorityId]) {
// The below logic could be replaced by `BTreeMap::drain_filter` once it is stabilized.
let authority_ids_to_remove = self
.authority_id_to_addresses
@@ -120,19 +138,18 @@ impl AddrCache {
for authority_id_to_remove in authority_ids_to_remove {
// Remove other entries from `self.authority_id_to_addresses`.
- let addresses = self.authority_id_to_addresses.remove(&authority_id_to_remove);
-
- // Remove other entries from `self.peer_id_to_authority_id`.
- let peer_ids = addresses
- .iter()
- .flatten()
- .map(|a| peer_id_from_multiaddr(a))
- .filter_map(|peer_id| peer_id);
- for peer_id in peer_ids {
- if let Some(id) = self.peer_id_to_authority_id.remove(&peer_id) {
- debug_assert_eq!(authority_id_to_remove, id);
- }
- }
+ let addresses = if let Some(addresses) =
+ self.authority_id_to_addresses.remove(&authority_id_to_remove)
+ {
+ addresses
+ } else {
+ continue
+ };
+
+ self.remove_authority_id_from_peer_ids(
+ &authority_id_to_remove,
+ addresses_to_peer_ids(&addresses).iter(),
+ );
}
}
}
@@ -147,6 +164,13 @@ fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option<PeerId> {
})
}
+fn addresses_to_peer_ids(addresses: &HashSet<Multiaddr>) -> HashSet<PeerId> {
+ addresses
+ .iter()
+ .filter_map(|a| peer_id_from_multiaddr(a))
+ .collect::<HashSet<_>>()
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -226,27 +250,27 @@ mod tests {
cache.insert(third.0.clone(), vec![third.1.clone()]);
assert_eq!(
- Some(&vec![third.1.clone()]),
+ Some(&HashSet::from([third.1.clone()])),
cache.get_addresses_by_authority_id(&third.0),
- "Expect `get_addresses_by_authority_id` to return addresses of third authority."
+ "Expect `get_addresses_by_authority_id` to return addresses of third authority.",
);
assert_eq!(
- Some(&third.0),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
- "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority."
+ Some(&HashSet::from([third.0.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
+ "Expect `get_authority_id_by_peer_id` to return `AuthorityId` of third authority.",
);
- cache.retain_ids(&vec![first.0, second.0]);
+ cache.retain_ids(&vec![first.0.clone(), second.0]);
assert_eq!(
None,
cache.get_addresses_by_authority_id(&third.0),
- "Expect `get_addresses_by_authority_id` to not return `None` for third authority."
+ "Expect `get_addresses_by_authority_id` to not return `None` for third authority.",
);
assert_eq!(
None,
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
- "Expect `get_authority_id_by_peer_id` to return `None` for third authority."
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&third.1).unwrap()),
+ "Expect `get_authority_id_by_peer_id` to return `None` for third authority.",
);
TestResult::passed()
@@ -282,44 +306,47 @@ mod tests {
assert_eq!(
None,
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr1).unwrap())
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr1).unwrap())
);
assert_eq!(
- Some(&authority1),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
+ Some(&HashSet::from([authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
);
assert_eq!(
- Some(&authority1),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
+ Some(&HashSet::from([authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
);
assert_eq!(
- Some(&authority1),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr4).unwrap())
+ Some(&HashSet::from([authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr4).unwrap())
);
cache.insert(authority2.clone(), vec![multiaddr2.clone()]);
assert_eq!(
- Some(&authority2),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
+ Some(&HashSet::from([authority2.clone(), authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
);
assert_eq!(
- Some(&authority1),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
+ Some(&HashSet::from([authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
);
- assert_eq!(cache.get_addresses_by_authority_id(&authority1).unwrap().len(), 2);
+ assert_eq!(cache.get_addresses_by_authority_id(&authority1).unwrap().len(), 3);
cache.insert(authority2.clone(), vec![multiaddr2.clone(), multiaddr3.clone()]);
assert_eq!(
- Some(&authority2),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
+ Some(&HashSet::from([authority2.clone(), authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr2).unwrap())
+ );
+ assert_eq!(
+ Some(&HashSet::from([authority2.clone(), authority1.clone()])),
+ cache.get_authority_ids_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
);
assert_eq!(
- Some(&authority2),
- cache.get_authority_id_by_peer_id(&peer_id_from_multiaddr(&multiaddr3).unwrap())
+ &HashSet::from([multiaddr2.clone(), multiaddr3.clone(), multiaddr4.clone()]),
+ cache.get_addresses_by_authority_id(&authority1).unwrap(),
);
- assert!(cache.get_addresses_by_authority_id(&authority1).unwrap().is_empty());
TestResult::passed()
}
@@ -328,4 +355,31 @@ mod tests {
.max_tests(10)
.quickcheck(property as fn(_, _, _, _, _) -> TestResult)
}
+
+ /// As the runtime gives us the current + next authority ids, it can happen that some
+ /// authority changed its session keys. Changing the session keys leads to having two
+ /// authority ids that map to the same `PeerId` & addresses.
+ #[test]
+ fn adding_two_authority_ids_for_the_same_peer_id() {
+ let mut addr_cache = AddrCache::new();
+
+ let peer_id = PeerId::random();
+ let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into()));
+
+ let authority_id0 = AuthorityPair::generate().0.public();
+ let authority_id1 = AuthorityPair::generate().0.public();
+
+ addr_cache.insert(authority_id0.clone(), vec![addr.clone()]);
+ addr_cache.insert(authority_id1.clone(), vec![addr.clone()]);
+
+ assert_eq!(2, addr_cache.num_authority_ids());
+ assert_eq!(
+ &HashSet::from([addr.clone()]),
+ addr_cache.get_addresses_by_authority_id(&authority_id0).unwrap()
+ );
+ assert_eq!(
+ &HashSet::from([addr]),
+ addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap()
+ );
+ }
}
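The bookkeeping in `insert` above reduces to two set differences over the peer ids derived from the old and the new address set: ids that only appear in the new set gain a back-reference to the authority, ids that only appear in the old set lose it, and an entry is dropped once it is empty. A standalone sketch of that update rule, with plain integers and string labels standing in for `PeerId` and `AuthorityId` (all names and values are illustrative):

```rust
use std::collections::{hash_map::Entry, HashMap, HashSet};

fn main() {
	// Peer id -> set of authority ids, using stand-in types.
	let mut peer_to_auths: HashMap<u32, HashSet<&'static str>> = HashMap::new();
	let authority = "authority-1";

	// State after a first `insert`: the authority was reachable via peers 1 and 2.
	let old_peers = HashSet::from([1u32, 2]);
	for peer in &old_peers {
		peer_to_auths.entry(*peer).or_default().insert(authority);
	}

	// A later `insert` for the same authority reports peers 2 and 3 instead.
	let new_peers = HashSet::from([2u32, 3]);

	// Peer ids only present in the new set gain a back-reference to the authority.
	for added in new_peers.difference(&old_peers) {
		peer_to_auths.entry(*added).or_default().insert(authority);
	}

	// Peer ids only present in the old set lose the back-reference; the entry
	// is removed entirely once no authority refers to that peer id anymore.
	for removed in old_peers.difference(&new_peers) {
		if let Entry::Occupied(mut e) = peer_to_auths.entry(*removed) {
			e.get_mut().remove(authority);
			if e.get().is_empty() {
				e.remove();
			}
		}
	}

	assert!(!peer_to_auths.contains_key(&1));
	assert!(peer_to_auths.contains_key(&2) && peer_to_auths.contains_key(&3));
}
```

Keeping `authority_id_to_addresses` as the authoritative side and deriving `peer_id_to_authority_ids` from it this way keeps the two maps consistent without ever rebuilding the reverse map from scratch.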
diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs
index 3c1610256f5bc..130aea71fdfb0 100644
--- a/client/authority-discovery/src/worker/tests.rs
+++ b/client/authority-discovery/src/worker/tests.rs
@@ -19,6 +19,7 @@
use crate::worker::schema;
use std::{
+ collections::HashSet,
sync::{Arc, Mutex},
task::Poll,
};
@@ -469,7 +470,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() {
.send(ServicetoWorkerMsg::GetAddressesByAuthorityId(remote_public_key, sender))
.await
.expect("Channel has capacity of 1.");
- assert_eq!(Some(vec![remote_multiaddr]), addresses.await.unwrap());
+ assert_eq!(Some(HashSet::from([remote_multiaddr])), addresses.await.unwrap());
});
}
@@ -562,7 +563,7 @@ fn do_not_cache_addresses_without_peer_id() {
local_worker.handle_dht_value_found_event(vec![dht_event]).unwrap();
assert_eq!(
- Some(&vec![multiaddr_with_peer_id]),
+ Some(&HashSet::from([multiaddr_with_peer_id])),
local_worker.addr_cache.get_addresses_by_authority_id(&remote_public.into()),
"Expect worker to only cache `Multiaddr`s with `PeerId`s.",
);