diff --git a/Cargo.lock b/Cargo.lock
index 85986af3674ae..1960073ca95cb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -18682,6 +18682,7 @@ dependencies = [
  "async-trait",
  "futures",
  "futures-timer",
+ "hex",
  "ip_network",
  "libp2p",
  "linked_hash_set",
@@ -18695,7 +18696,10 @@ dependencies = [
  "sc-client-api",
  "sc-network",
  "sc-network-types",
+ "sc-service",
+ "serde",
+ "serde_json",
  "sp-api 35.0.0",
  "sp-authority-discovery",
  "sp-blockchain",
  "sp-core 35.0.0",
@@ -18704,7 +18708,9 @@ dependencies = [
  "sp-tracing 17.0.1",
  "substrate-prometheus-endpoint",
  "substrate-test-runtime-client",
+ "tempfile",
  "thiserror 1.0.65",
+ "tokio",
 ]
 
 [[package]]
@@ -19725,6 +19731,8 @@ dependencies = [
  "multihash 0.19.1",
  "quickcheck",
  "rand 0.8.5",
+ "serde",
+ "serde_with",
  "thiserror 1.0.65",
  "zeroize",
 ]
@@ -20717,6 +20725,34 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "serde_with"
+version = "3.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
+dependencies = [
+ "base64 0.22.1",
+ "chrono",
+ "hex",
+ "serde",
+ "serde_derive",
+ "serde_json",
+ "serde_with_macros",
+ "time",
+]
+
+[[package]]
+name = "serde_with_macros"
+version = "3.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
+dependencies = [
+ "darling",
+ "proc-macro2 1.0.95",
+ "quote 1.0.40",
+ "syn 2.0.98",
+]
+
 [[package]]
 name = "serde_yaml"
 version = "0.9.34+deprecated"
diff --git a/Cargo.toml b/Cargo.toml
index 2f3df9dfc1d8b..c144d1928a496 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1207,6 +1207,7 @@ serde = { version = "1.0.214", default-features = false }
 serde-big-array = { version = "0.3.2" }
 serde_derive = { version = "1.0.117" }
 serde_json = { version = "1.0.132", default-features =
false } +serde_with = { version = "3.12.0", default-features = false, features = ["hex", "macros"] } serde_yaml = { version = "0.9" } sha1 = { version = "0.10.6" } sha2 = { version = "0.10.7", default-features = false } diff --git a/cumulus/client/relay-chain-minimal-node/src/lib.rs b/cumulus/client/relay-chain-minimal-node/src/lib.rs index a3d858ea40c92..391fecb082b3b 100644 --- a/cumulus/client/relay-chain-minimal-node/src/lib.rs +++ b/cumulus/client/relay-chain-minimal-node/src/lib.rs @@ -67,12 +67,14 @@ fn build_authority_discovery_service( _ => None, } }); + let net_config_path = config.network.net_config_path.clone(); let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config( sc_authority_discovery::WorkerConfig { publish_non_global_ips: auth_disc_publish_non_global_ips, public_addresses: auth_disc_public_addresses, // Require that authority discovery records are signed. strict_record_validation: true, + persisted_cache_directory: net_config_path, ..Default::default() }, client, @@ -80,6 +82,7 @@ fn build_authority_discovery_service( Box::pin(dht_event_stream), authority_discovery_role, prometheus_registry, + task_manager.spawn_handle(), ); task_manager.spawn_handle().spawn( diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs new file mode 100644 index 0000000000000..47ce9048d8111 --- /dev/null +++ b/polkadot/node/service/src/builder/mod.rs @@ -0,0 +1,863 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Polkadot service builder. + +#![cfg(feature = "full-node")] + +mod partial; +use partial::PolkadotPartialComponents; +pub(crate) use partial::{new_partial, new_partial_basics}; + +use crate::{ + grandpa_support, open_database, + overseer::{ExtendedOverseerGenArgs, OverseerGen, OverseerGenArgs}, + parachains_db, + relay_chain_selection::SelectRelayChain, + workers, Chain, Error, FullBackend, FullClient, IdentifyVariant, IsParachainNode, + GRANDPA_JUSTIFICATION_PERIOD, KEEP_FINALIZED_FOR_LIVE_NETWORKS, +}; +use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE; +use gum::info; +use mmr_gadget::MmrGadget; +use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD; +use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig; +use polkadot_node_core_av_store::Config as AvailabilityConfig; +use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; +use polkadot_node_core_chain_selection::{ + self as chain_selection_subsystem, Config as ChainSelectionConfig, +}; +use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; +use polkadot_node_network_protocol::{ + peer_set::{PeerSet, PeerSetProtocolNames}, + request_response::{IncomingRequest, ReqProtocolNames}, +}; +use polkadot_node_subsystem_types::DefaultSubsystemClient; +use polkadot_overseer::{Handle, OverseerConnector}; +use polkadot_primitives::Block; +use sc_client_api::Backend; +use sc_network::config::FullNetworkConfiguration; +use sc_network_sync::WarpSyncConfig; +use sc_service::{Configuration, RpcHandlers, TaskManager}; +use sc_sysinfo::Metric; +use sc_telemetry::TelemetryWorkerHandle; +use sc_transaction_pool_api::OffchainTransactionPoolFactory; +use sp_consensus_beefy::ecdsa_crypto; +use sp_runtime::traits::Block as BlockT; +use std::{collections::HashMap, sync::Arc, 
time::Duration}; + +/// Polkadot node service initialization parameters. +pub struct NewFullParams { + pub is_parachain_node: IsParachainNode, + pub enable_beefy: bool, + /// Whether to enable the block authoring backoff on production networks + /// where it isn't enabled by default. + pub force_authoring_backoff: bool, + pub telemetry_worker_handle: Option, + /// The version of the node. TESTING ONLY: `None` can be passed to skip the node/worker version + /// check, both on startup and in the workers. + pub node_version: Option, + /// Whether the node is attempting to run as a secure validator. + pub secure_validator_mode: bool, + /// An optional path to a directory containing the workers. + pub workers_path: Option, + /// Optional custom names for the prepare and execute workers. + pub workers_names: Option<(String, String)>, + /// An optional number of the maximum number of pvf execute workers. + pub execute_workers_max_num: Option, + /// An optional maximum number of pvf workers that can be spawned in the pvf prepare pool for + /// tasks with the priority below critical. + pub prepare_workers_soft_max_num: Option, + /// An optional absolute number of pvf workers that can be spawned in the pvf prepare pool. + pub prepare_workers_hard_max_num: Option, + /// How long finalized data should be kept in the availability store (in hours) + pub keep_finalized_for: Option, + pub overseer_gen: OverseerGenerator, + pub overseer_message_channel_capacity_override: Option, + #[allow(dead_code)] + pub malus_finality_delay: Option, + pub hwbench: Option, +} + +/// Completely built polkadot node service. 
+pub struct NewFull { + pub task_manager: TaskManager, + pub client: Arc, + pub overseer_handle: Option, + pub network: Arc, + pub sync_service: Arc>, + pub rpc_handlers: RpcHandlers, + pub backend: Arc, +} + +pub struct PolkadotServiceBuilder +where + OverseerGenerator: OverseerGen, + Network: sc_network::NetworkBackend::Hash>, +{ + config: Configuration, + params: NewFullParams, + overseer_connector: OverseerConnector, + partial_components: PolkadotPartialComponents>, + net_config: FullNetworkConfiguration::Hash, Network>, +} + +impl PolkadotServiceBuilder +where + OverseerGenerator: OverseerGen, + Network: sc_network::NetworkBackend::Hash>, +{ + /// Create new polkadot service builder. + pub fn new( + mut config: Configuration, + params: NewFullParams, + ) -> Result, Error> { + let basics = new_partial_basics(&mut config, params.telemetry_worker_handle.clone())?; + + let prometheus_registry = config.prometheus_registry().cloned(); + let overseer_connector = OverseerConnector::default(); + let overseer_handle = Handle::new(overseer_connector.handle()); + let auth_or_collator = config.role.is_authority() || params.is_parachain_node.is_collator(); + + let select_chain = if auth_or_collator { + let metrics = polkadot_node_subsystem_util::metrics::Metrics::register( + prometheus_registry.as_ref(), + )?; + + SelectRelayChain::new_with_overseer( + basics.backend.clone(), + overseer_handle.clone(), + metrics, + Some(basics.task_manager.spawn_handle()), + ) + } else { + SelectRelayChain::new_longest_chain(basics.backend.clone()) + }; + + let partial_components = + new_partial::>(&mut config, basics, select_chain)?; + + let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new( + &config.network, + config.prometheus_config.as_ref().map(|cfg| cfg.registry.clone()), + ); + + Ok(PolkadotServiceBuilder { + config, + params, + overseer_connector, + partial_components, + net_config, + }) + } + + /// Get the genesis hash of the polkadot service being 
built. + pub fn genesis_hash(&self) -> ::Hash { + self.partial_components.client.chain_info().genesis_hash + } + + /// Add extra request-response protocol to the polkadot service. + pub fn add_extra_request_response_protocol( + &mut self, + config: Network::RequestResponseProtocolConfig, + ) { + self.net_config.add_request_response_protocol(config); + } + + /// Build polkadot service. + pub fn build(self) -> Result { + let Self { + config, + params: + NewFullParams { + is_parachain_node, + enable_beefy, + force_authoring_backoff, + telemetry_worker_handle: _, + node_version, + secure_validator_mode, + workers_path, + workers_names, + overseer_gen, + overseer_message_channel_capacity_override, + malus_finality_delay: _malus_finality_delay, + hwbench, + execute_workers_max_num, + prepare_workers_soft_max_num, + prepare_workers_hard_max_num, + keep_finalized_for, + }, + overseer_connector, + partial_components: + sc_service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> { + client, + backend, + mut task_manager, + keystore_container, + select_chain, + import_queue, + transaction_pool, + other: + (rpc_extensions_builder, import_setup, rpc_setup, slot_duration, mut telemetry), + }, + mut net_config, + } = self; + + let role = config.role; + let auth_or_collator = config.role.is_authority() || is_parachain_node.is_collator(); + let is_offchain_indexing_enabled = config.offchain_worker.indexing_enabled; + let force_authoring = config.force_authoring; + let disable_grandpa = config.disable_grandpa; + let name = config.network.node_name.clone(); + let backoff_authoring_blocks = if !force_authoring_backoff && + (config.chain_spec.is_polkadot() || config.chain_spec.is_kusama()) + { + // the block authoring backoff is disabled by default on production networks + None + } else { + let mut backoff = sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging::default(); + + if config.chain_spec.is_rococo() || + config.chain_spec.is_versi() || + 
config.chain_spec.is_dev() + { + // on testnets that are in flux (like rococo or versi), finality has stalled + // sometimes due to operational issues and it's annoying to slow down block + // production to 1 block per hour. + backoff.max_interval = 10; + } + + Some(backoff) + }; + let shared_voter_state = rpc_setup; + let auth_disc_publish_non_global_ips = config.network.allow_non_globals_in_dht; + let auth_disc_public_addresses = config.network.public_addresses.clone(); + + let genesis_hash = client.chain_info().genesis_hash; + let peer_store_handle = net_config.peer_store_handle(); + + let prometheus_registry = config.prometheus_registry().cloned(); + let metrics = Network::register_notification_metrics( + config.prometheus_config.as_ref().map(|cfg| &cfg.registry), + ); + + // Note: GrandPa is pushed before the Polkadot-specific protocols. This doesn't change + // anything in terms of behaviour, but makes the logs more consistent with the other + // Substrate nodes. + let grandpa_protocol_name = + sc_consensus_grandpa::protocol_standard_name(&genesis_hash, &config.chain_spec); + let (grandpa_protocol_config, grandpa_notification_service) = + sc_consensus_grandpa::grandpa_peers_set_config::<_, Network>( + grandpa_protocol_name.clone(), + metrics.clone(), + Arc::clone(&peer_store_handle), + ); + net_config.add_notification_protocol(grandpa_protocol_config); + + let beefy_gossip_proto_name = + sc_consensus_beefy::gossip_protocol_name(&genesis_hash, config.chain_spec.fork_id()); + // `beefy_on_demand_justifications_handler` is given to `beefy-gadget` task to be run, + // while `beefy_req_resp_cfg` is added to `config.network.request_response_protocols`. 
+ let (beefy_on_demand_justifications_handler, beefy_req_resp_cfg) = + sc_consensus_beefy::communication::request_response::BeefyJustifsRequestHandler::new::< + _, + Network, + >( + &genesis_hash, + config.chain_spec.fork_id(), + client.clone(), + prometheus_registry.clone(), + ); + let beefy_notification_service = match enable_beefy { + false => None, + true => { + let (beefy_notification_config, beefy_notification_service) = + sc_consensus_beefy::communication::beefy_peers_set_config::<_, Network>( + beefy_gossip_proto_name.clone(), + metrics.clone(), + Arc::clone(&peer_store_handle), + ); + + net_config.add_notification_protocol(beefy_notification_config); + net_config.add_request_response_protocol(beefy_req_resp_cfg); + Some(beefy_notification_service) + }, + }; + + // validation/collation protocols are enabled only if `Overseer` is enabled + let peerset_protocol_names = + PeerSetProtocolNames::new(genesis_hash, config.chain_spec.fork_id()); + + // If this is a validator or running alongside a parachain node, we need to enable the + // networking protocols. + // + // Collators and parachain full nodes require the collator and validator networking to send + // collations and to be able to recover PoVs. 
+ let notification_services = if role.is_authority() || + is_parachain_node.is_running_alongside_parachain_node() + { + use polkadot_network_bridge::{peer_sets_info, IsAuthority}; + let is_authority = if role.is_authority() { IsAuthority::Yes } else { IsAuthority::No }; + + peer_sets_info::<_, Network>( + is_authority, + &peerset_protocol_names, + metrics.clone(), + Arc::clone(&peer_store_handle), + ) + .into_iter() + .map(|(config, (peerset, service))| { + net_config.add_notification_protocol(config); + (peerset, service) + }) + .collect::>>() + } else { + std::collections::HashMap::new() + }; + + let req_protocol_names = ReqProtocolNames::new(&genesis_hash, config.chain_spec.fork_id()); + + let (collation_req_v1_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (collation_req_v2_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (available_data_req_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (pov_req_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (chunk_req_v1_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (chunk_req_v2_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + + let grandpa_hard_forks = if config.chain_spec.is_kusama() { + grandpa_support::kusama_hard_forks() + } else { + Vec::new() + }; + + let warp_sync = Arc::new(sc_consensus_grandpa::warp_proof::NetworkProvider::new( + backend.clone(), + import_setup.1.shared_authority_set().clone(), + grandpa_hard_forks, + )); + + let 
ext_overseer_args = if is_parachain_node.is_running_alongside_parachain_node() { + None + } else { + let parachains_db = open_database(&config.database)?; + let candidate_validation_config = if role.is_authority() { + let (prep_worker_path, exec_worker_path) = workers::determine_workers_paths( + workers_path, + workers_names, + node_version.clone(), + )?; + log::info!("🚀 Using prepare-worker binary at: {:?}", prep_worker_path); + log::info!("🚀 Using execute-worker binary at: {:?}", exec_worker_path); + + Some(CandidateValidationConfig { + artifacts_cache_path: config + .database + .path() + .ok_or(Error::DatabasePathRequired)? + .join("pvf-artifacts"), + node_version, + secure_validator_mode, + prep_worker_path, + exec_worker_path, + // Default execution workers is 4 because we have 8 cores on the reference + // hardware, and this accounts for 50% of that cpu capacity. + pvf_execute_workers_max_num: execute_workers_max_num.unwrap_or(4), + pvf_prepare_workers_soft_max_num: prepare_workers_soft_max_num.unwrap_or(1), + pvf_prepare_workers_hard_max_num: prepare_workers_hard_max_num.unwrap_or(2), + }) + } else { + None + }; + let (candidate_req_v2_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let (dispute_req_receiver, cfg) = + IncomingRequest::get_config_receiver::<_, Network>(&req_protocol_names); + net_config.add_request_response_protocol(cfg); + let approval_voting_config = ApprovalVotingConfig { + col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, + slot_duration_millis: slot_duration.as_millis() as u64, + }; + let dispute_coordinator_config = DisputeCoordinatorConfig { + col_dispute_data: parachains_db::REAL_COLUMNS.col_dispute_coordinator_data, + }; + let chain_selection_config = ChainSelectionConfig { + col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data, + stagnant_check_interval: Default::default(), + stagnant_check_mode: 
chain_selection_subsystem::StagnantCheckMode::PruneOnly, + }; + + // Kusama + testnets get a higher threshold, we are conservative on Polkadot for now. + let fetch_chunks_threshold = + if config.chain_spec.is_polkadot() { None } else { Some(FETCH_CHUNKS_THRESHOLD) }; + + let availability_config = AvailabilityConfig { + col_data: parachains_db::REAL_COLUMNS.col_availability_data, + col_meta: parachains_db::REAL_COLUMNS.col_availability_meta, + keep_finalized_for: if matches!(config.chain_spec.identify_chain(), Chain::Rococo) { + keep_finalized_for.unwrap_or(1) + } else { + KEEP_FINALIZED_FOR_LIVE_NETWORKS + }, + }; + + Some(ExtendedOverseerGenArgs { + keystore: keystore_container.local_keystore(), + parachains_db, + candidate_validation_config, + availability_config, + pov_req_receiver, + chunk_req_v1_receiver, + chunk_req_v2_receiver, + candidate_req_v2_receiver, + approval_voting_config, + dispute_req_receiver, + dispute_coordinator_config, + chain_selection_config, + fetch_chunks_threshold, + }) + }; + + let (network, system_rpc_tx, tx_handler_controller, sync_service) = + sc_service::build_network(sc_service::BuildNetworkParams { + config: &config, + net_config, + client: client.clone(), + transaction_pool: transaction_pool.clone(), + spawn_handle: task_manager.spawn_handle(), + import_queue, + block_announce_validator_builder: None, + warp_sync_config: Some(WarpSyncConfig::WithProvider(warp_sync)), + block_relay: None, + metrics, + })?; + + if config.offchain_worker.enabled { + use futures::FutureExt; + + task_manager.spawn_handle().spawn( + "offchain-workers-runner", + "offchain-work", + sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions { + runtime_api_provider: client.clone(), + keystore: Some(keystore_container.keystore()), + offchain_db: backend.offchain_storage(), + transaction_pool: Some(OffchainTransactionPoolFactory::new( + transaction_pool.clone(), + )), + network_provider: Arc::new(network.clone()), + is_validator: 
role.is_authority(), + enable_http_requests: false, + custom_extensions: move |_| vec![], + })? + .run(client.clone(), task_manager.spawn_handle()) + .boxed(), + ); + } + + let network_config = config.network.clone(); + let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { + config, + backend: backend.clone(), + client: client.clone(), + keystore: keystore_container.keystore(), + network: network.clone(), + sync_service: sync_service.clone(), + rpc_builder: Box::new(rpc_extensions_builder), + transaction_pool: transaction_pool.clone(), + task_manager: &mut task_manager, + system_rpc_tx, + tx_handler_controller, + telemetry: telemetry.as_mut(), + })?; + + if let Some(hwbench) = hwbench { + sc_sysinfo::print_hwbench(&hwbench); + match SUBSTRATE_REFERENCE_HARDWARE.check_hardware(&hwbench, role.is_authority()) { + Err(err) if role.is_authority() => { + if err + .0 + .iter() + .any(|failure| matches!(failure.metric, Metric::Blake2256Parallel { .. })) + { + log::warn!( + "⚠️ Starting January 2025 the hardware will fail the minimal physical CPU cores requirements {} for role 'Authority',\n\ + find out more when this will become mandatory at:\n\ + https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware", + err + ); + } + if err + .0 + .iter() + .any(|failure| !matches!(failure.metric, Metric::Blake2256Parallel { .. 
})) + { + log::warn!( + "⚠️ The hardware does not meet the minimal requirements {} for role 'Authority' find out more at:\n\ + https://wiki.polkadot.network/docs/maintain-guides-how-to-validate-polkadot#reference-hardware", + err + ); + } + }, + _ => {}, + } + + if let Some(ref mut telemetry) = telemetry { + let telemetry_handle = telemetry.handle(); + task_manager.spawn_handle().spawn( + "telemetry_hwbench", + None, + sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench), + ); + } + } + + let (block_import, link_half, babe_link, beefy_links) = import_setup; + + let overseer_client = client.clone(); + let spawner = task_manager.spawn_handle(); + + let authority_discovery_service = + // We need the authority discovery if this node is either a validator or running alongside a parachain node. + // Parachains node require the authority discovery for finding relay chain validators for sending + // their PoVs or recovering PoVs. + if role.is_authority() || is_parachain_node.is_running_alongside_parachain_node() { + use futures::StreamExt; + use sc_network::{Event, NetworkEventStream}; + + let authority_discovery_role = if role.is_authority() { + sc_authority_discovery::Role::PublishAndDiscover(keystore_container.keystore()) + } else { + // don't publish our addresses when we're not an authority (collator, cumulus, ..) + sc_authority_discovery::Role::Discover + }; + let dht_event_stream = + network.event_stream("authority-discovery").filter_map(|e| async move { + match e { + Event::Dht(e) => Some(e), + _ => None, + } + }); + let (worker, service) = sc_authority_discovery::new_worker_and_service_with_config( + sc_authority_discovery::WorkerConfig { + publish_non_global_ips: auth_disc_publish_non_global_ips, + public_addresses: auth_disc_public_addresses, + // Require that authority discovery records are signed. 
+ strict_record_validation: true, + persisted_cache_directory: network_config.net_config_path, + ..Default::default() + }, + client.clone(), + Arc::new(network.clone()), + Box::pin(dht_event_stream), + authority_discovery_role, + prometheus_registry.clone(), + task_manager.spawn_handle(), + ); + + task_manager.spawn_handle().spawn( + "authority-discovery-worker", + Some("authority-discovery"), + Box::pin(worker.run()), + ); + Some(service) + } else { + None + }; + + let runtime_client = Arc::new(DefaultSubsystemClient::new( + overseer_client.clone(), + OffchainTransactionPoolFactory::new(transaction_pool.clone()), + )); + + let overseer_handle = if let Some(authority_discovery_service) = authority_discovery_service + { + let (overseer, overseer_handle) = overseer_gen + .generate::>( + overseer_connector, + OverseerGenArgs { + runtime_client, + network_service: network.clone(), + sync_service: sync_service.clone(), + authority_discovery_service, + collation_req_v1_receiver, + collation_req_v2_receiver, + available_data_req_receiver, + registry: prometheus_registry.as_ref(), + spawner, + is_parachain_node, + overseer_message_channel_capacity_override, + req_protocol_names, + peerset_protocol_names, + notification_services, + }, + ext_overseer_args, + ) + .map_err(|e| { + gum::error!("Failed to init overseer: {}", e); + e + })?; + let handle = Handle::new(overseer_handle.clone()); + + { + let handle = handle.clone(); + task_manager.spawn_essential_handle().spawn_blocking( + "overseer", + None, + Box::pin(async move { + use futures::{pin_mut, select, FutureExt}; + + let forward = polkadot_overseer::forward_events(overseer_client, handle); + + let forward = forward.fuse(); + let overseer_fut = overseer.run().fuse(); + + pin_mut!(overseer_fut); + pin_mut!(forward); + + select! 
{ + () = forward => (), + () = overseer_fut => (), + complete => (), + } + }), + ); + } + Some(handle) + } else { + assert!( + !auth_or_collator, + "Precondition congruence (false) is guaranteed by manual checking. qed" + ); + None + }; + + if role.is_authority() { + let proposer = sc_basic_authorship::ProposerFactory::new( + task_manager.spawn_handle(), + client.clone(), + transaction_pool.clone(), + prometheus_registry.as_ref(), + telemetry.as_ref().map(|x| x.handle()), + ); + + let client_clone = client.clone(); + let overseer_handle = + overseer_handle.as_ref().ok_or(Error::AuthoritiesRequireRealOverseer)?.clone(); + let slot_duration = babe_link.config().slot_duration(); + let babe_config = sc_consensus_babe::BabeParams { + keystore: keystore_container.keystore(), + client: client.clone(), + select_chain, + block_import, + env: proposer, + sync_oracle: sync_service.clone(), + justification_sync_link: sync_service.clone(), + create_inherent_data_providers: move |parent, ()| { + let client_clone = client_clone.clone(); + let overseer_handle = overseer_handle.clone(); + + async move { + let parachain = + polkadot_node_core_parachains_inherent::ParachainsInherentDataProvider::new( + client_clone, + overseer_handle, + parent, + ); + + let timestamp = sp_timestamp::InherentDataProvider::from_system_time(); + + let slot = + sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration( + *timestamp, + slot_duration, + ); + + Ok((slot, timestamp, parachain)) + } + }, + force_authoring, + backoff_authoring_blocks, + babe_link, + block_proposal_slot_portion: sc_consensus_babe::SlotProportion::new(2f32 / 3f32), + max_block_proposal_slot_portion: None, + telemetry: telemetry.as_ref().map(|x| x.handle()), + }; + + let babe = sc_consensus_babe::start_babe(babe_config)?; + task_manager.spawn_essential_handle().spawn_blocking("babe", None, babe); + } + + // if the node isn't actively participating in consensus then it doesn't + // need a keystore, 
regardless of which protocol we use below. + let keystore_opt = + if role.is_authority() { Some(keystore_container.keystore()) } else { None }; + + // beefy is enabled if its notification service exists + if let Some(notification_service) = beefy_notification_service { + let justifications_protocol_name = + beefy_on_demand_justifications_handler.protocol_name(); + let network_params = sc_consensus_beefy::BeefyNetworkParams { + network: Arc::new(network.clone()), + sync: sync_service.clone(), + gossip_protocol_name: beefy_gossip_proto_name, + justifications_protocol_name, + notification_service, + _phantom: core::marker::PhantomData::, + }; + let payload_provider = sp_consensus_beefy::mmr::MmrRootProvider::new(client.clone()); + let beefy_params = sc_consensus_beefy::BeefyParams { + client: client.clone(), + backend: backend.clone(), + payload_provider, + runtime: client.clone(), + key_store: keystore_opt.clone(), + network_params, + min_block_delta: 8, + prometheus_registry: prometheus_registry.clone(), + links: beefy_links, + on_demand_justifications_handler: beefy_on_demand_justifications_handler, + is_authority: role.is_authority(), + }; + + let gadget = sc_consensus_beefy::start_beefy_gadget::< + _, + _, + _, + _, + _, + _, + _, + ecdsa_crypto::AuthorityId, + >(beefy_params); + + // BEEFY is part of consensus, if it fails we'll bring the node down with it to make + // sure it is noticed. + task_manager + .spawn_essential_handle() + .spawn_blocking("beefy-gadget", None, gadget); + } + // When offchain indexing is enabled, MMR gadget should also run. 
+ if is_offchain_indexing_enabled { + task_manager.spawn_essential_handle().spawn_blocking( + "mmr-gadget", + None, + MmrGadget::start( + client.clone(), + backend.clone(), + sp_mmr_primitives::INDEXING_PREFIX.to_vec(), + ), + ); + } + + let config = sc_consensus_grandpa::Config { + // FIXME substrate#1578 make this available through chainspec + // Grandpa performance can be improved a bit by tuning this parameter, see: + // https://github.com/paritytech/polkadot/issues/5464 + gossip_duration: Duration::from_millis(1000), + justification_generation_period: GRANDPA_JUSTIFICATION_PERIOD, + name: Some(name), + observer_enabled: false, + keystore: keystore_opt, + local_role: role, + telemetry: telemetry.as_ref().map(|x| x.handle()), + protocol_name: grandpa_protocol_name, + }; + + let enable_grandpa = !disable_grandpa; + if enable_grandpa { + // start the full GRANDPA voter + // NOTE: unlike in substrate we are currently running the full + // GRANDPA voter protocol for all full nodes (regardless of whether + // they're validators or not). at this point the full voter should + // provide better guarantees of block and vote data availability than + // the observer. 
+ + let mut voting_rules_builder = sc_consensus_grandpa::VotingRulesBuilder::default(); + + #[cfg(not(feature = "malus"))] + let _malus_finality_delay = None; + + if let Some(delay) = _malus_finality_delay { + info!(?delay, "Enabling malus finality delay",); + voting_rules_builder = + voting_rules_builder.add(sc_consensus_grandpa::BeforeBestBlockBy(delay)); + }; + + let grandpa_config = sc_consensus_grandpa::GrandpaParams { + config, + link: link_half, + network: network.clone(), + sync: sync_service.clone(), + voting_rule: voting_rules_builder.build(), + prometheus_registry: prometheus_registry.clone(), + shared_voter_state, + telemetry: telemetry.as_ref().map(|x| x.handle()), + notification_service: grandpa_notification_service, + offchain_tx_pool_factory: OffchainTransactionPoolFactory::new( + transaction_pool.clone(), + ), + }; + + task_manager.spawn_essential_handle().spawn_blocking( + "grandpa-voter", + None, + sc_consensus_grandpa::run_grandpa_voter(grandpa_config)?, + ); + } + + Ok(NewFull { + task_manager, + client, + overseer_handle, + network, + sync_service, + rpc_handlers, + backend, + }) + } +} + +/// Create a new full node of arbitrary runtime and executor. +/// +/// This is an advanced feature and not recommended for general use. Generally, `build_full` is +/// a better choice. +/// +/// `workers_path` is used to get the path to the directory where auxiliary worker binaries reside. +/// If not specified, the main binary's directory is searched first, then `/usr/lib/polkadot` is +/// searched. If the path points to an executable rather then directory, that executable is used +/// both as preparation and execution worker (supposed to be used for tests only). 
+pub fn new_full< + OverseerGenerator: OverseerGen, + Network: sc_network::NetworkBackend::Hash>, +>( + config: Configuration, + params: NewFullParams, +) -> Result { + PolkadotServiceBuilder::::new(config, params)?.build() +} diff --git a/prdoc/pr_8839.prdoc b/prdoc/pr_8839.prdoc new file mode 100644 index 0000000000000..55218f7bce5c3 --- /dev/null +++ b/prdoc/pr_8839.prdoc @@ -0,0 +1,23 @@ +title: "net/discovery: File persistence for AddrCache" +doc: + - audience: Node Dev + description: |- + Persisting the AddrCache periodically (every 10 minutes) and on worker + shutdown. Read AddrCache from file upon launch of worker. + + AddrCache is saved as authority_discovery_addr_cache.json in the + folder configured by net_config_path of NetworkConfiguration. + + This reduces the time it takes for a node to reconnect to peers after + restart. +crates: + - name: sc-authority-discovery + bump: major + - name: sc-network-types + bump: minor + - name: cumulus-relay-chain-minimal-node + bump: patch + - name: polkadot-service + bump: patch + - name: staging-node-cli + bump: patch diff --git a/substrate/bin/node/cli/src/service.rs b/substrate/bin/node/cli/src/service.rs index 008cac4ef8a88..135325bc661d6 100644 --- a/substrate/bin/node/cli/src/service.rs +++ b/substrate/bin/node/cli/src/service.rs @@ -543,6 +543,7 @@ pub fn new_full_base::Hash>>( task_manager.spawn_handle().spawn("mixnet", None, mixnet); } + let net_config_path = config.network.net_config_path.clone(); let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { config, backend: backend.clone(), @@ -655,6 +656,7 @@ pub fn new_full_base::Hash>>( sc_authority_discovery::WorkerConfig { publish_non_global_ips: auth_disc_publish_non_global_ips, public_addresses: auth_disc_public_addresses, + persisted_cache_directory: net_config_path, ..Default::default() }, client.clone(), @@ -662,6 +664,7 @@ pub fn new_full_base::Hash>>( Box::pin(dht_event_stream), authority_discovery_role, 
prometheus_registry.clone(), + task_manager.spawn_handle(), ); task_manager.spawn_handle().spawn( diff --git a/substrate/client/authority-discovery/Cargo.toml b/substrate/client/authority-discovery/Cargo.toml index 7c082d62d9132..01b1ab9b402cb 100644 --- a/substrate/client/authority-discovery/Cargo.toml +++ b/substrate/client/authority-discovery/Cargo.toml @@ -30,6 +30,7 @@ linked_hash_set = { workspace = true } log = { workspace = true, default-features = true } prost = { workspace = true } rand = { workspace = true, default-features = true } +sc-service.workspace = true +serde.workspace = true +serde_json.workspace = true thiserror = { workspace = true } +tokio.workspace = true prometheus-endpoint.workspace = true prometheus-endpoint.default-features = true @@ -52,8 +53,34 @@ sp-keystore.default-features = true sp-runtime.workspace = true sp-runtime.default-features = true async-trait = { workspace = true } [dev-dependencies] +hex.workspace = true quickcheck = { workspace = true } sp-tracing = { default-features = true, path = "../../primitives/tracing" } substrate-test-runtime-client = { path = "../../test-utils/runtime/client" } +tempfile.workspace = true + +[build-dependencies] +prost-build = { workspace = true }
diff --git a/substrate/client/authority-discovery/src/error.rs b/substrate/client/authority-discovery/src/error.rs index 3f395e47922e2..d524ca1e9e9f7 100644 --- a/substrate/client/authority-discovery/src/error.rs +++ b/substrate/client/authority-discovery/src/error.rs @@ -52,6 +52,9 @@ pub enum Error { #[error("Failed to encode or decode scale payload.")] EncodingDecodingScale(#[from] codec::Error), + #[error("Failed to encode or decode AddrCache.")] + EncodingDecodingAddrCache(String), + #[error("Failed to parse a libp2p multi address.")] ParsingMultiaddress(#[from] sc_network::multiaddr::ParseError), diff --git a/substrate/client/authority-discovery/src/lib.rs b/substrate/client/authority-discovery/src/lib.rs index e674c51571eff..7b654a16e3d5b 100644 --- a/substrate/client/authority-discovery/src/lib.rs +++ b/substrate/client/authority-discovery/src/lib.rs @@ -33,7 +33,7 @@ pub use crate::{ worker::{AuthorityDiscovery, NetworkProvider, Role, Worker}, }; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; use futures::{ channel::{mpsc, oneshot}, @@ -44,8 +44,8 @@ use sc_network::{event::DhtEvent, Multiaddr}; use sc_network_types::PeerId; use sp_authority_discovery::AuthorityId; use sp_blockchain::HeaderBackend; +use sp_core::traits::SpawnNamed; use sp_runtime::traits::Block as BlockT; - mod error; mod interval; mod service; @@ -88,6 +88,11 @@ pub struct WorkerConfig { /// /// Defaults to `false` to provide compatibility with old versions pub strict_record_validation: bool, + + /// The directory of where the persisted AddrCache file is located, + /// optional since NetworkConfiguration's `net_config_path` field + /// is optional. If None, we won't persist the AddrCache at all. 
+ pub persisted_cache_directory: Option, } impl Default for WorkerConfig { @@ -110,6 +115,7 @@ impl Default for WorkerConfig { publish_non_global_ips: true, public_addresses: Vec::new(), strict_record_validation: false, + persisted_cache_directory: None, } } } @@ -123,6 +129,7 @@ pub fn new_worker_and_service( dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, + spawner: impl SpawnNamed + 'static, ) -> (Worker, Service) where Block: BlockT + Unpin + 'static, @@ -136,6 +143,7 @@ where dht_event_rx, role, prometheus_registry, + spawner, ) } @@ -149,6 +157,7 @@ pub fn new_worker_and_service_with_config( dht_event_rx: DhtEventStream, role: Role, prometheus_registry: Option, + spawner: impl SpawnNamed + 'static, ) -> (Worker, Service) where Block: BlockT + Unpin + 'static, @@ -157,8 +166,16 @@ where { let (to_worker, from_service) = mpsc::channel(0); - let worker = - Worker::new(from_service, client, network, dht_event_rx, role, prometheus_registry, config); + let worker = Worker::new( + from_service, + client, + network, + dht_event_rx, + role, + prometheus_registry, + config, + spawner, + ); let service = Service::new(to_worker); (worker, service) diff --git a/substrate/client/authority-discovery/src/tests.rs b/substrate/client/authority-discovery/src/tests.rs index acfd0e61de019..32c4afec0c531 100644 --- a/substrate/client/authority-discovery/src/tests.rs +++ b/substrate/client/authority-discovery/src/tests.rs @@ -17,11 +17,12 @@ // along with this program. If not, see . 
use crate::{ - new_worker_and_service, + new_worker_and_service_with_config, worker::{ tests::{TestApi, TestNetwork}, - Role, + AddrCache, Role, }, + WorkerConfig, }; use futures::{channel::mpsc::channel, executor::LocalPool, task::LocalSpawn}; @@ -30,11 +31,19 @@ use std::{collections::HashSet, sync::Arc}; use sc_network::{multiaddr::Protocol, Multiaddr, PeerId}; use sp_authority_discovery::AuthorityId; -use sp_core::crypto::key_types; +use sp_core::{crypto::key_types, testing::TaskExecutor, traits::SpawnNamed}; use sp_keystore::{testing::MemoryKeystore, Keystore}; -#[test] -fn get_addresses_and_authority_id() { +pub(super) fn create_spawner() -> Box { + Box::new(TaskExecutor::new()) +} + +pub(super) fn test_config(path_buf: Option) -> WorkerConfig { + WorkerConfig { persisted_cache_directory: path_buf, ..Default::default() } +} + +#[tokio::test] +async fn get_addresses_and_authority_id() { let (_dht_event_tx, dht_event_rx) = channel(0); let network: Arc = Arc::new(Default::default()); @@ -57,12 +66,16 @@ fn get_addresses_and_authority_id() { let test_api = Arc::new(TestApi { authorities: vec![] }); - let (mut worker, mut service) = new_worker_and_service( + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); + let (mut worker, mut service) = new_worker_and_service_with_config( + test_config(Some(path)), test_api, network.clone(), Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, + create_spawner(), ); worker.inject_addresses(remote_authority_id.clone(), vec![remote_addr.clone()]); @@ -80,8 +93,8 @@ fn get_addresses_and_authority_id() { }); } -#[test] -fn cryptos_are_compatible() { +#[tokio::test] +async fn cryptos_are_compatible() { use sp_core::crypto::Pair; let libp2p_keypair = ed25519::Keypair::generate(); @@ -103,3 +116,53 @@ fn cryptos_are_compatible() { )); assert!(libp2p_public.verify(message, sp_core_signature.as_ref())); } + +#[tokio::test] +async fn 
when_addr_cache_is_persisted_with_authority_ids_then_when_worker_is_created_it_loads_the_persisted_cache( +) { + // ARRANGE + let (_dht_event_tx, dht_event_rx) = channel(0); + let mut pool = LocalPool::new(); + let key_store = MemoryKeystore::new(); + + let remote_authority_id: AuthorityId = pool.run_until(async { + key_store + .sr25519_generate_new(key_types::AUTHORITY_DISCOVERY, None) + .unwrap() + .into() + }); + let remote_peer_id = PeerId::random(); + let remote_addr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333" + .parse::() + .unwrap() + .with(Protocol::P2p(remote_peer_id.into())); + + let tempdir = tempfile::tempdir().unwrap(); + let cache_path = tempdir.path().to_path_buf(); + + // persist the remote_authority_id and remote_addr in the cache + { + let mut addr_cache = AddrCache::default(); + addr_cache.insert(remote_authority_id.clone(), vec![remote_addr.clone()]); + let path_to_save = cache_path.join(crate::worker::ADDR_CACHE_FILE_NAME); + addr_cache.serialize_and_persist(&path_to_save); + } + + let (_, from_service) = futures::channel::mpsc::channel(0); + + // ACT + // Create a worker with the persisted cache + let worker = crate::worker::Worker::new( + from_service, + Arc::new(TestApi { authorities: vec![] }), + Arc::new(TestNetwork::default()), + Box::pin(dht_event_rx), + Role::PublishAndDiscover(key_store.into()), + None, + test_config(Some(cache_path)), + create_spawner(), + ); + + // ASSERT + assert!(worker.contains_authority(&remote_authority_id)); +} diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index 9319fbe6321e7..ca610b8422c01 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -16,6 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
+pub(crate) use crate::worker::addr_cache::AddrCache; use crate::{ error::{Error, Result}, interval::ExpIncInterval, @@ -25,19 +26,19 @@ use crate::{ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, + path::PathBuf, sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt}; -use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; use libp2p::kad::{PeerRecord, Record}; use linked_hash_set::LinkedHashSet; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; use prost::Message; use rand::{seq::SliceRandom, thread_rng}; @@ -53,7 +54,10 @@ use sp_authority_discovery::{ AuthorityDiscoveryApi, AuthorityId, AuthorityPair, AuthoritySignature, }; use sp_blockchain::HeaderBackend; -use sp_core::crypto::{key_types, ByteArray, Pair}; +use sp_core::{ + crypto::{key_types, ByteArray, Pair}, + traits::SpawnNamed, +}; use sp_keystore::{Keystore, KeystorePtr}; use sp_runtime::traits::Block as BlockT; @@ -69,6 +73,8 @@ mod schema { pub mod tests; const LOG_TARGET: &str = "sub-authority-discovery"; +pub(crate) const ADDR_CACHE_FILE_NAME: &str = "authority_discovery_addr_cache.json"; +const ADDR_CACHE_PERSIST_INTERVAL: Duration = Duration::from_secs(60 * 10); // 10 minutes /// Maximum number of addresses cached per authority. Additional addresses are discarded. const MAX_ADDRESSES_PER_AUTHORITY: usize = 16; @@ -186,6 +192,14 @@ pub struct Worker { role: Role, phantom: PhantomData, + + /// A spawner of tasks + spawner: Box, + + /// The directory of where the persisted AddrCache file is located, + /// optional since NetworkConfiguration's `net_config_path` field + /// is optional. If None, we won't persist the AddrCache at all. 
+ persisted_cache_file_path: Option, } #[derive(Debug, Clone)] @@ -245,6 +259,7 @@ where role: Role, prometheus_registry: Option, config: WorkerConfig, + spawner: impl SpawnNamed + 'static, ) -> Self { // When a node starts up publishing and querying might fail due to various reasons, for // example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather @@ -261,7 +276,30 @@ where let publish_if_changed_interval = ExpIncInterval::new(config.keystore_refresh_interval, config.keystore_refresh_interval); - let addr_cache = AddrCache::new(); + let maybe_persisted_cache_file_path = + config.persisted_cache_directory.as_ref().map(|dir| { + let mut path = dir.clone(); + path.push(ADDR_CACHE_FILE_NAME); + path + }); + + // If we have a path to persisted cache file, then we will try to + // load the contents of persisted cache from file, if it exists, and is valid. + // Create a new one otherwise. + let addr_cache: AddrCache = if let Some(persisted_cache_file_path) = + maybe_persisted_cache_file_path.as_ref() + { + let loaded = + AddrCache::try_from(persisted_cache_file_path.as_path()).unwrap_or_else(|e| { + info!(target: LOG_TARGET, "Failed to load AddrCache from file, using empty instead: {}", e); + AddrCache::new() + }); + info!(target: LOG_TARGET, "Loaded persisted AddrCache with {} authority ids.", loaded.num_authority_ids()); + loaded + } else { + info!(target: LOG_TARGET, "No persisted cache file path provided, authority discovery will not persist the address cache to disk."); + AddrCache::new() + }; let metrics = match prometheus_registry { Some(registry) => match Metrics::register(®istry) { @@ -308,20 +346,43 @@ where warn_public_addresses: false, phantom: PhantomData, last_known_records: HashMap::new(), + spawner: Box::new(spawner), + persisted_cache_file_path: maybe_persisted_cache_file_path, } } + /// Persists `AddrCache` to disk if the `persisted_cache_file_path` is set. 
+ pub fn persist_addr_cache_if_supported(&self) { + let Some(path) = self.persisted_cache_file_path.as_ref().cloned() else { + return; + }; + let cloned_cache = self.addr_cache.clone(); + self.spawner.spawn_blocking( + "persist-addr-cache", + Some("authority-discovery-worker"), + Box::pin(async move { + cloned_cache.serialize_and_persist(path); + }), + ) + } + /// Start the worker pub async fn run(mut self) { + let mut persist_interval = tokio::time::interval(ADDR_CACHE_PERSIST_INTERVAL); + loop { self.start_new_lookups(); futures::select! { + _ = persist_interval.tick().fuse() => { + self.persist_addr_cache_if_supported(); + }, // Process incoming events. event = self.dht_event_rx.next().fuse() => { if let Some(event) = event { self.handle_dht_event(event).await; } else { + self.persist_addr_cache_if_supported(); // This point is reached if the network has shut down, at which point there is not // much else to do than to shut down the authority discovery as well. return; @@ -1196,6 +1257,10 @@ impl Metrics { #[cfg(test)] impl Worker { pub(crate) fn inject_addresses(&mut self, authority: AuthorityId, addresses: Vec) { - self.addr_cache.insert(authority, addresses); + self.addr_cache.insert(authority, addresses) + } + + pub(crate) fn contains_authority(&self, authority: &AuthorityId) -> bool { + self.addr_cache.get_addresses_by_authority_id(authority).is_some() } } diff --git a/substrate/client/authority-discovery/src/worker/addr_cache.rs b/substrate/client/authority-discovery/src/worker/addr_cache.rs index 13bb990bf8b99..fe1819e64c1e3 100644 --- a/substrate/client/authority-discovery/src/worker/addr_cache.rs +++ b/substrate/client/authority-discovery/src/worker/addr_cache.rs @@ -16,14 +16,24 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . 
+use crate::error::Error; +use log::{info, warn}; use sc_network::{multiaddr::Protocol, Multiaddr}; use sc_network_types::PeerId; +use serde::{Deserialize, Serialize}; use sp_authority_discovery::AuthorityId; -use std::collections::{hash_map::Entry, HashMap, HashSet}; +use sp_runtime::DeserializeOwned; +use std::{ + collections::{hash_map::Entry, HashMap, HashSet}, + fs::File, + io::{self, BufReader, Write}, + path::Path, +}; /// Cache for [`AuthorityId`] -> [`HashSet`] and [`PeerId`] -> [`HashSet`] /// mappings. -pub(super) struct AddrCache { +#[derive(Default, Clone, PartialEq, Debug)] +pub(crate) struct AddrCache { /// The addresses found in `authority_id_to_addresses` are guaranteed to always match /// the peerids found in `peer_id_to_authority_ids`. In other words, these two hashmaps /// are similar to a bi-directional map. @@ -35,14 +45,116 @@ pub(super) struct AddrCache { peer_id_to_authority_ids: HashMap>, } +impl Serialize for AddrCache { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + SerializeAddrCache::from(self.clone()).serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for AddrCache { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + SerializeAddrCache::deserialize(deserializer).map(Into::into) + } +} + +/// A storage and serialization time optimized version of `AddrCache` +/// which contains the bare minimum info to reconstruct the AddrCache. We +/// rely on the fact that the `peer_id_to_authority_ids` can be reconstructed from +/// the `authority_id_to_addresses` field. +/// +/// Benchmarks show that this is about 2x faster to serialize and about 4x faster to deserialize +/// compared to the full `AddrCache`. +/// +/// Storage wise it is about half the size of the full `AddrCache`. +/// +/// This is used to persist the `AddrCache` to disk and load it back. +/// +/// AddrCache impl of Serialize and Deserialize "piggybacks" on this struct. 
+#[derive(Serialize, Deserialize)] +struct SerializeAddrCache { + authority_id_to_addresses: HashMap>, +} + +impl From for AddrCache { + fn from(value: SerializeAddrCache) -> Self { + let mut peer_id_to_authority_ids: HashMap> = HashMap::new(); + + for (authority_id, addresses) in &value.authority_id_to_addresses { + for peer_id in addresses_to_peer_ids(addresses) { + peer_id_to_authority_ids + .entry(peer_id) + .or_insert_with(HashSet::new) + .insert(authority_id.clone()); + } + } + + AddrCache { + authority_id_to_addresses: value.authority_id_to_addresses, + peer_id_to_authority_ids, + } + } +} +impl From for SerializeAddrCache { + fn from(value: AddrCache) -> Self { + Self { authority_id_to_addresses: value.authority_id_to_addresses } + } +} + +fn write_to_file(path: impl AsRef, contents: &str) -> io::Result<()> { + let path = path.as_ref(); + let mut file = File::create(path)?; + file.write_all(contents.as_bytes())?; + file.flush()?; + Ok(()) +} + +impl TryFrom<&Path> for AddrCache { + type Error = Error; + + fn try_from(path: &Path) -> Result { + // Try to load from the cache file if it exists and is valid. 
+ load_from_file::(&path).map_err(|e| { + Error::EncodingDecodingAddrCache(format!( + "Failed to load AddrCache from file: {}, error: {:?}", + path.display(), + e + )) + }) + } +} impl AddrCache { pub fn new() -> Self { - AddrCache { - authority_id_to_addresses: HashMap::new(), - peer_id_to_authority_ids: HashMap::new(), + AddrCache::default() + } + + fn serialize(&self) -> Option { + serde_json::to_string_pretty(self).inspect_err(|e| { + warn!(target: super::LOG_TARGET, "Failed to serialize AddrCache to JSON: {} => skip persisting it.", e); + }).ok() + } + + fn persist(path: impl AsRef, serialized_cache: String) { + match write_to_file(path.as_ref(), &serialized_cache) { + Err(err) => { + warn!(target: super::LOG_TARGET, "Failed to persist AddrCache on disk at path: {}, error: {}", path.as_ref().display(), err); + }, + Ok(_) => { + info!(target: super::LOG_TARGET, "Successfully persisted AddrCache on disk"); + }, } } + pub fn serialize_and_persist(&self, path: impl AsRef) { + let Some(serialized) = self.serialize() else { return }; + Self::persist(path, serialized); + } + /// Inserts the given [`AuthorityId`] and [`Vec`] pair for future lookups by /// [`AuthorityId`] or [`PeerId`]. 
pub fn insert(&mut self, authority_id: AuthorityId, addresses: Vec) { @@ -56,7 +168,6 @@ impl AddrCache { authority_id, addresses, ); - return } else if peer_ids.len() > 1 { log::warn!( @@ -172,8 +283,21 @@ fn addresses_to_peer_ids(addresses: &HashSet) -> HashSet { addresses.iter().filter_map(peer_id_from_multiaddr).collect::>() } +fn load_from_file(path: impl AsRef) -> io::Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + + serde_json::from_reader(reader).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) +} + #[cfg(test)] mod tests { + + use std::{ + thread::sleep, + time::{Duration, Instant}, + }; + use super::*; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; @@ -280,6 +404,12 @@ mod tests { .quickcheck(property as fn(_, _, _) -> TestResult) } + #[test] + fn test_from_to_serializable() { + let serializable = SerializeAddrCache::from(AddrCache::sample()); + let roundtripped = AddrCache::from(serializable); + assert_eq!(roundtripped, AddrCache::sample()) + } #[test] fn keeps_consistency_between_authority_id_and_peer_id() { fn property( @@ -381,4 +511,165 @@ mod tests { addr_cache.get_addresses_by_authority_id(&authority_id1).unwrap() ); } + + impl AddrCache { + pub fn sample() -> Self { + let mut addr_cache = AddrCache::new(); + + let peer_id = PeerId::from_multihash( + Multihash::wrap(Code::Sha2_256.into(), &[0xab; 32]).unwrap(), + ) + .unwrap(); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + let authority_id0 = AuthorityPair::from_seed(&[0xaa; 32]).public(); + let authority_id1 = AuthorityPair::from_seed(&[0xbb; 32]).public(); + + addr_cache.insert(authority_id0.clone(), vec![addr.clone()]); + addr_cache.insert(authority_id1.clone(), vec![addr.clone()]); + addr_cache + } + } + + #[test] + fn serde_json() { + let sample = || AddrCache::sample(); + let serializable = AddrCache::from(sample()); + let json = serde_json::to_string(&serializable).expect("Serialization should not fail"); + let 
deserialized = serde_json::from_str::(&json).unwrap(); + let from_serializable = AddrCache::try_from(deserialized).unwrap(); + assert_eq!(sample(), from_serializable); + } + + #[test] + fn deserialize_from_json() { + let json = r#" + { + "authority_id_to_addresses": { + "5FjfMGrqw9ck5XZaPVTKm2RE5cbwoVUfXvSGZY7KCUEFtdr7": [ + "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt" + ], + "5DiQDBQvjFkmUF3C8a7ape5rpRPoajmMj44Q9CTGPfVBaa6U": [ + "/p2p/QmZtnFaddFtzGNT8BxdHVbQrhSFdq1pWxud5z4fA4kxfDt" + ] + } + } + "#; + let deserialized = serde_json::from_str::(json).unwrap(); + assert_eq!(deserialized, AddrCache::sample()) + } + + fn serialize_and_write_to_file( + path: impl AsRef, + contents: &T, + ) -> io::Result<()> { + let serialized = serde_json::to_string_pretty(contents).unwrap(); + write_to_file(path, &serialized) + } + + #[test] + fn test_load_cache_from_disc() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("cache.json"); + let sample = AddrCache::sample(); + assert_eq!(sample.num_authority_ids(), 2); + serialize_and_write_to_file(&path, &sample).unwrap(); + sleep(Duration::from_millis(10)); // Ensure file is written before loading + let cache = AddrCache::try_from(path.as_path()).unwrap(); + assert_eq!(cache.num_authority_ids(), 2); + } + + fn create_cache(authority_id_count: u64, multiaddr_per_authority_count: u64) -> AddrCache { + let mut addr_cache = AddrCache::new(); + + for i in 0..authority_id_count { + let seed = &mut [0xab as u8; 32]; + let i_bytes = i.to_le_bytes(); + seed[0..8].copy_from_slice(&i_bytes); + + let authority_id = AuthorityPair::from_seed(seed).public(); + let multi_addresses = (0..multiaddr_per_authority_count) + .map(|j| { + let mut digest = [0xab; 32]; + let j_bytes = j.to_le_bytes(); + digest[0..8].copy_from_slice(&j_bytes); + let peer_id = PeerId::from_multihash( + Multihash::wrap(Code::Sha2_256.into(), &digest).unwrap(), + ) + .unwrap(); + Multiaddr::empty().with(Protocol::P2p(peer_id.into())) + }) + 
.collect::>(); + + assert_eq!(multi_addresses.len(), multiaddr_per_authority_count as usize); + addr_cache.insert(authority_id.clone(), multi_addresses); + } + assert_eq!(addr_cache.authority_id_to_addresses.len(), authority_id_count as usize); + + addr_cache + } + + /// This test is ignored by default as it takes a long time to run. + #[test] + #[ignore] + fn addr_cache_measure_serde_performance() { + let addr_cache = create_cache(1000, 5); + + /// A replica of `AddrCache` that is serializable and deserializable + /// without any optimizations. + #[derive(Default, Clone, PartialEq, Debug, Serialize, Deserialize)] + pub(crate) struct NaiveSerdeAddrCache { + authority_id_to_addresses: HashMap>, + peer_id_to_authority_ids: HashMap>, + } + impl From for NaiveSerdeAddrCache { + fn from(value: AddrCache) -> Self { + Self { + authority_id_to_addresses: value.authority_id_to_addresses, + peer_id_to_authority_ids: value.peer_id_to_authority_ids, + } + } + } + + let naive = NaiveSerdeAddrCache::from(addr_cache.clone()); + let storage_optimized = addr_cache.clone(); + + fn measure_clone(data: &T) -> Duration { + let start = Instant::now(); + let _ = data.clone(); + start.elapsed() + } + fn measure_serialize(data: &T) -> (Duration, String) { + let start = Instant::now(); + let json = serde_json::to_string_pretty(data).unwrap(); + (start.elapsed(), json) + } + fn measure_deserialize(json: String) -> (Duration, T) { + let start = Instant::now(); + let value = serde_json::from_str(&json).unwrap(); + (start.elapsed(), value) + } + + let serialize_naive = measure_serialize(&naive); + let serialize_storage_optimized = measure_serialize(&storage_optimized); + println!("CLONE: Naive took: {} ms", measure_clone(&naive).as_millis()); + println!( + "CLONE: Storage optimized took: {} ms", + measure_clone(&storage_optimized).as_millis() + ); + println!("SERIALIZE: Naive took: {} ms", serialize_naive.0.as_millis()); + println!( + "SERIALIZE: Storage optimized took: {} ms", + 
serialize_storage_optimized.0.as_millis() + ); + let deserialize_naive = measure_deserialize::(serialize_naive.1); + let deserialize_storage_optimized = + measure_deserialize::(serialize_storage_optimized.1); + println!("DESERIALIZE: Naive took: {} ms", deserialize_naive.0.as_millis()); + println!( + "DESERIALIZE: Storage optimized took: {} ms", + deserialize_storage_optimized.0.as_millis() + ); + assert_eq!(deserialize_naive.1, naive); + assert_eq!(deserialize_storage_optimized.1, storage_optimized); + } } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 8018b5ea492dc..5074ab2bd5849 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -23,6 +23,9 @@ use std::{ time::Instant, }; +use crate::tests::{create_spawner, test_config}; + +use super::*; use futures::{ channel::mpsc::{self, channel}, executor::{block_on, LocalPool}, @@ -44,8 +47,6 @@ use sp_keystore::{testing::MemoryKeystore, Keystore}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; use substrate_test_runtime_client::runtime::Block; -use super::*; - #[derive(Clone)] pub(crate) struct TestApi { pub(crate) authorities: Vec, @@ -291,8 +292,8 @@ fn build_dht_event( kv_pairs } -#[test] -fn new_registers_metrics() { +#[tokio::test] +async fn new_registers_metrics() { let (_dht_event_tx, dht_event_rx) = mpsc::channel(1000); let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); @@ -300,6 +301,8 @@ fn new_registers_metrics() { let registry = prometheus_endpoint::Registry::new(); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let (_to_worker, from_service) = mpsc::channel(0); Worker::new( from_service, @@ -308,14 +311,15 @@ fn new_registers_metrics() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), Some(registry.clone()), - Default::default(), 
+ test_config(Some(path)), + create_spawner(), ); assert!(registry.gather().len() > 0); } -#[test] -fn triggers_dht_get_query() { +#[tokio::test] +async fn triggers_dht_get_query() { sp_tracing::try_init_simple(); let (_dht_event_tx, dht_event_rx) = channel(1000); @@ -330,6 +334,8 @@ fn triggers_dht_get_query() { let key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -337,7 +343,8 @@ fn triggers_dht_get_query() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); futures::executor::block_on(async { @@ -347,8 +354,8 @@ fn triggers_dht_get_query() { }) } -#[test] -fn publish_discover_cycle() { +#[tokio::test] +async fn publish_discover_cycle() { sp_tracing::try_init_simple(); let mut pool = LocalPool::new(); @@ -360,7 +367,6 @@ fn publish_discover_cycle() { let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); - let _ = pool.spawner().spawn_local_obj( async move { let node_a_public = @@ -368,6 +374,9 @@ fn publish_discover_cycle() { let test_api = Arc::new(TestApi { authorities: vec![node_a_public.into()] }); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let temppath = tempdir.path(); + let path = temppath.to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -375,7 +384,8 @@ fn publish_discover_cycle() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); worker.publish_ext_addresses(false).await.unwrap(); @@ -402,6 +412,9 @@ fn publish_discover_cycle() { let key_store = MemoryKeystore::new(); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let 
temppath = tempdir.path(); + let path = temppath.to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -409,7 +422,8 @@ fn publish_discover_cycle() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); dht_event_tx.try_send(dht_event.clone()).unwrap(); @@ -429,12 +443,13 @@ fn publish_discover_cycle() { /// Don't terminate when sender side of service channel is dropped. Terminate when network event /// stream terminates. -#[test] -fn terminate_when_event_stream_terminates() { +#[tokio::test] +async fn terminate_when_event_stream_terminates() { let (dht_event_tx, dht_event_rx) = channel(1000); let network: Arc = Arc::new(Default::default()); let key_store = MemoryKeystore::new(); let test_api = Arc::new(TestApi { authorities: vec![] }); + let path = tempfile::tempdir().unwrap().path().to_path_buf(); let (to_worker, from_service) = mpsc::channel(0); let worker = Worker::new( @@ -444,7 +459,8 @@ fn terminate_when_event_stream_terminates() { Box::pin(dht_event_rx), Role::PublishAndDiscover(key_store.into()), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ) .run(); futures::pin_mut!(worker); @@ -474,8 +490,8 @@ fn terminate_when_event_stream_terminates() { }); } -#[test] -fn dont_stop_polling_dht_event_stream_after_bogus_event() { +#[tokio::test] +async fn dont_stop_polling_dht_event_stream_after_bogus_event() { let remote_multiaddr = { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); @@ -500,6 +516,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { let mut pool = LocalPool::new(); let (mut to_worker, from_service) = mpsc::channel(1); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, test_api, @@ -507,7 +525,8 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { 
Box::pin(dht_event_rx), Role::PublishAndDiscover(Arc::new(key_store)), None, - Default::default(), + test_config(Some(path)), + create_spawner(), ); // Spawn the authority discovery to make sure it is polled independently. @@ -634,7 +653,17 @@ impl DhtValueFoundTester { Box::pin(dht_event_rx), Role::PublishAndDiscover(Arc::new(local_key_store)), None, - WorkerConfig { strict_record_validation, ..Default::default() }, + WorkerConfig { + strict_record_validation, + persisted_cache_directory: Some( + tempfile::tempdir() + .expect("Should be able to create tmp dir") + .path() + .to_path_buf(), + ), + ..Default::default() + }, + create_spawner(), )); (self.local_worker.as_mut().unwrap(), Some(local_network)) }; @@ -663,8 +692,8 @@ impl DhtValueFoundTester { } } -#[test] -fn limit_number_of_addresses_added_to_cache_per_authority() { +#[tokio::test] +async fn limit_number_of_addresses_added_to_cache_per_authority() { let mut tester = DhtValueFoundTester::new(); assert!(MAX_ADDRESSES_PER_AUTHORITY < 100); let addresses = (1..100).map(|i| tester.multiaddr_with_peer_id(i)).collect(); @@ -680,8 +709,8 @@ fn limit_number_of_addresses_added_to_cache_per_authority() { assert_eq!(MAX_ADDRESSES_PER_AUTHORITY, cached_remote_addresses.unwrap().len()); } -#[test] -fn strict_accept_address_with_peer_signature() { +#[tokio::test] +async fn strict_accept_address_with_peer_signature() { let mut tester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -701,8 +730,8 @@ fn strict_accept_address_with_peer_signature() { ); } -#[test] -fn strict_accept_address_without_creation_time() { +#[tokio::test] +async fn strict_accept_address_without_creation_time() { let mut tester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -722,8 +751,8 @@ fn strict_accept_address_without_creation_time() { ); } -#[test] -fn keep_last_received_if_no_creation_time() { +#[tokio::test] +async fn 
keep_last_received_if_no_creation_time() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -769,8 +798,8 @@ fn keep_last_received_if_no_creation_time() { .unwrap_or_default()); } -#[test] -fn records_with_incorrectly_signed_creation_time_are_ignored() { +#[tokio::test] +async fn records_with_incorrectly_signed_creation_time_are_ignored() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let addr = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -823,8 +852,8 @@ fn records_with_incorrectly_signed_creation_time_are_ignored() { .unwrap_or_default()); } -#[test] -fn newer_records_overwrite_older_ones() { +#[tokio::test] +async fn newer_records_overwrite_older_ones() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let old_record = tester.multiaddr_with_peer_id(1); let kv_pairs = build_dht_event( @@ -874,8 +903,8 @@ fn newer_records_overwrite_older_ones() { assert_eq!(result.1.len(), 1); } -#[test] -fn older_records_dont_affect_newer_ones() { +#[tokio::test] +async fn older_records_dont_affect_newer_ones() { let mut tester: DhtValueFoundTester = DhtValueFoundTester::new(); let old_record = tester.multiaddr_with_peer_id(1); let old_kv_pairs = build_dht_event( @@ -925,8 +954,8 @@ fn older_records_dont_affect_newer_ones() { assert_eq!(update_peers_info.1.len(), 1); } -#[test] -fn reject_address_with_rogue_peer_signature() { +#[tokio::test] +async fn reject_address_with_rogue_peer_signature() { let mut tester = DhtValueFoundTester::new(); let rogue_remote_node_key = Keypair::generate_ed25519(); let kv_pairs = build_dht_event( @@ -945,8 +974,8 @@ fn reject_address_with_rogue_peer_signature() { ); } -#[test] -fn reject_address_with_invalid_peer_signature() { +#[tokio::test] +async fn reject_address_with_invalid_peer_signature() { let mut tester = DhtValueFoundTester::new(); let mut kv_pairs = build_dht_event( 
vec![tester.multiaddr_with_peer_id(1)], @@ -968,8 +997,8 @@ fn reject_address_with_invalid_peer_signature() { ); } -#[test] -fn reject_address_without_peer_signature() { +#[tokio::test] +async fn reject_address_without_peer_signature() { let mut tester = DhtValueFoundTester::new(); let kv_pairs = build_dht_event::( vec![tester.multiaddr_with_peer_id(1)], @@ -984,8 +1013,8 @@ fn reject_address_without_peer_signature() { assert!(cached_remote_addresses.is_none(), "Expected worker to ignore unsigned record.",); } -#[test] -fn do_not_cache_addresses_without_peer_id() { +#[tokio::test] +async fn do_not_cache_addresses_without_peer_id() { let mut tester = DhtValueFoundTester::new(); let multiaddr_with_peer_id = tester.multiaddr_with_peer_id(1); let multiaddr_without_peer_id: Multiaddr = @@ -1007,8 +1036,8 @@ fn do_not_cache_addresses_without_peer_id() { ); } -#[test] -fn addresses_to_publish_adds_p2p() { +#[tokio::test] +async fn addresses_to_publish_adds_p2p() { let (_dht_event_tx, dht_event_rx) = channel(1000); let network: Arc = Arc::new(Default::default()); @@ -1018,6 +1047,8 @@ fn addresses_to_publish_adds_p2p() { )); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: vec![] }), @@ -1025,7 +1056,8 @@ fn addresses_to_publish_adds_p2p() { Box::pin(dht_event_rx), Role::PublishAndDiscover(MemoryKeystore::new().into()), Some(prometheus_endpoint::Registry::new()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); assert!( @@ -1039,8 +1071,8 @@ fn addresses_to_publish_adds_p2p() { /// Ensure [`Worker::addresses_to_publish`] does not add an additional `p2p` protocol component in /// case one already exists. 
-#[test] -fn addresses_to_publish_respects_existing_p2p_protocol() { +#[tokio::test] +async fn addresses_to_publish_respects_existing_p2p_protocol() { let (_dht_event_tx, dht_event_rx) = channel(1000); let identity = Keypair::generate_ed25519(); let peer_id = identity.public().to_peer_id(); @@ -1056,6 +1088,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { }); let (_to_worker, from_service) = mpsc::channel(0); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: vec![] }), @@ -1063,7 +1097,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { Box::pin(dht_event_rx), Role::PublishAndDiscover(MemoryKeystore::new().into()), Some(prometheus_endpoint::Registry::new()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); assert_eq!( @@ -1073,8 +1108,8 @@ fn addresses_to_publish_respects_existing_p2p_protocol() { ); } -#[test] -fn lookup_throttling() { +#[tokio::test] +async fn lookup_throttling() { let remote_multiaddr = { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); @@ -1100,6 +1135,8 @@ fn lookup_throttling() { let mut network = TestNetwork::default(); let mut receiver = network.get_event_receiver().unwrap(); let network = Arc::new(network); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), @@ -1107,7 +1144,8 @@ fn lookup_throttling() { dht_event_rx.boxed(), Role::Discover, Some(default_registry().clone()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); let mut pool = LocalPool::new(); @@ -1183,8 +1221,8 @@ fn lookup_throttling() { ); } -#[test] -fn test_handle_put_record_request() { +#[tokio::test] +async fn test_handle_put_record_request() { let local_node_network = 
TestNetwork::default(); let remote_node_network = TestNetwork::default(); let peer_id = remote_node_network.peer_id; @@ -1218,6 +1256,8 @@ fn test_handle_put_record_request() { let (_dht_event_tx, dht_event_rx) = channel(1); let (_to_worker, from_service) = mpsc::channel(0); let network = Arc::new(local_node_network); + let tempdir = tempfile::tempdir().unwrap(); + let path = tempdir.path().to_path_buf(); let mut worker = Worker::new( from_service, Arc::new(TestApi { authorities: remote_public_keys.clone() }), @@ -1225,7 +1265,8 @@ fn test_handle_put_record_request() { dht_event_rx.boxed(), Role::Discover, Some(default_registry().clone()), - Default::default(), + test_config(Some(path)), + create_spawner(), ); let mut pool = LocalPool::new(); diff --git a/substrate/client/network/types/Cargo.toml b/substrate/client/network/types/Cargo.toml index b47b475f12d12..f2cde17ccefec 100644 --- a/substrate/client/network/types/Cargo.toml +++ b/substrate/client/network/types/Cargo.toml @@ -18,6 +18,8 @@ log = { workspace = true, default-features = true } multiaddr = { workspace = true } multihash = { workspace = true } rand = { workspace = true, default-features = true } +serde.workspace = true +serde_with.workspace = true thiserror = { workspace = true } zeroize = { workspace = true } diff --git a/substrate/client/network/types/src/multiaddr.rs b/substrate/client/network/types/src/multiaddr.rs index 925e24fe70d6d..d938202b8977b 100644 --- a/substrate/client/network/types/src/multiaddr.rs +++ b/substrate/client/network/types/src/multiaddr.rs @@ -21,6 +21,7 @@ use litep2p::types::multiaddr::{ Protocol as LiteP2pProtocol, }; use multiaddr::Multiaddr as LibP2pMultiaddr; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{ fmt::{self, Debug, Display}, net::{IpAddr, Ipv4Addr, Ipv6Addr}, @@ -36,7 +37,7 @@ pub use crate::build_multiaddr as multiaddr; /// [`Multiaddr`] type used in Substrate. 
Converted to libp2p's `Multiaddr` /// or litep2p's `Multiaddr` when passed to the corresponding network backend. -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Hash, SerializeDisplay, DeserializeFromStr)] pub struct Multiaddr { multiaddr: LiteP2pMultiaddr, } diff --git a/substrate/client/network/types/src/peer_id.rs b/substrate/client/network/types/src/peer_id.rs index 076be0a66c7b7..124b955d48a65 100644 --- a/substrate/client/network/types/src/peer_id.rs +++ b/substrate/client/network/types/src/peer_id.rs @@ -21,6 +21,7 @@ use crate::{ multihash::{Code, Error, Multihash}, }; use rand::Rng; +use serde_with::{DeserializeFromStr, SerializeDisplay}; use std::{fmt, hash::Hash, str::FromStr}; @@ -32,7 +33,9 @@ const MAX_INLINE_KEY_LENGTH: usize = 42; /// /// The data is a CIDv0 compatible multihash of the protobuf encoded public key of the peer /// as specified in [specs/peer-ids](https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md). -#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] +#[derive( + Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, SerializeDisplay, DeserializeFromStr, +)] pub struct PeerId { multihash: Multihash, }