diff --git a/Cargo.lock b/Cargo.lock index 35a1623ee2d8e..6b817081e4519 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -19225,6 +19225,7 @@ dependencies = [ "async-trait", "futures", "futures-timer", + "hex", "ip_network", "linked_hash_set", "log", @@ -19236,6 +19237,9 @@ dependencies = [ "sc-client-api", "sc-network", "sc-network-types", + "sc-service", + "serde", + "serde_json", "sp-api 26.0.0", "sp-authority-discovery", "sp-blockchain", @@ -19245,7 +19249,9 @@ dependencies = [ "sp-tracing 16.0.0", "substrate-prometheus-endpoint", "substrate-test-runtime-client", + "tempfile", "thiserror 1.0.65", + "tokio", ] [[package]] @@ -20235,6 +20241,8 @@ dependencies = [ "multihash 0.19.1", "quickcheck", "rand 0.8.5", + "serde", + "serde_with", "thiserror 1.0.65", "zeroize", ] @@ -21378,6 +21386,34 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" +dependencies = [ + "base64 0.22.1", + "chrono", + "hex", + "serde", + "serde_derive", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" +dependencies = [ + "darling", + "proc-macro2 1.0.95", + "quote 1.0.40", + "syn 2.0.98", +] + [[package]] name = "serde_yaml" version = "0.9.34+deprecated" diff --git a/Cargo.toml b/Cargo.toml index 5c8e1012b07c0..7ad96473a7543 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1275,6 +1275,7 @@ serde = { version = "1.0.214", default-features = false } serde-big-array = { version = "0.3.2" } serde_derive = { version = "1.0.117" } serde_json = { version = "1.0.132", default-features = false } +serde_with = { version = "3.12.0", default-features = false, features = ["hex", "macros"] } serde_yaml = { version = "0.9" } sha1 = { version = "0.10.6" } sha2 = { version = "0.10.7", default-features = false } diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs index b651e994872f9..f5d2c613f5a92 100644 --- a/cumulus/client/relay-chain-minimal-node/src/lib.rs +++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs @@ -69,12 +69,14 @@ fn build_authority_discovery_service( _ => None, } }); + let net_config_path = config.network.net_config_path.clone(); let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config( sc_authority_discovery::WorkerConfig { publish_non_global_ips: auth_disc_publish_non_global_ips, public_addresses: auth_disc_public_addresses, // Require that authority discovery records are signed. strict_record_validation: true, + persisted_cache_directory: net_config_path, ..Default::default() }, client, @@ -82,6 +84,7 @@ fn build_authority_discovery_service( Box::pin(dht_event_stream), authority_discovery_role, prometheus_registry, + task_manager.spawn_handle(), ); task_manager.spawn_handle().spawn( diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index bad72b4a44170..47ce9048d8111 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -480,6 +480,7 @@ where ); } + let network_config = config.network.clone(); let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { config, backend: backend.clone(), @@ -568,6 +569,7 @@ where public_addresses: auth_disc_public_addresses, // Require that authority discovery records are signed. strict_record_validation: true, + persisted_cache_directory: network_config.net_config_path, ..Default::default() }, client.clone(), @@ -575,6 +577,7 @@ where Box::pin(dht_event_stream), authority_discovery_role, prometheus_registry.clone(), + task_manager.spawn_handle(), ); task_manager.spawn_handle().spawn( diff --git a/prdoc/pr_8839.prdoc b/prdoc/pr_8839.prdoc new file mode 100644 index 0000000000000..55218f7bce5c3 --- /dev/null +++ b/prdoc/pr_8839.prdoc @@ -0,0 +1,23 @@ +title: "net/discovery: File persistence for AddrCache" +doc: + - audience: Node Dev + description: |- + Persisting the AddrCache periodically (every 10 minutes) and on worker + shutdown. Read AddrCache from file upon launch of worker. + + AddrCache is saved as authority_discovery_addr_cache.json in the + folder configured by net_config_path of NetworkConfiguration. + + This reduces the time it takes for a node to reconnect to peers after + restart. +crates: + - name: sc-authority-discovery + bump: major + - name: sc-network-types + bump: minor + - name: cumulus-relay-chain-minimal-node + bump: patch + - name: polkadot-service + bump: patch + - name: staging-node-cli + bump: patch diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 72e6d387ba91a..a080de61cf19f 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -547,6 +547,7 @@ pub fn new_full_base::Hash>>( task_manager.spawn_handle().spawn("mixnet", None, mixnet); } + let net_config_path = config.network.net_config_path.clone(); let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { config, backend: backend.clone(), @@ -659,6 +660,7 @@ pub fn new_full_base::Hash>>( sc_authority_discovery::WorkerConfig { publish_non_global_ips: auth_disc_publish_non_global_ips, public_addresses: auth_disc_public_addresses, + persisted_cache_directory: net_config_path, ..Default::default() }, client.clone(), @@ -666,6 +668,7 @@ pub fn new_full_base::Hash>>( Box::pin(dht_event_stream), authority_discovery_role, prometheus_registry.clone(), + task_manager.spawn_handle(), ); task_manager.spawn_handle().spawn( diff --git a/substrate/client/authority-discovery/Cargo.toml b/substrate/client/authority-discovery/Cargo.toml index dd147f6e2a553..8beab7e020ac5 100644 --- a/substrate/client/authority-discovery/Cargo.toml +++ b/substrate/client/authority-discovery/Cargo.toml @@ -30,6 +30,9 @@ rand = { workspace = true, default-features = true } sc-client-api = { workspace = true, default-features = true } sc-network = { workspace = true, default-features = true } sc-network-types = { workspace = true, default-features = true } +sc-service.workspace = true +serde.workspace = true +serde_json.workspace = true sp-api = { workspace = true, default-features = true } sp-authority-discovery = { workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } @@ -37,11 +40,14 @@ sp-core = { workspace = true, default-features = true } sp-keystore = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } thiserror = { workspace = true } +tokio.workspace = true [dev-dependencies] +hex.workspace = true quickcheck = { workspace = true } sp-tracing = { workspace = true, default-features = true } substrate-test-runtime-client = { workspace = true } +tempfile.workspace = true [build-dependencies] prost-build = { workspace = true } diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs index 3f395e47922e2..d524ca1e9e9f7 100644 --- a/substrate/client/authority-discovery/src/error.rs +++ b/substrate/client/authority-discovery/src/error.rs @@ -52,6 +52,9 @@ pub enum Error { #[error("Failed to encode or decode scale payload.")] EncodingDecodingScale(#[from] codec::Error), + #[error("Failed to encode or decode AddrCache.")] + EncodingDecodingAddrCache(String), + #[error("Failed to parse a libp2p multi address.")] ParsingMultiaddress(#[from] sc_network::multiaddr::ParseError), diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index e674c51571eff..7b654a16e3d5b 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -33,7 +33,7 @@ pub use crate::{ worker::{AuthorityDiscovery, NetworkProvider, Role, Worker}, }; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; use futures::{ channel::{mpsc, oneshot}, @@ -44,8 +44,8 @@ use sc_network::{event::DhtEvent, Multiaddr}; use sc_network_types::PeerId; use sp_authority_discovery::AuthorityId; use sp_blockchain::HeaderBackend; +use sp_core::traits::SpawnNamed; use sp_runtime::traits::Block as BlockT; - mod error; mod interval; mod service; @@ -88,6 +88,11 @@ pub struct WorkerConfig { /// /// Defaults to `false` to provide compatibility with old versions pub strict_record_validation: bool, + + /// The directory of where the persisted AddrCache file is located, + /// optional since NetworkConfiguration's `net_config_path` field + /// is optional. If None, we won't persist the AddrCache at all. + pub persisted_cache_directory: Option, } impl Default for WorkerConfig { @@ -110,6 +115,7 @@ impl Default for WorkerConfig { publish_non_global_ips: true, public_addresses: Vec::new(), strict_record_validation: false, + persisted_cache_directory: None, } } } @@ -123,6 +129,7 @@ pub fn new_worker_and_service( dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, + spawner: impl SpawnNamed + 'static, ) -> (Worker, Service) where Block: BlockT + Unpin + 'static, @@ -136,6 +143,7 @@ where dht_event_rx, role, prometheus_registry, + spawner, ) } @@ -149,6 +157,7 @@ pub fn new_worker_and_service_with_config( dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, + spawner: impl SpawnNamed + 'static, ) -> (Worker, Service) where Block: BlockT + Unpin + 'static, @@ -157,8 +166,16 @@ where { let (to_worker, from_service) = mpsc::channel(0); - let worker = - Worker::new(from_service, client, network, dht_event_rx, role, prometheus_registry, config); + let worker = Worker::new( + from_service, + client, + network, + dht_event_rx, + role, + prometheus_registry, + config, + spawner, + ); let service = Service::new(to_worker); (worker, service) diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index a73515ee00d26..ac4c55e5df14b 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -17,11 +17,12 @@ // along with this program. If not, see . use crate::{ - new_worker_and_service, + new_worker_and_service_with_config, worker::{ tests::{TestApi, TestNetwork}, - Role, + AddrCache, Role, }, + WorkerConfig, }; use futures::{channel::mpsc::channel, executor::LocalPool, task::LocalSpawn}; @@ -30,11 +31,19 @@ use std::{collections::HashSet, sync::Arc}; use sc_network::{multiaddr::Protocol, Multiaddr, PeerId}; use sp_authority_discovery::AuthorityId; -use sp_core::crypto::key_types; +use sp_core::{crypto::key_types, testing::TaskExecutor, traits::SpawnNamed}; use sp_keystore::{testing::MemoryKeystore, Keystore}; -#[test] -fn get_addresses_and_authority_id() { +pub(super) fn create_spawner() -> Box { + Box::new(TaskExecutor::new()) +} + +pub(super) fn test_config(path_buf: Option) -> WorkerConfig { + WorkerConfig { persisted_cache_directory: path_buf, ..Default::default() } +} + +#[tokio::test] +async fn get_addresses_and_authority_id() { let (_dht_event_tx, dht_event_rx) = channel(0); let network: Arc = Arc::new(Default::default()); @@ -57,12 +66,16 @@ fn get_addresses_and_authority_id() { let test_api = Arc::new(TestApi { authorities: vec![] }); - let (mut worker, mut service) = new_worker_and_service( + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); + let (mut worker, mut service) = new_worker_and_service_with_config( + test_config(Some(path)), test_api, network.clone(), Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, + create_spawner(), ); worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]); @@ -80,8 +93,8 @@ fn get_addresses_and_authority_id() { }); } -#[test] -fn cryptos_are_compatible() { +#[tokio::test] +async fn cryptos_are_compatible() { use sp_core::crypto::Pair; let libp2p_keypair = ed25519::Keypair::generate(); @@ -103,3 +116,53 @@ fn cryptos_are_compatible() { )); assert!(libp2p_public.verify(message, sp_core_signature.as_ref())); } + +#[tokio::test] +async fn when_addr_cache_is_persisted_with_authority_ids_then_when_worker_is_created_it_loads_the_persisted_cache( +) { + // ARRANGE + let (_dht_event_tx, dht_event_rx) = channel(0); + let mut pool = LocalPool::new(); + let key_store = MemoryKeystore::new(); + + let remote_authority_id: AuthorityId = pool.run_until(async { + key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap() + .into() + }); + let remote_peer_id = PeerId::random(); + let remote_addr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333" + .parse::() + .unwrap() + .with(Protocol::P2p(remote_peer_id.into())); + + let tempdir = tempfile::tempdir().unwrap(); + let cache_path = tempdir.path().to_path_buf(); + + // persist the remote_authority_id and remote_addr in the cache + { + let mut addr_cache = AddrCache::default(); + addr_cache.insert(remote_authority_id.clone(), vec![remote_addr.clone()]); + let path_to_save = cache_path.join(crate::worker::ADDR_CACHE_FILE_NAME); + addr_cache.serialize_and_persist(&path_to_save); + } + + let (_, from_service) = futures::channel::mpsc::channel(0); + + // ACT + // Create a worker with the persisted cache + let worker = crate::worker::Worker::new( + from_service, + Arc::new(TestApi { authorities: vec![] }), + Arc::new(TestNetwork::default()), + Box::pin(dht_event_rx), + Role::PublishAndDiscover(key_store.into()), + None, + test_config(Some(cache_path)), + create_spawner(), + ); + + // ASSERT + assert!(worker.contains_authority(&remote_authority_id)); +} diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index 16cdf3cc632e7..4649896115698 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +pub(crate) use crate::worker::addr_cache::AddrCache; use crate::{ error::{Error, Result}, interval::ExpIncInterval, @@ -25,19 +26,19 @@ use crate::{ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, + path::PathBuf, sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}; -use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; use linked_hash_set::LinkedHashSet; use sc_network_types::kad::{Key, PeerRecord, Record}; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; @@ -53,7 +54,10 @@ use sp_authority_discovery::{ AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature, }; use sp_blockchain::HeaderBackend; -use sp_core::crypto::{key_types, ByteArray, Pair}; +use sp_core::{ + crypto::{key_types, ByteArray, Pair}, + traits::SpawnNamed, +}; use sp_keystore::{Keystore, KeystorePtr}; use sp_runtime::traits::Block as BlockT; @@ -69,6 +73,8 @@ mod schema { pub mod tests; const LOG_TARGET: &str = "sub-authority-discovery"; +pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json"; +const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 10); // 10 minutes /// Maximum number of addresses cached per authority. Additional addresses are discarded. const MAX_ADDRESSES_PER_AUTHORITY: usize = 16; @@ -186,6 +192,14 @@ pub struct Worker { role: Role, phantom: PhantomData, + + /// A spawner of tasks + spawner: Box, + + /// The directory of where the persisted AddrCache file is located, + /// optional since NetworkConfiguration's `net_config_path` field + /// is optional. If None, we won't persist the AddrCache at all. + persisted_cache_file_path: Option, } #[derive(Debug, Clone)] @@ -245,6 +259,7 @@ where role: Role, prometheus_registry: Option, config: WorkerConfig, + spawner: impl SpawnNamed + 'static, ) -> Self { // When a node starts up publishing and querying might fail due to various reasons, for // example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather @@ -261,7 +276,30 @@ where let publish_if_changed_interval = ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval); - let addr_cache = AddrCache::new(); + let maybe_persisted_cache_file_path = + config.persisted_cache_directory.as_ref().map(|dir| { + let mut path = dir.clone(); + path.push(ADDR_CACHE_FILE_NAME); + path + }); + + // If we have a path to persisted cache file, then we will try to + // load the contents of persisted cache from file, if it exists, and is valid. + // Create a new one otherwise. + let addr_cache: AddrCache = if let Some(persisted_cache_file_path) = + maybe_persisted_cache_file_path.as_ref() + { + let loaded = + AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| { + info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e); + AddrCache::new() + }); + info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids()); + loaded + } else { + info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk."); + AddrCache::new() + }; let metrics = match prometheus_registry { Some(registry) => match Metrics::register(®istry) { @@ -308,20 +346,43 @@ where warn_public_addresses: false, phantom: PhantomData, last_known_records: HashMap::new(), + spawner: Box::new(spawner), + persisted_cache_file_path: maybe_persisted_cache_file_path, } } + /// Persists `AddrCache` to disk if the `persisted_cache_file_path` is set. + pub fn persist_addr_cache_if_supported(&self) { + let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else { + return; + }; + let cloned_cache = self.addr_cache.clone(); + self.spawner.spawn_blocking( + "persist-addr-cache", + Some("authority-discovery-worker"), + Box::pin(async move { + cloned_cache.serialize_and_persist(path); + }), + ) + } + /// Start the worker pub async fn run(mut self) { + let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL); + loop { self.start_new_lookups(); futures::select! { + _ = persist_interval.tick().fuse() => { + self.persist_addr_cache_if_supported(); + }, // Process incoming events. event = self.dht_event_rx.next().fuse() => { if let Some(event) = event { self.handle_dht_event(event).await; } else { + self.persist_addr_cache_if_supported(); // This point is reached if the network has shut down, at which point there is not // much else to do than to shut down the authority discovery as well. return; @@ -1197,6 +1258,10 @@ impl Metrics { #[cfg(test)] impl Worker { pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec) { - self.addr_cache.insert(authority, addresses); + self.addr_cache.insert(authority, addresses) + } + + pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool { + self.addr_cache.get_addresses_by_authority_id(authority).is_some() } } diff --git a/substrate/client/authority-discovery/src/worker/addr_cache.rs b/substrate/client/authority-discovery/src/worker/addr_cache.rs index 13bb990bf8b99..fe1819e64c1e3 100644 --- a/substrate/client/authority-discovery/src/worker/addr_cache.rs +++ b/substrate/client/authority-discovery/src/worker/addr_cache.rs @@ -16,14 +16,24 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::error::Error; +use log::{info, warn}; use sc_network::{multiaddr::Protocol, Multiaddr}; use sc_network_types::PeerId; +use serde::{Deserialize, Serialize}; use sp_authority_discovery::AuthorityId; -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use sp_runtime::DeserializeOwned; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + fs::File, + io::{self, BufReader, Write}, + path::Path, +}; /// Cache for [`AuthorityId`] -> [`HashSet`] and [`PeerId`] -> [`HashSet`] /// mappings. -pub(super) struct AddrCache { +#[derive(Default, Clone, PartialEq, Debug)] +pub(crate) struct AddrCache { /// The addresses found in `authority_id_to_addresses` are guaranteed to always match /// the peerids found in `peer_id_to_authority_ids`. In other words, these two hashmaps /// are similar to a bi-directional map. @@ -35,14 +45,116 @@ pub(super) struct AddrCache { peer_id_to_authority_ids: HashMap>, } +impl Serialize for AddrCache { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + SerializeAddrCache::from(self.clone()).serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for AddrCache { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + SerializeAddrCache::deserialize(deserializer).map(Into::into) + } +} + +/// A storage and serialization time optimized version of `AddrCache` +/// which contains the bare minimum info to reconstruct the AddrCache. We +/// rely on the fact that the `peer_id_to_authority_ids` can be reconstructed from +/// the `authority_id_to_addresses` field. +/// +/// Benchmarks show that this is about 2x faster to serialize and about 4x faster to deserialize +/// compared to the full `AddrCache`. +/// +/// Storage wise it is about half the size of the full `AddrCache`. +/// +/// This is used to persist the `AddrCache` to disk and load it back. +/// +/// AddrCache impl of Serialize and Deserialize "piggybacks" on this struct. +#[derive(Serialize, Deserialize)] +struct SerializeAddrCache { + authority_id_to_addresses: HashMap>, +} + +impl From for AddrCache { + fn from(value: SerializeAddrCache) -> Self { + let mut peer_id_to_authority_ids: HashMap> = HashMap::new(); + + for (authority_id, addresses) in &value.authority_id_to_addresses { + for peer_id in addresses_to_peer_ids(addresses) { + peer_id_to_authority_ids + .entry(peer_id) + .or_insert_with(HashSet::new) + .insert(authority_id.clone()); + } + } + + AddrCache { + authority_id_to_addresses: value.authority_id_to_addresses, + peer_id_to_authority_ids, + } + } +} +impl From for SerializeAddrCache { + fn from(value: AddrCache) -> Self { + Self { authority_id_to_addresses: value.authority_id_to_addresses } + } +} + +fn write_to_file(path: impl AsRef, contents: &str) -> io::Result<()> { + let path = path.as_ref(); + let mut file = File::create(path)?; + file.write_all(contents.as_bytes())?; + file.flush()?; + Ok(()) +} + +impl TryFrom<&Path> for AddrCache { + type Error = Error; + + fn try_from(path: &Path) -> Result { + // Try to load from the cache file if it exists and is valid. + load_from_file::(&path).map_err(|e| { + Error::EncodingDecodingAddrCache(format!( + "Failed to load AddrCache from file: {}, error: {:?}", + path.display(), + e + )) + }) + } +} impl AddrCache { pub fn new() -> Self { - AddrCache { - authority_id_to_addresses: HashMap::new(), - peer_id_to_authority_ids: HashMap::new(), + AddrCache::default() + } + + fn serialize(&self) -> Option { + serde_json::to_string_pretty(self).inspect_err(|e| { + warn!(target: super::LOG_TARGET, "Failed to serialize AddrCache to JSON: {} => skip persisting it.", e); + }).ok() + } + + fn persist(path: impl AsRef, serialized_cache: String) { + match write_to_file(path.as_ref(), &serialized_cache) { + Err(err) => { + warn!(target: super::LOG_TARGET, "Failed to persist AddrCache on disk at path: {}, error: {}", path.as_ref().display(), err); + }, + Ok(_) => { + info!(target: super::LOG_TARGET, "Successfully persisted AddrCache on disk"); + }, } } + pub fn serialize_and_persist(&self, path: impl AsRef) { + let Some(serialized) = self.serialize() else { return }; + Self::persist(path, serialized); + } + /// Inserts the given [`AuthorityId`] and [`Vec`] pair for future lookups by /// [`AuthorityId`] or [`PeerId`]. pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec) { @@ -56,7 +168,6 @@ impl AddrCache { authority_id, addresses, ); - return } else if peer_ids.len() > 1 { log::warn!( @@ -172,8 +283,21 @@ fn addresses_to_peer_ids(addresses: &HashSet) -> HashSet { addresses.iter().filter_map(peer_id_from_multiaddr).collect::>() } +fn load_from_file(path: impl AsRef) -> io::Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + + serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + #[cfg(test)] mod tests { + + use std::{ + thread::sleep, + time::{Duration, Instant}, + }; + use super::*; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; @@ -280,6 +404,12 @@ mod tests { .quickcheck(property as fn(_, _, _) -> TestResult) } + #[test] + fn test_from_to_serializable() { + let serializable = SerializeAddrCache::from(AddrCache::sample()); + let roundtripped = AddrCache::from(serializable); + assert_eq!(roundtripped, AddrCache::sample()) + } #[test] fn keeps_consistency_between_authority_id_and_peer_id() { fn property( @@ -381,4 +511,165 @@ mod tests { addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap() ); } + + impl AddrCache { + pub fn sample() -> Self { + let mut addr_cache = AddrCache::new(); + + let peer_id = PeerId::from_multihash( + Multihash::wrap(Code::Sha2_256.into(), &[0xab; 32]).unwrap(), + ) + .unwrap(); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + let authority_id0 = AuthorityPair::from_seed(&[0xaa; 32]).public(); + let authority_id1 = AuthorityPair::from_seed(&[0xbb; 32]).public(); + + addr_cache.insert(authority_id0.clone(), vec![addr.clone()]); + addr_cache.insert(authority_id1.clone(), vec![addr.clone()]); + addr_cache + } + } + + #[test] + fn serde_json() { + let sample = || AddrCache::sample(); + let serializable = AddrCache::from(sample()); + let json = serde_json::to_string(&serializable).expect("Serialization should not fail"); + let deserialized = serde_json::from_str::(&json).unwrap(); + let from_serializable = AddrCache::try_from(deserialized).unwrap(); + assert_eq!(sample(), from_serializable); + } + + #[test] + fn deserialize_from_json() { + let json = r#" + { + "authority_id_to_addresses": { + "5FjfMGrqw9ck5XZaPVTKm2RE5cbwoVUfXvSGZY7KCUEFtdr7": [ + "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt" + ], + "5DiQDBQvjFkmUF3C8a7ape5rpRPoajmMj44Q9CTGPfVBaa6U": [ + "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt" + ] + } + } + "#; + let deserialized = serde_json::from_str::(json).unwrap(); + assert_eq!(deserialized, AddrCache::sample()) + } + + fn serialize_and_write_to_file( + path: impl AsRef, + contents: &T, + ) -> io::Result<()> { + let serialized = serde_json::to_string_pretty(contents).unwrap(); + write_to_file(path, &serialized) + } + + #[test] + fn test_load_cache_from_disc() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cache.json"); + let sample = AddrCache::sample(); + assert_eq!(sample.num_authority_ids(), 2); + serialize_and_write_to_file(&path, &sample).unwrap(); + sleep(Duration::from_millis(10)); // Ensure file is written before loading + let cache = AddrCache::try_from(path.as_path()).unwrap(); + assert_eq!(cache.num_authority_ids(), 2); + } + + fn create_cache(authority_id_count: u64, multiaddr_per_authority_count: u64) -> AddrCache { + let mut addr_cache = AddrCache::new(); + + for i in 0..authority_id_count { + let seed = &mut [0xab as u8; 32]; + let i_bytes = i.to_le_bytes(); + seed[0..8].copy_from_slice(&i_bytes); + + let authority_id = AuthorityPair::from_seed(seed).public(); + let multi_addresses = (0..multiaddr_per_authority_count) + .map(|j| { + let mut digest = [0xab; 32]; + let j_bytes = j.to_le_bytes(); + digest[0..8].copy_from_slice(&j_bytes); + let peer_id = PeerId::from_multihash( + Multihash::wrap(Code::Sha2_256.into(), &digest).unwrap(), + ) + .unwrap(); + Multiaddr::empty().with(Protocol::P2p(peer_id.into())) + }) + .collect::>(); + + assert_eq!(multi_addresses.len(), multiaddr_per_authority_count as usize); + addr_cache.insert(authority_id.clone(), multi_addresses); + } + assert_eq!(addr_cache.authority_id_to_addresses.len(), authority_id_count as usize); + + addr_cache + } + + /// This test is ignored by default as it takes a long time to run. + #[test] + #[ignore] + fn addr_cache_measure_serde_performance() { + let addr_cache = create_cache(1000, 5); + + /// A replica of `AddrCache` that is serializable and deserializable + /// without any optimizations. + #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] + pub(crate) struct NaiveSerdeAddrCache { + authority_id_to_addresses: HashMap>, + peer_id_to_authority_ids: HashMap>, + } + impl From for NaiveSerdeAddrCache { + fn from(value: AddrCache) -> Self { + Self { + authority_id_to_addresses: value.authority_id_to_addresses, + peer_id_to_authority_ids: value.peer_id_to_authority_ids, + } + } + } + + let naive = NaiveSerdeAddrCache::from(addr_cache.clone()); + let storage_optimized = addr_cache.clone(); + + fn measure_clone(data: &T) -> Duration { + let start = Instant::now(); + let _ = data.clone(); + start.elapsed() + } + fn measure_serialize(data: &T) -> (Duration, String) { + let start = Instant::now(); + let json = serde_json::to_string_pretty(data).unwrap(); + (start.elapsed(), json) + } + fn measure_deserialize(json: String) -> (Duration, T) { + let start = Instant::now(); + let value = serde_json::from_str(&json).unwrap(); + (start.elapsed(), value) + } + + let serialize_naive = measure_serialize(&naive); + let serialize_storage_optimized = measure_serialize(&storage_optimized); + println!("CLONE: Naive took: {} ms", measure_clone(&naive).as_millis()); + println!( + "CLONE: Storage optimized took: {} ms", + measure_clone(&storage_optimized).as_millis() + ); + println!("SERIALIZE: Naive took: {} ms", serialize_naive.0.as_millis()); + println!( + "SERIALIZE: Storage optimized took: {} ms", + serialize_storage_optimized.0.as_millis() + ); + let deserialize_naive = measure_deserialize::(serialize_naive.1); + let deserialize_storage_optimized = + measure_deserialize::(serialize_storage_optimized.1); + println!("DESERIALIZE: Naive took: {} ms", deserialize_naive.0.as_millis()); + println!( + "DESERIALIZE: Storage optimized took: {} ms", + deserialize_storage_optimized.0.as_millis() + ); + assert_eq!(deserialize_naive.1, naive); + assert_eq!(deserialize_storage_optimized.1, storage_optimized); + } } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index ce3e6bfaa2bc3..00c75e2d2a912 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -23,6 +23,9 @@ use std::{ time::Instant, }; +use crate::tests::{create_spawner, test_config}; + +use super::*; use futures::{ channel::mpsc::{self, channel}, executor::{block_on, LocalPool}, @@ -46,8 +49,6 @@ use sp_keystore::{testing::MemoryKeystore, Keystore}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use substrate_test_runtime_client::runtime::Block; -use super::*; - #[derive(Clone)] pub(crate) struct TestApi { pub(crate) authorities: Vec, @@ -309,8 +310,8 @@ fn build_dht_event( kv_pairs } -#[test] -fn new_registers_metrics() { +#[tokio::test] +async fn new_registers_metrics() { let (_dht_event_tx, dht_event_rx) = mpsc::channel(1000); let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); @@ -318,6 +319,8 @@ fn new_registers_metrics() { let registry = prometheus_endpoint::Registry::new(); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let (_to_worker, from_service) = mpsc::channel(0); Worker::new( from_service, @@ -326,14 +329,15 @@ fn new_registers_metrics() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), Some(registry.clone()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); assert!(registry.gather().len() > 0); } -#[test] -fn triggers_dht_get_query() { +#[tokio::test] +async fn triggers_dht_get_query() { sp_tracing::try_init_simple(); let (_dht_event_tx, dht_event_rx) = channel(1000); @@ -348,6 +352,8 @@ fn triggers_dht_get_query() { let key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -355,7 +361,8 @@ fn triggers_dht_get_query() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); futures::executor::block_on(async { @@ -365,8 +372,8 @@ fn triggers_dht_get_query() { }) } -#[test] -fn publish_discover_cycle() { +#[tokio::test] +async fn publish_discover_cycle() { sp_tracing::try_init_simple(); let mut pool = LocalPool::new(); @@ -378,7 +385,6 @@ fn publish_discover_cycle() { let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); - let _ = pool.spawner().spawn_local_obj( async move { let node_a_public = @@ -386,6 +392,9 @@ fn publish_discover_cycle() { let test_api = Arc::new(TestApi { authorities: vec![node_a_public.into()] }); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let temppath = tempdir.path(); + let path = temppath.to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -393,7 +402,8 @@ fn publish_discover_cycle() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); worker.publish_ext_addresses(false).await.unwrap(); @@ -420,6 +430,9 @@ fn publish_discover_cycle() { let key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let temppath = tempdir.path(); + let path = temppath.to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -427,7 +440,8 @@ fn publish_discover_cycle() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); dht_event_tx.try_send(dht_event.clone()).unwrap(); @@ -447,12 +461,13 @@ fn publish_discover_cycle() { /// Don't terminate when sender side of service channel is dropped. Terminate when network event /// stream terminates. -#[test] -fn terminate_when_event_stream_terminates() { +#[tokio::test] +async fn terminate_when_event_stream_terminates() { let (dht_event_tx, dht_event_rx) = channel(1000); let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); let test_api = Arc::new(TestApi { authorities: vec![] }); + let path = tempfile::tempdir().unwrap().path().to_path_buf(); let (to_worker, from_service) = mpsc::channel(0); let worker = Worker::new( @@ -462,7 +477,8 @@ fn terminate_when_event_stream_terminates() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ) .run(); futures::pin_mut!(worker); @@ -492,8 +508,8 @@ fn terminate_when_event_stream_terminates() { }); } -#[test] -fn dont_stop_polling_dht_event_stream_after_bogus_event() { +#[tokio::test] +async fn dont_stop_polling_dht_event_stream_after_bogus_event() { let remote_multiaddr = { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); @@ -518,6 +534,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { let mut pool = LocalPool::new(); let (mut to_worker, from_service) = mpsc::channel(1); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -525,7 +543,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { Box::pin(dht_event_rx), Role::PublishAndDiscover(Arc::new(key_store)), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); // Spawn the authority discovery to make sure it is polled independently. @@ -652,7 +671,17 @@ impl DhtValueFoundTester { Box::pin(dht_event_rx), Role::PublishAndDiscover(Arc::new(local_key_store)), None, - WorkerConfig { strict_record_validation, ..Default::default() }, + WorkerConfig { + strict_record_validation, + persisted_cache_directory: Some( + tempfile::tempdir() + .expect("Should be able to create tmp dir") + .path() + .to_path_buf(), + ), + ..Default::default() + }, + create_spawner(), )); (self.local_worker.as_mut().unwrap(), Some(local_network)) }; @@ -681,8 +710,8 @@ impl DhtValueFoundTester { } } -#[test] -fn limit_number_of_addresses_added_to_cache_per_authority() { +#[tokio::test] +async fn limit_number_of_addresses_added_to_cache_per_authority() { let mut tester = DhtValueFoundTester::new(); assert!(MAX_ADDRESSES_PER_AUTHORITY < 100); let addresses = (1..100).map(|i| tester.multiaddr_with_peer_id(i)).collect(); @@ -698,8 +727,8 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { assert_eq!(MAX_ADDRESSES_PER_AUTHORITY, cached_remote_addresses.unwrap().len()); } -#[test] -fn strict_accept_address_with_peer_signature() { +#[tokio::test] +async fn strict_accept_address_with_peer_signature() { let mut tester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -719,8 +748,8 @@ fn strict_accept_address_with_peer_signature() { ); } -#[test] -fn strict_accept_address_without_creation_time() { +#[tokio::test] +async fn strict_accept_address_without_creation_time() { let mut tester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -740,8 +769,8 @@ fn strict_accept_address_without_creation_time() { ); } -#[test] -fn keep_last_received_if_no_creation_time() { +#[tokio::test] +async fn keep_last_received_if_no_creation_time() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -787,8 +816,8 @@ fn keep_last_received_if_no_creation_time() { .unwrap_or_default()); } -#[test] -fn records_with_incorrectly_signed_creation_time_are_ignored() { +#[tokio::test] +async fn records_with_incorrectly_signed_creation_time_are_ignored() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -841,8 +870,8 @@ fn records_with_incorrectly_signed_creation_time_are_ignored() { .unwrap_or_default()); } -#[test] -fn newer_records_overwrite_older_ones() { +#[tokio::test] +async fn newer_records_overwrite_older_ones() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let old_record = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -892,8 +921,8 @@ fn newer_records_overwrite_older_ones() { assert_eq!(result.1.len(), 1); } -#[test] -fn older_records_dont_affect_newer_ones() { +#[tokio::test] +async fn older_records_dont_affect_newer_ones() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let old_record = tester.multiaddr_with_peer_id(1); let old_kv_pairs = build_dht_event( @@ -943,8 +972,8 @@ fn older_records_dont_affect_newer_ones() { assert_eq!(update_peers_info.1.len(), 1); } -#[test] -fn reject_address_with_rogue_peer_signature() { +#[tokio::test] +async fn reject_address_with_rogue_peer_signature() { let mut tester = DhtValueFoundTester::new(); let rogue_remote_node_key = Keypair::generate_ed25519(); let kv_pairs = build_dht_event( @@ -963,8 +992,8 @@ fn reject_address_with_rogue_peer_signature() { ); } -#[test] -fn reject_address_with_invalid_peer_signature() { +#[tokio::test] +async fn reject_address_with_invalid_peer_signature() { let mut tester = DhtValueFoundTester::new(); let mut kv_pairs = build_dht_event( vec![tester.multiaddr_with_peer_id(1)], @@ -986,8 +1015,8 @@ fn reject_address_with_invalid_peer_signature() { ); } -#[test] -fn reject_address_without_peer_signature() { +#[tokio::test] +async fn reject_address_without_peer_signature() { let mut tester = DhtValueFoundTester::new(); let kv_pairs = build_dht_event::( vec![tester.multiaddr_with_peer_id(1)], @@ -1002,8 +1031,8 @@ fn reject_address_without_peer_signature() { assert!(cached_remote_addresses.is_none(), "Expected worker to ignore unsigned record.",); } -#[test] -fn do_not_cache_addresses_without_peer_id() { +#[tokio::test] +async fn do_not_cache_addresses_without_peer_id() { let mut tester = DhtValueFoundTester::new(); let multiaddr_with_peer_id = tester.multiaddr_with_peer_id(1); let multiaddr_without_peer_id: Multiaddr = @@ -1025,8 +1054,8 @@ fn do_not_cache_addresses_without_peer_id() { ); } -#[test] -fn addresses_to_publish_adds_p2p() { +#[tokio::test] +async fn addresses_to_publish_adds_p2p() { let (_dht_event_tx, dht_event_rx) = channel(1000); let network: Arc = Arc::new(Default::default()); @@ -1036,6 +1065,8 @@ fn addresses_to_publish_adds_p2p() { )); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: vec![] }), @@ -1043,7 +1074,8 @@ fn addresses_to_publish_adds_p2p() { Box::pin(dht_event_rx), Role::PublishAndDiscover(MemoryKeystore::new().into()), Some(prometheus_endpoint::Registry::new()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); assert!( @@ -1057,8 +1089,8 @@ fn addresses_to_publish_adds_p2p() { /// Ensure [`Worker::addresses_to_publish`] does not add an additional `p2p` protocol component in /// case one already exists. -#[test] -fn addresses_to_publish_respects_existing_p2p_protocol() { +#[tokio::test] +async fn addresses_to_publish_respects_existing_p2p_protocol() { let (_dht_event_tx, dht_event_rx) = channel(1000); let identity = Keypair::generate_ed25519(); let peer_id = identity.public().to_peer_id(); @@ -1074,6 +1106,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { }); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: vec![] }), @@ -1081,7 +1115,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { Box::pin(dht_event_rx), Role::PublishAndDiscover(MemoryKeystore::new().into()), Some(prometheus_endpoint::Registry::new()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); assert_eq!( @@ -1091,8 +1126,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { ); } -#[test] -fn lookup_throttling() { +#[tokio::test] +async fn lookup_throttling() { let remote_multiaddr = { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); @@ -1118,6 +1153,8 @@ fn lookup_throttling() { let mut network = TestNetwork::default(); let mut receiver = network.get_event_receiver().unwrap(); let network = Arc::new(network); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), @@ -1125,7 +1162,8 @@ fn lookup_throttling() { dht_event_rx.boxed(), Role::Discover, Some(default_registry().clone()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); let mut pool = LocalPool::new(); @@ -1201,8 +1239,8 @@ fn lookup_throttling() { ); } -#[test] -fn test_handle_put_record_request() { +#[tokio::test] +async fn test_handle_put_record_request() { let local_node_network = TestNetwork::default(); let remote_node_network = TestNetwork::default(); let peer_id = remote_node_network.peer_id; @@ -1236,6 +1274,8 @@ fn test_handle_put_record_request() { let (_dht_event_tx, dht_event_rx) = channel(1); let (_to_worker, from_service) = mpsc::channel(0); let network = Arc::new(local_node_network); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), @@ -1243,7 +1283,8 @@ fn test_handle_put_record_request() { dht_event_rx.boxed(), Role::Discover, Some(default_registry().clone()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); let mut pool = LocalPool::new(); diff --git a/substrate/client/network/types/Cargo.toml b/substrate/client/network/types/Cargo.toml index ab6ce5efcd49a..62ab57a5838a7 100644 --- a/substrate/client/network/types/Cargo.toml +++ b/substrate/client/network/types/Cargo.toml @@ -20,6 +20,8 @@ log = { workspace = true, default-features = true } multiaddr = { workspace = true } multihash = { workspace = true } rand = { workspace = true, default-features = true } +serde.workspace = true +serde_with.workspace = true thiserror = { workspace = true } zeroize = { workspace = true } diff --git a/substrate/client/network/types/src/multiaddr.rs b/substrate/client/network/types/src/multiaddr.rs index 925e24fe70d6d..d938202b8977b 100644 --- a/substrate/client/network/types/src/multiaddr.rs +++ b/substrate/client/network/types/src/multiaddr.rs @@ -21,6 +21,7 @@ use litep2p::types::multiaddr::{ Protocol as LiteP2pProtocol, }; use multiaddr::Multiaddr as LibP2pMultiaddr; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{ fmt::{self, Debug, Display}, net::{IpAddr, Ipv4Addr, Ipv6Addr}, @@ -36,7 +37,7 @@ pub use crate::build_multiaddr as multiaddr; /// [`Multiaddr`] type used in Substrate. Converted to libp2p's `Multiaddr` /// or litep2p's `Multiaddr` when passed to the corresponding network backend. -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, SerializeDisplay, DeserializeFromStr)] pub struct Multiaddr { multiaddr: LiteP2pMultiaddr, } diff --git a/substrate/client/network/types/src/peer_id.rs b/substrate/client/network/types/src/peer_id.rs index 076be0a66c7b7..124b955d48a65 100644 --- a/substrate/client/network/types/src/peer_id.rs +++ b/substrate/client/network/types/src/peer_id.rs @@ -21,6 +21,7 @@ use crate::{ multihash::{Code, Error, Multihash}, }; use rand::Rng; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{fmt, hash::Hash, str::FromStr}; @@ -32,7 +33,9 @@ const MAX_INLINE_KEY_LENGTH: usize = 42; /// /// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer /// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md). -#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive( + Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, SerializeDisplay, DeserializeFromStr, +)] pub struct PeerId { multihash: Multihash, }