diff --git a/bin/node-template/node/src/command.rs b/bin/node-template/node/src/command.rs index 75b88877aadfb..5e6b1f1eca204 100644 --- a/bin/node-template/node/src/command.rs +++ b/bin/node-template/node/src/command.rs @@ -67,7 +67,14 @@ pub fn run() -> sc_cli::Result<()> { match &cli.subcommand { Some(subcommand) => { let runner = cli.create_runner(subcommand)?; - runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0)) + runner.run_subcommand(subcommand, |config| { + let ( + client, _import_setup, _inherent_data_providers, _backend, _tasks_builder, + _keystore, import_queue, _select_chain, _transaction_pool, _background_tasks, + ) = new_full_start!(config); + + Ok((client, import_queue)) + }) } None => { let runner = cli.create_runner(&cli.run)?; diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 7c4a574f6be04..975cdf28987c7 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -5,7 +5,7 @@ use std::time::Duration; use sc_client::LongestChain; use sc_client_api::ExecutorProvider; use node_template_runtime::{self, opaque::Block, RuntimeApi}; -use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration, ServiceBuilder}; +use sc_service::{error::{Error as ServiceError}, AbstractService, Configuration}; use sp_inherents::InherentDataProviders; use sc_executor::native_executor_instance; pub use sc_executor::NativeExecutor; @@ -26,45 +26,48 @@ native_executor_instance!( macro_rules! new_full_start { ($config:expr) => {{ use std::sync::Arc; - let mut import_setup = None; let inherent_data_providers = sp_inherents::InherentDataProviders::new(); - let builder = sc_service::ServiceBuilder::new_full::< + let (client, backend, keystore, tasks_builder) = sc_service::new_full_parts::< node_template_runtime::opaque::Block, node_template_runtime::RuntimeApi, crate::service::Executor - >($config)? - .with_select_chain(|_config, backend| { - Ok(sc_client::LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|config, client, _fetcher| { - let pool_api = sc_transaction_pool::FullChainApi::new(client.clone()); - Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api))) - })? - .with_import_queue(|_config, client, mut select_chain, _transaction_pool| { - let select_chain = select_chain.take() - .ok_or_else(|| sc_service::Error::SelectChainRequired)?; - - let (grandpa_block_import, grandpa_link) = - sc_finality_grandpa::block_import(client.clone(), &(client.clone() as Arc<_>), select_chain)?; - - let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( - grandpa_block_import.clone(), client.clone(), - ); - - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( - sc_consensus_aura::slot_duration(&*client)?, - aura_block_import, - Some(Box::new(grandpa_block_import.clone())), - None, - client, - inherent_data_providers.clone(), - )?; - - import_setup = Some((grandpa_block_import, grandpa_link)); - - Ok(import_queue) - })?; - - (builder, import_setup, inherent_data_providers) + >(&$config)?; + let client = Arc::new(client); + let select_chain = sc_client::LongestChain::new(backend.clone()); + let pool_api = sc_transaction_pool::FullChainApi::new(Arc::clone(&client)); + let (transaction_pool, background_task_one) = sc_transaction_pool::BasicPool::new($config.transaction_pool.clone(), std::sync::Arc::new(pool_api)); + let transaction_pool = Arc::new(transaction_pool); + let mut background_tasks = Vec::new(); + + if let Some(bg_t) = background_task_one { + background_tasks.push(("txpool-background", bg_t)); + } + + let (import_queue, import_setup) = { + let (grandpa_block_import, grandpa_link) = + sc_finality_grandpa::block_import(Arc::clone(&client), &(Arc::clone(&client) as Arc<_>), select_chain.clone())?; + + let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( + grandpa_block_import.clone(), Arc::clone(&client), + ); + + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( + sc_consensus_aura::slot_duration(&*client)?, + aura_block_import, + Some(Box::new(grandpa_block_import.clone())), + None, + client.clone(), + inherent_data_providers.clone(), + )?; + + let import_setup = Some((grandpa_block_import, grandpa_link)); + + (import_queue, import_setup) + }; + + ( + client, import_setup, inherent_data_providers, backend, tasks_builder, keystore, + import_queue, select_chain, transaction_pool, background_tasks, + ) }} } @@ -77,19 +80,33 @@ pub fn new_full(config: Configuration) let name = config.network.node_name.clone(); let disable_grandpa = config.disable_grandpa; - let (builder, mut import_setup, inherent_data_providers) = new_full_start!(config); + let ( + client, mut import_setup, inherent_data_providers, backend, tasks_builder, keystore, + import_queue, select_chain, transaction_pool, background_tasks, + ) = new_full_start!(config); let (block_import, grandpa_link) = import_setup.take() .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - let service = builder - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) - })? - .build()?; + let provider = client.clone() as Arc>; + let finality_proof_provider = Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), provider)); + let service = sc_service::build( + config, + client, + backend, + tasks_builder, + keystore, + None, + Some(select_chain), + import_queue, + None, + Some(finality_proof_provider), + transaction_pool, + (), + None, + background_tasks, + )?; if role.is_authority() { let proposer = @@ -179,49 +196,57 @@ pub fn new_light(config: Configuration) { let inherent_data_providers = InherentDataProviders::new(); - ServiceBuilder::new_light::(config)? - .with_select_chain(|_config, backend| { - Ok(LongestChain::new(backend.clone())) - })? - .with_transaction_pool(|config, client, fetcher| { - let fetcher = fetcher - .ok_or_else(|| "Trying to start light transaction pool without active fetcher")?; - - let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); - let pool = sc_transaction_pool::BasicPool::with_revalidation_type( - config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light, - ); - Ok(pool) - })? - .with_import_queue_and_fprb(|_config, client, backend, fetcher, _select_chain, _tx_pool| { - let fetch_checker = fetcher - .map(|fetcher| fetcher.checker().clone()) - .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; - let grandpa_block_import = sc_finality_grandpa::light_block_import( - client.clone(), - backend, - &(client.clone() as Arc<_>), - Arc::new(fetch_checker), - )?; - let finality_proof_import = grandpa_block_import.clone(); - let finality_proof_request_builder = - finality_proof_import.create_finality_proof_request_builder(); - - let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( - sc_consensus_aura::slot_duration(&*client)?, - grandpa_block_import, - None, - Some(Box::new(finality_proof_import)), - client, - inherent_data_providers.clone(), - )?; - - Ok((import_queue, finality_proof_request_builder)) - })? - .with_finality_proof_provider(|client, backend| { - // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider - let provider = client as Arc>; - Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _) - })? - .build() + let ((client, backend, keystore, tasks_builder), fetcher, remote_blockchain) = sc_service::new_light_parts::< + Block, RuntimeApi, Executor + >(&config)?; + let client = Arc::new(client); + let select_chain = LongestChain::new(backend.clone()); + let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone()); + let (transaction_pool, background_task_one) = sc_transaction_pool::BasicPool::with_revalidation_type( + config.transaction_pool.clone(), Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light, + ); + let transaction_pool = Arc::new(transaction_pool); + let mut background_tasks = Vec::new(); + + if let Some(bg_t) = background_task_one { + background_tasks.push(("txpool-background", bg_t)); + } + let fetch_checker = fetcher.checker().clone(); + let grandpa_block_import = sc_finality_grandpa::light_block_import( + client.clone(), + backend.clone(), + &(client.clone() as Arc<_>), + Arc::new(fetch_checker), + )?; + let finality_proof_import = grandpa_block_import.clone(); + let finality_proof_request_builder = + finality_proof_import.create_finality_proof_request_builder(); + + let import_queue = sc_consensus_aura::import_queue::<_, _, _, AuraPair>( + sc_consensus_aura::slot_duration(&*client)?, + grandpa_block_import, + None, + Some(Box::new(finality_proof_import)), + client.clone(), + inherent_data_providers.clone(), + )?; + // GenesisAuthoritySetProvider is implemented for StorageAndProofProvider + let provider = client.clone() as Arc>; + let finality_proof_provider = Arc::new(GrandpaFinalityProofProvider::new(backend.clone(), provider)); + sc_service::build( + config, + client, + backend, + tasks_builder, + keystore, + None, + Some(select_chain), + import_queue, + Some(finality_proof_request_builder), + Some(finality_proof_provider), + transaction_pool, + (), + Some(remote_blockchain), + background_tasks, + ) } diff --git a/client/cli/src/commands/check_block_cmd.rs b/client/cli/src/commands/check_block_cmd.rs index ac4fe63da9506..d0bb5ddc72abb 100644 --- a/client/cli/src/commands/check_block_cmd.rs +++ b/client/cli/src/commands/check_block_cmd.rs @@ -18,12 +18,13 @@ use crate::error; use crate::params::ImportParams; use crate::params::SharedParams; use crate::CliConfiguration; -use sc_service::{Configuration, ServiceBuilderCommand}; +//use sc_service::{Configuration, ServiceBuilderCommand}; use sp_runtime::generic::BlockId; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; use std::str::FromStr; use structopt::StructOpt; +use sc_service::check_block; /// The `check-block` command used to validate blocks. #[derive(Debug, StructOpt, Clone)] @@ -49,17 +50,18 @@ pub struct CheckBlockCmd { impl CheckBlockCmd { /// Run the check-block command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, + import_queue: IQ, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, + RA: Send + Sync + 'static, + ::Hash: std::str::FromStr, { let input = if self.input.starts_with("0x") { &self.input[2..] @@ -79,7 +81,7 @@ impl CheckBlockCmd { }; let start = std::time::Instant::now(); - builder(config)?.check_block(block_id).await?; + check_block(client, import_queue, block_id).await?; println!("Completed in {} ms.", start.elapsed().as_millis()); Ok(()) diff --git a/client/cli/src/commands/export_blocks_cmd.rs b/client/cli/src/commands/export_blocks_cmd.rs index 48abd409d6833..e59826e94c916 100644 --- a/client/cli/src/commands/export_blocks_cmd.rs +++ b/client/cli/src/commands/export_blocks_cmd.rs @@ -19,7 +19,7 @@ use crate::params::{BlockNumber, PruningParams, SharedParams}; use crate::CliConfiguration; use log::info; use sc_service::{ - config::DatabaseConfig, Configuration, ServiceBuilderCommand, + config::DatabaseConfig, //Configuration, ServiceBuilderCommand, }; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; @@ -27,6 +27,7 @@ use std::fs; use std::io; use std::path::PathBuf; use structopt::StructOpt; +use sc_service::export_blocks; /// The `export-blocks` command used to export blocks. #[derive(Debug, StructOpt, Clone)] @@ -62,21 +63,23 @@ pub struct ExportBlocksCmd { impl ExportBlocksCmd { /// Run the export-blocks command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + <<::Header as sp_runtime::traits::Header>::Number as std::str::FromStr>::Err: std::fmt::Debug, { + // TODO: should probably not be here + /* if let DatabaseConfig::Path { ref path, .. } = &config.database { info!("DB path: {}", path.display()); } + */ let from = self.from.as_ref().and_then(|f| f.parse().ok()).unwrap_or(1); let to = self.to.as_ref().and_then(|t| t.parse().ok()); @@ -88,8 +91,7 @@ impl ExportBlocksCmd { None => Box::new(io::stdout()), }; - builder(config)? - .export_blocks(file, from.into(), to, binary) + export_blocks(client, file, from.into(), to, binary) .await .map_err(Into::into) } diff --git a/client/cli/src/commands/import_blocks_cmd.rs b/client/cli/src/commands/import_blocks_cmd.rs index ce95640f469ce..c5a8f5f50e21f 100644 --- a/client/cli/src/commands/import_blocks_cmd.rs +++ b/client/cli/src/commands/import_blocks_cmd.rs @@ -18,13 +18,14 @@ use crate::error; use crate::params::ImportParams; use crate::params::SharedParams; use crate::CliConfiguration; -use sc_service::{Configuration, ServiceBuilderCommand}; +//use sc_service::{Configuration, ServiceBuilderCommand}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; use std::fs; use std::io::{self, Read, Seek}; use std::path::PathBuf; use structopt::StructOpt; +use sc_service::import_blocks; /// The `import-blocks` command used to import blocks. #[derive(Debug, StructOpt, Clone)] @@ -55,17 +56,17 @@ impl ReadPlusSeek for T {} impl ImportBlocksCmd { /// Run the import-blocks command - pub async fn run( + pub async fn run( &self, - config: Configuration, - builder: B, + client: std::sync::Arc>, + import_queue: IQ, ) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, { let file: Box = match &self.input { Some(filename) => Box::new(fs::File::open(filename)?), @@ -76,8 +77,7 @@ impl ImportBlocksCmd { } }; - builder(config)? - .import_blocks(file, false) + import_blocks(client, import_queue, file, false) .await .map_err(Into::into) } diff --git a/client/cli/src/commands/revert_cmd.rs b/client/cli/src/commands/revert_cmd.rs index f7629ff2f6357..200472a9e92fb 100644 --- a/client/cli/src/commands/revert_cmd.rs +++ b/client/cli/src/commands/revert_cmd.rs @@ -17,10 +17,11 @@ use crate::error; use crate::params::{BlockNumber, PruningParams, SharedParams}; use crate::CliConfiguration; -use sc_service::{Configuration, ServiceBuilderCommand}; +//use sc_service::{Configuration, ServiceBuilderCommand}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use std::fmt::Debug; use structopt::StructOpt; +use sc_service::revert_chain; /// The `revert` command used revert the chain to a previous state. #[derive(Debug, StructOpt, Clone)] @@ -40,16 +41,16 @@ pub struct RevertCmd { impl RevertCmd { /// Run the revert command - pub fn run(&self, config: Configuration, builder: B) -> error::Result<()> + pub fn run(&self, client: std::sync::Arc>) -> error::Result<()> where - B: FnOnce(Configuration) -> Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug, - ::Hash: std::str::FromStr, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + <<::Header as sp_runtime::traits::Header>::Number as std::str::FromStr>::Err: std::fmt::Debug, { let blocks = self.num.parse()?; - builder(config)?.revert_chain(blocks)?; + revert_chain(client, blocks)?; Ok(()) } diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index bd5dc7100ef02..98bf58d6e62d4 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -23,7 +23,7 @@ use futures::pin_mut; use futures::select; use futures::{future, future::FutureExt, Future}; use log::info; -use sc_service::{AbstractService, Configuration, Role, ServiceBuilderCommand}; +use sc_service::{AbstractService, Configuration, Role}; //, ServiceBuilderCommand}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use std::fmt::Debug; @@ -161,27 +161,33 @@ impl Runner { /// A helper function that runs a future with tokio and stops if the process receives the signal /// `SIGTERM` or `SIGINT`. - pub fn run_subcommand(self, subcommand: &Subcommand, builder: B) -> Result<()> + pub fn run_subcommand(self, subcommand: &Subcommand, builder: BU) -> Result<()> where - B: FnOnce(Configuration) -> sc_service::error::Result, - BC: ServiceBuilderCommand + Unpin, - BB: sp_runtime::traits::Block + Debug, - <<::Header as HeaderT>::Number as std::str::FromStr>::Err: Debug, - ::Hash: std::str::FromStr, + BU: FnOnce(Configuration) -> sc_service::error::Result<(std::sync::Arc>, IQ)>, + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: sc_service::ImportQueue + 'static, + RA: Send + Sync + 'static, + ::Hash: std::str::FromStr, + <<::Header as sp_runtime::traits::Header>::Number as std::str::FromStr>::Err: std::fmt::Debug, { + let (client, import_queue) = builder(self.config)?; + match subcommand { - Subcommand::BuildSpec(cmd) => cmd.run(self.config), + //Subcommand::BuildSpec(cmd) => cmd.run(client), Subcommand::ExportBlocks(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + run_until_exit(self.tokio_runtime, cmd.run(client)) } Subcommand::ImportBlocks(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + run_until_exit(self.tokio_runtime, cmd.run(client, import_queue)) } Subcommand::CheckBlock(cmd) => { - run_until_exit(self.tokio_runtime, cmd.run(self.config, builder)) + run_until_exit(self.tokio_runtime, cmd.run(client, import_queue)) } - Subcommand::Revert(cmd) => cmd.run(self.config, builder), - Subcommand::PurgeChain(cmd) => cmd.run(self.config), + Subcommand::Revert(cmd) => cmd.run(client), + //Subcommand::PurgeChain(cmd) => cmd.run(client), + _ => todo!(), } } diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 0586a61a89715..6e68f33ee2693 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +#![allow(unused_imports)] + use crate::{Service, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID, MallocSizeOfWasm}; use crate::{TaskManagerBuilder, start_rpc_servers, build_network_future, TransactionPoolAdapter}; use crate::status_sinks; @@ -56,43 +58,6 @@ use sp_blockchain; pub type BackgroundTask = Pin + Send>>; -/// Aggregator for the components required to build a service. -/// -/// # Usage -/// -/// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various -/// `with_` methods to add the required components that you built yourself: -/// -/// - [`with_select_chain`](ServiceBuilder::with_select_chain) -/// - [`with_import_queue`](ServiceBuilder::with_import_queue) -/// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider) -/// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool) -/// -/// After this is done, call [`build`](ServiceBuilder::build) to construct the service. -/// -/// The order in which the `with_*` methods are called doesn't matter, as the correct binding of -/// generics is done when you call `build`. -/// -pub struct ServiceBuilder -{ - config: Configuration, - pub (crate) client: Arc, - backend: Arc, - tasks_builder: TaskManagerBuilder, - keystore: Arc>, - fetcher: Option, - select_chain: Option, - pub (crate) import_queue: TImpQu, - finality_proof_request_builder: Option, - finality_proof_provider: Option, - transaction_pool: Arc, - rpc_extensions: TRpc, - remote_backend: Option>>, - marker: PhantomData<(TBl, TRtApi)>, - background_tasks: Vec<(&'static str, BackgroundTask)>, -} - /// Full client type. pub type TFullClient = Client< TFullBackend, @@ -110,6 +75,13 @@ pub type TFullCallExecutor = sc_client::LocalCallExecutor< NativeExecutor, >; +type TFullParts = ( + TFullClient, + Arc>, + Arc>, + TaskManagerBuilder, +); + /// Light client type. pub type TLightClient = Client< TLightBackend, @@ -139,9 +111,9 @@ pub type TLightCallExecutor = sc_client::light::call_executor::G >, >; -type TFullParts = ( - TFullClient, - Arc>, +type TLightParts = ( + TLightClient, + Arc>, Arc>, TaskManagerBuilder, ); @@ -156,7 +128,7 @@ pub fn new_full_client( new_full_parts(config).map(|parts| parts.0) } -fn new_full_parts( +pub fn new_full_parts( config: &Configuration, ) -> Result, Error> where TBl: BlockT, @@ -224,489 +196,105 @@ fn new_full_parts( Ok((client, backend, keystore, tasks_builder)) } -impl ServiceBuilder<(), (), (), (), (), (), (), (), (), (), ()> { - /// Start the service builder with a configuration. - pub fn new_full( - config: Configuration, - ) -> Result, - Arc>, - (), - (), - BoxFinalityProofRequestBuilder, - Arc>, - (), - (), - TFullBackend, - >, Error> { - let (client, backend, keystore, tasks_builder) = new_full_parts(&config)?; - - let client = Arc::new(client); - - Ok(ServiceBuilder { - config, - client, - backend, - keystore, - tasks_builder, - fetcher: None, - select_chain: None, - import_queue: (), - finality_proof_request_builder: None, - finality_proof_provider: None, - transaction_pool: Arc::new(()), - rpc_extensions: Default::default(), - remote_backend: None, - background_tasks: Default::default(), - marker: PhantomData, - }) - } +/// Start the service builder with a configuration. +pub fn new_light_parts( + config: &Configuration, +) -> Result<(TLightParts, Arc>, Arc>), Error> { + let tasks_builder = TaskManagerBuilder::new(); - /// Start the service builder with a configuration. - pub fn new_light( - config: Configuration, - ) -> Result, - Arc>, - (), - (), - BoxFinalityProofRequestBuilder, - Arc>, - (), - (), - TLightBackend, - >, Error> { - let tasks_builder = TaskManagerBuilder::new(); - - let keystore = match &config.keystore { - KeystoreConfig::Path { path, password } => Keystore::open( - path.clone(), - password.clone() - )?, - KeystoreConfig::InMemory => Keystore::new_in_memory(), - }; + let keystore = match &config.keystore { + KeystoreConfig::Path { path, password } => Keystore::open( + path.clone(), + password.clone() + )?, + KeystoreConfig::InMemory => Keystore::new_in_memory(), + }; - let executor = NativeExecutor::::new( - config.wasm_method, - config.default_heap_pages, - config.max_runtime_instances, - ); + let executor = NativeExecutor::::new( + config.wasm_method, + config.default_heap_pages, + config.max_runtime_instances, + ); - let db_storage = { - let db_settings = sc_client_db::DatabaseSettings { - state_cache_size: config.state_cache_size, - state_cache_child_ratio: - config.state_cache_child_ratio.map(|v| (v, 100)), - pruning: config.pruning.clone(), - source: match &config.database { - DatabaseConfig::Path { path, cache_size } => - sc_client_db::DatabaseSettingsSrc::Path { - path: path.clone(), - cache_size: *cache_size, - }, - DatabaseConfig::Custom(db) => - sc_client_db::DatabaseSettingsSrc::Custom(db.clone()), - }, - }; - sc_client_db::light::LightStorage::new(db_settings)? + let db_storage = { + let db_settings = sc_client_db::DatabaseSettings { + state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), + pruning: config.pruning.clone(), + source: match &config.database { + DatabaseConfig::Path { path, cache_size } => + sc_client_db::DatabaseSettingsSrc::Path { + path: path.clone(), + cache_size: *cache_size, + }, + DatabaseConfig::Custom(db) => + sc_client_db::DatabaseSettingsSrc::Custom(db.clone()), + }, }; - let light_blockchain = sc_client::light::new_light_blockchain(db_storage); - let fetch_checker = Arc::new( - sc_client::light::new_fetch_checker::<_, TBl, _>( - light_blockchain.clone(), - executor.clone(), - Box::new(tasks_builder.spawn_handle()), - ), - ); - let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker)); - let backend = sc_client::light::new_light_backend(light_blockchain); - let remote_blockchain = backend.remote_blockchain(); - let client = Arc::new(sc_client::light::new_light( - backend.clone(), - config.chain_spec.as_storage_builder(), - executor, + sc_client_db::light::LightStorage::new(db_settings)? + }; + let light_blockchain = sc_client::light::new_light_blockchain(db_storage); + let fetch_checker = Arc::new( + sc_client::light::new_fetch_checker::<_, TBl, _>( + light_blockchain.clone(), + executor.clone(), Box::new(tasks_builder.spawn_handle()), - config.prometheus_config.as_ref().map(|config| config.registry.clone()), - )?); - - Ok(ServiceBuilder { - config, - client, - backend, - tasks_builder, - keystore, - fetcher: Some(fetcher.clone()), - select_chain: None, - import_queue: (), - finality_proof_request_builder: None, - finality_proof_provider: None, - transaction_pool: Arc::new(()), - rpc_extensions: Default::default(), - remote_backend: Some(remote_blockchain), - background_tasks: Default::default(), - marker: PhantomData, - }) - } -} - -impl - ServiceBuilder { - - /// Returns a reference to the client that was stored in this builder. - pub fn client(&self) -> &Arc { - &self.client - } - - /// Returns a reference to the backend that was used in this builder. - pub fn backend(&self) -> &Arc { - &self.backend - } - - /// Returns a reference to the select-chain that was stored in this builder. - pub fn select_chain(&self) -> Option<&TSc> { - self.select_chain.as_ref() - } - - /// Returns a reference to the keystore - pub fn keystore(&self) -> Arc> { - self.keystore.clone() - } - - /// Returns a reference to the transaction pool stored in this builder - pub fn pool(&self) -> Arc { - self.transaction_pool.clone() - } - - /// Returns a reference to the fetcher, only available if builder - /// was created with `new_light`. - pub fn fetcher(&self) -> Option - where TFchr: Clone - { - self.fetcher.clone() - } - - /// Returns a reference to the remote_backend, only available if builder - /// was created with `new_light`. - pub fn remote_backend(&self) -> Option>> { - self.remote_backend.clone() - } - - /// Defines which head-of-chain strategy to use. - pub fn with_opt_select_chain( - self, - select_chain_builder: impl FnOnce( - &Configuration, &Arc, - ) -> Result, Error> - ) -> Result, Error> { - let select_chain = select_chain_builder(&self.config, &self.backend)?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - tasks_builder: self.tasks_builder, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } - - /// Defines which head-of-chain strategy to use. - pub fn with_select_chain( - self, - builder: impl FnOnce(&Configuration, &Arc) -> Result, - ) -> Result, Error> { - self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some)) - } - - /// Defines which import queue to use. - pub fn with_import_queue( - self, - builder: impl FnOnce(&Configuration, Arc, Option, Arc) - -> Result - ) -> Result, Error> - where TSc: Clone { - let import_queue = builder( - &self.config, - self.client.clone(), - self.select_chain.clone(), - self.transaction_pool.clone() - )?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - tasks_builder: self.tasks_builder, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } - - /// Defines which strategy to use for providing finality proofs. - pub fn with_opt_finality_proof_provider( - self, - builder: impl FnOnce(Arc, Arc) -> Result>>, Error> - ) -> Result>, - TExPool, - TRpc, - Backend, - >, Error> { - let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - tasks_builder: self.tasks_builder, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } - - /// Defines which strategy to use for providing finality proofs. - pub fn with_finality_proof_provider( - self, - build: impl FnOnce(Arc, Arc) -> Result>, Error> - ) -> Result>, - TExPool, - TRpc, - Backend, - >, Error> { - self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some)) - } - - /// Defines which import queue to use. - pub fn with_import_queue_and_opt_fprb( - self, - builder: impl FnOnce( - &Configuration, - Arc, - Arc, - Option, - Option, - Arc, - ) -> Result<(UImpQu, Option), Error> - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - let (import_queue, fprb) = builder( - &self.config, - self.client.clone(), - self.backend.clone(), - self.fetcher.clone(), - self.select_chain.clone(), - self.transaction_pool.clone() - )?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - tasks_builder: self.tasks_builder, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue, - finality_proof_request_builder: fprb, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } - - /// Defines which import queue to use. - pub fn with_import_queue_and_fprb( - self, - builder: impl FnOnce( - &Configuration, - Arc, - Arc, - Option, - Option, - Arc, - ) -> Result<(UImpQu, UFprb), Error> - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - self.with_import_queue_and_opt_fprb(|cfg, cl, b, f, sc, tx| - builder(cfg, cl, b, f, sc, tx) - .map(|(q, f)| (q, Some(f))) - ) - } - - /// Defines which transaction pool to use. - pub fn with_transaction_pool( - mut self, - transaction_pool_builder: impl FnOnce( - sc_transaction_pool::txpool::Options, - Arc, - Option, - ) -> Result<(UExPool, Option), Error> - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - let (transaction_pool, background_task) = transaction_pool_builder( - self.config.transaction_pool.clone(), - self.client.clone(), - self.fetcher.clone(), - )?; - - if let Some(background_task) = background_task{ - self.background_tasks.push(("txpool-background", background_task)); - } - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - tasks_builder: self.tasks_builder, - backend: self.backend, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: Arc::new(transaction_pool), - rpc_extensions: self.rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } - - /// Defines the RPC extensions to use. - pub fn with_rpc_extensions( - self, - rpc_ext_builder: impl FnOnce(&Self) -> Result, - ) -> Result, Error> - where TSc: Clone, TFchr: Clone { - let rpc_extensions = rpc_ext_builder(&self)?; - - Ok(ServiceBuilder { - config: self.config, - client: self.client, - backend: self.backend, - tasks_builder: self.tasks_builder, - keystore: self.keystore, - fetcher: self.fetcher, - select_chain: self.select_chain, - import_queue: self.import_queue, - finality_proof_request_builder: self.finality_proof_request_builder, - finality_proof_provider: self.finality_proof_provider, - transaction_pool: self.transaction_pool, - rpc_extensions, - remote_backend: self.remote_backend, - background_tasks: self.background_tasks, - marker: self.marker, - }) - } -} - -/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate -/// components to the builder. -pub trait ServiceBuilderCommand { - /// Block type this API operates on. - type Block: BlockT; - /// Native execution dispatch required by some commands. - type NativeDispatch: NativeExecutionDispatch + 'static; - /// Starts the process of importing blocks. - fn import_blocks( - self, - input: impl Read + Seek + Send + 'static, - force: bool, - ) -> Pin> + Send>>; - - /// Performs the blocks export. - fn export_blocks( - self, - output: impl Write + 'static, - from: NumberFor, - to: Option>, - binary: bool - ) -> Pin>>>; - - /// Performs a revert of `blocks` blocks. - fn revert_chain( - &self, - blocks: NumberFor - ) -> Result<(), Error>; - - /// Re-validate known block. - fn check_block( - self, - block: BlockId - ) -> Pin> + Send>>; + ), + ); + let fetcher = Arc::new(sc_network::config::OnDemand::new(fetch_checker)); + let backend = sc_client::light::new_light_backend(light_blockchain); + let remote_blockchain = backend.remote_blockchain(); + let client = sc_client::light::new_light( + backend.clone(), + config.chain_spec.as_storage_builder(), + executor, + Box::new(tasks_builder.spawn_handle()), + config.prometheus_config.as_ref().map(|config| config.registry.clone()), + )?; + + Ok(((client, backend, keystore, tasks_builder), fetcher, remote_blockchain)) } -impl -ServiceBuilder< +/// Builds the service. +pub fn build< TBl, TRtApi, - Client, - Arc>, TSc, TImpQu, - BoxFinalityProofRequestBuilder, - Arc>, TExPool, TRpc, TBackend, -> where + TExec, +>( + mut config: Configuration, + client: Arc,>, + backend: Arc, + tasks_builder: TaskManagerBuilder, + keystore: Arc>, + on_demand: Option>>, + select_chain: Option, + import_queue: TImpQu, + finality_proof_request_builder: Option>, + finality_proof_provider: Option>>, + transaction_pool: Arc, + rpc_extensions: TRpc, + remote_backend: Option>>, + background_tasks: Vec<(&'static str, BackgroundTask)>, +) -> Result, + TSc, + NetworkStatus, + NetworkService::Hash>, + TExPool, + sc_offchain::OffchainWorkers< + Client, + TBackend::OffchainStorage, + TBl + >, +>, Error> +where Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: sp_api::Metadata + @@ -723,448 +311,407 @@ ServiceBuilder< TImpQu: 'static + ImportQueue, TExPool: MaintainedTransactionPool::Hash> + MallocSizeOfWasm + 'static, TRpc: sc_rpc::RpcExtension + Clone, + TExec: CallExecutor, { + sp_session::generate_initial_session_keys( + client.clone(), + &BlockId::Hash(client.chain_info().best_hash), + config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), + )?; - /// Set an ExecutionExtensionsFactory - pub fn with_execution_extensions_factory(self, execution_extensions_factory: Box) -> Result { - self.client.execution_extensions().set_extensions_factory(execution_extensions_factory); - Ok(self) - } + // A side-channel for essential tasks to communicate shutdown. + let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); - /// Builds the service. - pub fn build(self) -> Result, - TSc, - NetworkStatus, - NetworkService::Hash>, - TExPool, - sc_offchain::OffchainWorkers< - Client, - TBackend::OffchainStorage, - TBl - >, - >, Error> - where TExec: CallExecutor, - { - let ServiceBuilder { - marker: _, - mut config, - client, - tasks_builder, - fetcher: on_demand, - backend, - keystore, - select_chain, - import_queue, - finality_proof_request_builder, - finality_proof_provider, - transaction_pool, - rpc_extensions, - remote_backend, - background_tasks, - } = self; - - sp_session::generate_initial_session_keys( - client.clone(), - &BlockId::Hash(client.chain_info().best_hash), - config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), - )?; + let import_queue = Box::new(import_queue); + let chain_info = client.chain_info(); + let chain_spec = &config.chain_spec; - // A side-channel for essential tasks to communicate shutdown. - let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); + let version = config.impl_version; + info!("📦 Highest known block at #{}", chain_info.best_number); + telemetry!( + SUBSTRATE_INFO; + "node.start"; + "height" => chain_info.best_number.saturated_into::(), + "best" => ?chain_info.best_hash + ); - let import_queue = Box::new(import_queue); - let chain_info = client.chain_info(); - let chain_spec = &config.chain_spec; + // make transaction pool available for off-chain runtime calls. + client.execution_extensions() + .register_transaction_pool(Arc::downgrade(&transaction_pool) as _); + + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { + imports_external_transactions: !matches!(config.role, Role::Light), + pool: transaction_pool.clone(), + client: client.clone(), + executor: tasks_builder.spawn_handle(), + }); + + let protocol_id = { + let protocol_id_full = match chain_spec.protocol_id() { + Some(pid) => pid, + None => { + warn!("Using default protocol ID {:?} because none is configured in the \ + chain specs", DEFAULT_PROTOCOL_ID + ); + DEFAULT_PROTOCOL_ID + } + }.as_bytes(); + sc_network::config::ProtocolId::from(protocol_id_full) + }; - let version = config.impl_version; - info!("📦 Highest known block at #{}", chain_info.best_number); - telemetry!( - SUBSTRATE_INFO; - "node.start"; - "height" => chain_info.best_number.saturated_into::(), - "best" => ?chain_info.best_hash - ); + let block_announce_validator = + Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone())); + + let network_params = sc_network::config::Params { + role: config.role.clone(), + executor: { + let spawn_handle = tasks_builder.spawn_handle(); + Some(Box::new(move |fut| { + spawn_handle.spawn("libp2p-node", fut); + })) + }, + network_config: config.network.clone(), + chain: client.clone(), + finality_proof_provider, + finality_proof_request_builder, + on_demand: on_demand.clone(), + transaction_pool: transaction_pool_adapter.clone() as _, + import_queue, + protocol_id, + block_announce_validator, + metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()) + }; + + let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); + let network_mut = sc_network::NetworkWorker::new(network_params)?; + let network = network_mut.service().clone(); + let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + + let offchain_storage = backend.offchain_storage(); + let offchain_workers = match (config.offchain_worker, offchain_storage.clone()) { + (true, Some(db)) => { + Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db))) + }, + (true, None) => { + warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); + None + }, + _ => None, + }; - // make transaction pool available for off-chain runtime calls. - client.execution_extensions() - .register_transaction_pool(Arc::downgrade(&transaction_pool) as _); + let spawn_handle = tasks_builder.spawn_handle(); - let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { - imports_external_transactions: !matches!(config.role, Role::Light), - pool: transaction_pool.clone(), - client: client.clone(), - executor: tasks_builder.spawn_handle(), - }); + // Spawn background tasks which were stacked during the + // service building. + for (title, background_task) in background_tasks { + spawn_handle.spawn(title, background_task); + } + + { + // block notifications + let txpool = Arc::downgrade(&transaction_pool); + let offchain = offchain_workers.as_ref().map(Arc::downgrade); + let notifications_spawn_handle = tasks_builder.spawn_handle(); + let network_state_info: Arc = network.clone(); + let is_validator = config.role.is_authority(); + + let (import_stream, finality_stream) = ( + client.import_notification_stream().map(|n| ChainEvent::NewBlock { + id: BlockId::Hash(n.hash), + header: n.header, + retracted: n.retracted, + is_new_best: n.is_new_best, + }), + client.finality_notification_stream().map(|n| ChainEvent::Finalized { + hash: n.hash + }) + ); + let events = futures::stream::select(import_stream, finality_stream) + .for_each(move |event| { + // offchain worker is only interested in block import events + if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { + let offchain = offchain.as_ref().and_then(|o| o.upgrade()); + match offchain { + Some(offchain) if is_new_best => { + notifications_spawn_handle.spawn( + "offchain-on-block", + offchain.on_block_imported( + &header, + network_state_info.clone(), + is_validator, + ), + ); + }, + Some(_) => log::debug!( + target: "sc_offchain", + "Skipping offchain workers for non-canon block: {:?}", + header, + ), + _ => {}, + } + }; - let protocol_id = { - let protocol_id_full = match chain_spec.protocol_id() { - Some(pid) => pid, - None => { - warn!("Using default protocol ID {:?} because none is configured in the \ - chain specs", DEFAULT_PROTOCOL_ID + let txpool = txpool.upgrade(); + if let Some(txpool) = txpool.as_ref() { + notifications_spawn_handle.spawn( + "txpool-maintain", + txpool.maintain(event), ); - DEFAULT_PROTOCOL_ID } - }.as_bytes(); - sc_network::config::ProtocolId::from(protocol_id_full) - }; - let block_announce_validator = - Box::new(sp_consensus::block_validation::DefaultBlockAnnounceValidator::new(client.clone())); + ready(()) + }); - let network_params = sc_network::config::Params { - role: config.role.clone(), - executor: { - let spawn_handle = tasks_builder.spawn_handle(); - Some(Box::new(move |fut| { - spawn_handle.spawn("libp2p-node", fut); - })) - }, - network_config: config.network.clone(), - chain: client.clone(), - finality_proof_provider, - finality_proof_request_builder, - on_demand: on_demand.clone(), - transaction_pool: transaction_pool_adapter.clone() as _, - import_queue, - protocol_id, - block_announce_validator, - metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()) - }; + spawn_handle.spawn( + "txpool-and-offchain-notif", + events, + ); + } - let has_bootnodes = !network_params.network_config.boot_nodes.is_empty(); - let network_mut = sc_network::NetworkWorker::new(network_params)?; - let network = network_mut.service().clone(); - let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new())); + { + // extrinsic notifications + let network = Arc::downgrade(&network); + let transaction_pool_ = transaction_pool.clone(); + let events = transaction_pool.import_notification_stream() + .for_each(move |hash| { + if let Some(network) = network.upgrade() { + network.propagate_extrinsic(hash); + } + let status = transaction_pool_.status(); + telemetry!(SUBSTRATE_INFO; "txpool.import"; + "ready" => status.ready, + "future" => status.future + ); + ready(()) + }); - let offchain_storage = backend.offchain_storage(); - let offchain_workers = match (config.offchain_worker, offchain_storage.clone()) { - (true, Some(db)) => { - Some(Arc::new(sc_offchain::OffchainWorkers::new(client.clone(), db))) - }, - (true, None) => { - warn!("Offchain workers disabled, due to lack of offchain storage support in backend."); - None - }, - _ => None, - }; + spawn_handle.spawn( + "telemetry-on-block", + events, + ); + } - let spawn_handle = tasks_builder.spawn_handle(); + // Prometheus metrics. + let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() { + // Set static metrics. - // Spawn background tasks which were stacked during the - // service building. - for (title, background_task) in background_tasks { - spawn_handle.spawn(title, background_task); - } - { - // block notifications - let txpool = Arc::downgrade(&transaction_pool); - let offchain = offchain_workers.as_ref().map(Arc::downgrade); - let notifications_spawn_handle = tasks_builder.spawn_handle(); - let network_state_info: Arc = network.clone(); - let is_validator = config.role.is_authority(); - - let (import_stream, finality_stream) = ( - client.import_notification_stream().map(|n| ChainEvent::NewBlock { - id: BlockId::Hash(n.hash), - header: n.header, - retracted: n.retracted, - is_new_best: n.is_new_best, - }), - client.finality_notification_stream().map(|n| ChainEvent::Finalized { - hash: n.hash - }) - ); - let events = futures::stream::select(import_stream, finality_stream) - .for_each(move |event| { - // offchain worker is only interested in block import events - if let ChainEvent::NewBlock { ref header, is_new_best, .. } = event { - let offchain = offchain.as_ref().and_then(|o| o.upgrade()); - match offchain { - Some(offchain) if is_new_best => { - notifications_spawn_handle.spawn( - "offchain-on-block", - offchain.on_block_imported( - &header, - network_state_info.clone(), - is_validator, - ), - ); - }, - Some(_) => log::debug!( - target: "sc_offchain", - "Skipping offchain workers for non-canon block: {:?}", - header, - ), - _ => {}, - } - }; - - let txpool = txpool.upgrade(); - if let Some(txpool) = txpool.as_ref() { - notifications_spawn_handle.spawn( - "txpool-maintain", - txpool.maintain(event), - ); - } + let role_bits = match config.role { + Role::Full => 1u64, + Role::Light => 2u64, + Role::Sentry { .. } => 3u64, + Role::Authority { .. } => 4u64, + }; + let metrics = MetricsService::with_prometheus( + ®istry, + &config.network.node_name, + &config.impl_version, + role_bits, + )?; + spawn_handle.spawn( + "prometheus-endpoint", + prometheus_endpoint::init_prometheus(port, registry).map(drop) + ); - ready(()) - }); + metrics + } else { + MetricsService::new() + }; - spawn_handle.spawn( - "txpool-and-offchain-notif", - events, - ); - } + // Periodically notify the telemetry. + let transaction_pool_ = transaction_pool.clone(); + let client_ = client.clone(); + let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); + network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); + let tel_task = state_rx.for_each(move |(net_status, _)| { + let info = client_.usage_info(); + metrics_service.tick( + &info, + &transaction_pool_.status(), + &net_status, + ); + ready(()) + }); - { - // extrinsic notifications - let network = Arc::downgrade(&network); - let transaction_pool_ = transaction_pool.clone(); - let events = transaction_pool.import_notification_stream() - .for_each(move |hash| { - if let Some(network) = network.upgrade() { - network.propagate_extrinsic(hash); - } - let status = transaction_pool_.status(); - telemetry!(SUBSTRATE_INFO; "txpool.import"; - "ready" => status.ready, - "future" => status.future - ); - ready(()) - }); + spawn_handle.spawn( + "telemetry-periodic-send", + tel_task, + ); - spawn_handle.spawn( - "telemetry-on-block", - events, - ); - } + // Periodically send the network state to the telemetry. + let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); + network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); + let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { + telemetry!( + SUBSTRATE_INFO; + "system.network_state"; + "state" => network_state, + ); + ready(()) + }); + spawn_handle.spawn( + "telemetry-periodic-network-state", + tel_task_2, + ); - // Prometheus metrics. - let mut metrics_service = if let Some(PrometheusConfig { port, registry }) = config.prometheus_config.clone() { - // Set static metrics. - - - let role_bits = match config.role { - Role::Full => 1u64, - Role::Light => 2u64, - Role::Sentry { .. } => 3u64, - Role::Authority { .. } => 4u64, - }; - let metrics = MetricsService::with_prometheus( - ®istry, - &config.network.node_name, - &config.impl_version, - role_bits, - )?; - spawn_handle.spawn( - "prometheus-endpoint", - prometheus_endpoint::init_prometheus(port, registry).map(drop) - ); + // RPC + let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); + let gen_handler = || { + use sc_rpc::{chain, state, author, system, offchain}; - metrics - } else { - MetricsService::new() + let system_info = sc_rpc::system::SystemInfo { + chain_name: chain_spec.name().into(), + impl_name: config.impl_name.into(), + impl_version: config.impl_version.into(), + properties: chain_spec.properties().clone(), }; - // Periodically notify the telemetry. - let transaction_pool_ = transaction_pool.clone(); - let client_ = client.clone(); - let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1"); - network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx); - let tel_task = state_rx.for_each(move |(net_status, _)| { - let info = client_.usage_info(); - metrics_service.tick( - &info, - &transaction_pool_.status(), - &net_status, - ); - ready(()) - }); + let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle())); - spawn_handle.spawn( - "telemetry-periodic-send", - tel_task, - ); - - // Periodically send the network state to the telemetry. - let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2"); - network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx); - let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| { - telemetry!( - SUBSTRATE_INFO; - "system.network_state"; - "state" => network_state, + let (chain, state) = if let (Some(remote_backend), Some(on_demand)) = + (remote_backend.as_ref(), on_demand.as_ref()) { + // Light clients + let chain = sc_rpc::chain::new_light( + client.clone(), + subscriptions.clone(), + remote_backend.clone(), + on_demand.clone() ); - ready(()) - }); - spawn_handle.spawn( - "telemetry-periodic-network-state", - tel_task_2, - ); + let state = sc_rpc::state::new_light( + client.clone(), + subscriptions.clone(), + remote_backend.clone(), + on_demand.clone() + ); + (chain, state) - // RPC - let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); - let gen_handler = || { - use sc_rpc::{chain, state, author, system, offchain}; - - let system_info = sc_rpc::system::SystemInfo { - chain_name: chain_spec.name().into(), - impl_name: config.impl_name.into(), - impl_version: config.impl_version.into(), - properties: chain_spec.properties().clone(), - }; - - let subscriptions = sc_rpc::Subscriptions::new(Arc::new(tasks_builder.spawn_handle())); - - let (chain, state) = if let (Some(remote_backend), Some(on_demand)) = - (remote_backend.as_ref(), on_demand.as_ref()) { - // Light clients - let chain = sc_rpc::chain::new_light( - client.clone(), - subscriptions.clone(), - remote_backend.clone(), - on_demand.clone() - ); - let state = sc_rpc::state::new_light( - client.clone(), - subscriptions.clone(), - remote_backend.clone(), - on_demand.clone() - ); - (chain, state) + } else { + // Full nodes + let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); + let state = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); + (chain, state) + }; - } else { - // Full nodes - let chain = sc_rpc::chain::new_full(client.clone(), subscriptions.clone()); - let state = sc_rpc::state::new_full(client.clone(), subscriptions.clone()); - (chain, state) - }; + let author = sc_rpc::author::Author::new( + client.clone(), + transaction_pool.clone(), + subscriptions, + keystore.clone(), + ); + let system = system::System::new(system_info, system_rpc_tx.clone()); - let author = sc_rpc::author::Author::new( - client.clone(), - transaction_pool.clone(), - subscriptions, - keystore.clone(), - ); - let system = system::System::new(system_info, system_rpc_tx.clone()); - - match offchain_storage.clone() { - Some(storage) => { - let offchain = sc_rpc::offchain::Offchain::new(storage); - sc_rpc_server::rpc_handler(( - state::StateApi::to_delegate(state), - chain::ChainApi::to_delegate(chain), - offchain::OffchainApi::to_delegate(offchain), - author::AuthorApi::to_delegate(author), - system::SystemApi::to_delegate(system), - rpc_extensions.clone(), - )) - }, - None => sc_rpc_server::rpc_handler(( + match offchain_storage.clone() { + Some(storage) => { + let offchain = sc_rpc::offchain::Offchain::new(storage); + sc_rpc_server::rpc_handler(( state::StateApi::to_delegate(state), chain::ChainApi::to_delegate(chain), + offchain::OffchainApi::to_delegate(offchain), author::AuthorApi::to_delegate(author), system::SystemApi::to_delegate(system), rpc_extensions.clone(), )) - } - }; - let rpc_handlers = gen_handler(); - let rpc = start_rpc_servers(&config, gen_handler)?; - - spawn_handle.spawn( - "network-worker", - build_network_future( - config.role.clone(), - network_mut, - client.clone(), - network_status_sinks.clone(), - system_rpc_rx, - has_bootnodes, - config.announce_block, - ), - ); + }, + None => sc_rpc_server::rpc_handler(( + state::StateApi::to_delegate(state), + chain::ChainApi::to_delegate(chain), + author::AuthorApi::to_delegate(author), + system::SystemApi::to_delegate(system), + rpc_extensions.clone(), + )) + } + }; + let rpc_handlers = gen_handler(); + let rpc = start_rpc_servers(&config, gen_handler)?; + + spawn_handle.spawn( + "network-worker", + build_network_future( + config.role.clone(), + network_mut, + client.clone(), + network_status_sinks.clone(), + system_rpc_rx, + has_bootnodes, + config.announce_block, + ), + ); - let telemetry_connection_sinks: Arc>>> = Default::default(); - - // Telemetry - let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { - let is_authority = config.role.is_authority(); - let network_id = network.local_peer_id().to_base58(); - let name = config.network.node_name.clone(); - let impl_name = config.impl_name.to_owned(); - let version = version.clone(); - let chain_name = config.chain_spec.name().to_owned(); - let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); - let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig { - endpoints, - wasm_external_transport: config.telemetry_external_transport.take(), - }); - let startup_time = SystemTime::UNIX_EPOCH.elapsed() - .map(|dur| dur.as_millis()) - .unwrap_or(0); - let future = telemetry.clone() - .for_each(move |event| { - // Safe-guard in case we add more events in the future. - let sc_telemetry::TelemetryEvent::Connected = event; - - telemetry!(SUBSTRATE_INFO; "system.connected"; - "name" => name.clone(), - "implementation" => impl_name.clone(), - "version" => version.clone(), - "config" => "", - "chain" => chain_name.clone(), - "authority" => is_authority, - "startup_time" => startup_time, - "network_id" => network_id.clone() - ); + let telemetry_connection_sinks: Arc>>> = Default::default(); + + // Telemetry + let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { + let is_authority = config.role.is_authority(); + let network_id = network.local_peer_id().to_base58(); + let name = config.network.node_name.clone(); + let impl_name = config.impl_name.to_owned(); + let version = version.clone(); + let chain_name = config.chain_spec.name().to_owned(); + let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); + let telemetry = sc_telemetry::init_telemetry(sc_telemetry::TelemetryConfig { + endpoints, + wasm_external_transport: config.telemetry_external_transport.take(), + }); + let startup_time = SystemTime::UNIX_EPOCH.elapsed() + .map(|dur| dur.as_millis()) + .unwrap_or(0); + let future = telemetry.clone() + .for_each(move |event| { + // Safe-guard in case we add more events in the future. + let sc_telemetry::TelemetryEvent::Connected = event; + + telemetry!(SUBSTRATE_INFO; "system.connected"; + "name" => name.clone(), + "implementation" => impl_name.clone(), + "version" => version.clone(), + "config" => "", + "chain" => chain_name.clone(), + "authority" => is_authority, + "startup_time" => startup_time, + "network_id" => network_id.clone() + ); - telemetry_connection_sinks_.lock().retain(|sink| { - sink.unbounded_send(()).is_ok() - }); - ready(()) + telemetry_connection_sinks_.lock().retain(|sink| { + sink.unbounded_send(()).is_ok() }); + ready(()) + }); - spawn_handle.spawn( - "telemetry-worker", - future, - ); + spawn_handle.spawn( + "telemetry-worker", + future, + ); - telemetry - }); + telemetry + }); - // Instrumentation - if let Some(tracing_targets) = config.tracing_targets.as_ref() { - let subscriber = sc_tracing::ProfilingSubscriber::new( - config.tracing_receiver, tracing_targets - ); - match tracing::subscriber::set_global_default(subscriber) { - Ok(_) => (), - Err(e) => error!(target: "tracing", "Unable to set global default subscriber {}", e), - } + // Instrumentation + if let Some(tracing_targets) = config.tracing_targets.as_ref() { + let subscriber = sc_tracing::ProfilingSubscriber::new( + config.tracing_receiver, tracing_targets + ); + match tracing::subscriber::set_global_default(subscriber) { + Ok(_) => (), + Err(e) => error!(target: "tracing", "Unable to set global default subscriber {}", e), } - - Ok(Service { - client, - task_manager: tasks_builder.into_task_manager(config.task_executor), - network, - network_status_sinks, - select_chain, - transaction_pool, - essential_failed_tx, - essential_failed_rx, - rpc_handlers, - _rpc: rpc, - _telemetry: telemetry, - _offchain_workers: offchain_workers, - _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), - keystore, - marker: PhantomData::, - prometheus_registry: config.prometheus_config.map(|config| config.registry) - }) } + + Ok(Service { + client, + task_manager: tasks_builder.into_task_manager(config.task_executor), + network, + network_status_sinks, + select_chain, + transaction_pool, + essential_failed_tx, + essential_failed_rx, + rpc_handlers, + _rpc: rpc, + _telemetry: telemetry, + _offchain_workers: offchain_workers, + _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), + keystore, + marker: PhantomData::, + prometheus_registry: config.prometheus_config.map(|config| config.registry) + }) } diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs index 12fae3224108a..9c76a15d130d8 100644 --- a/client/service/src/chain_ops.rs +++ b/client/service/src/chain_ops.rs @@ -16,8 +16,10 @@ //! Chain utilities. +#![allow(unused_imports)] + use crate::error; -use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; +//use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; use crate::error::Error; use sc_chain_spec::ChainSpec; use log::{warn, info}; @@ -36,264 +38,9 @@ use sc_executor::{NativeExecutor, NativeExecutionDispatch}; use std::{io::{Read, Write, Seek}, pin::Pin}; use sc_client_api::BlockBackend; +use std::sync::Arc; /// Build a chain spec json pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result { Ok(spec.as_json(raw)?) } - -impl< - TBl, TRtApi, TBackend, - TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp, - TExPool, TRpc, Backend -> ServiceBuilderCommand for ServiceBuilder< - TBl, TRtApi, - Client>, TBl, TRtApi>, - TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend -> where - TBl: BlockT, - TBackend: 'static + sc_client_api::backend::Backend + Send, - TExecDisp: 'static + NativeExecutionDispatch, - TImpQu: 'static + ImportQueue, - TRtApi: 'static + Send + Sync, -{ - type Block = TBl; - type NativeDispatch = TExecDisp; - - fn import_blocks( - self, - input: impl Read + Seek + Send + 'static, - force: bool, - ) -> Pin> + Send>> { - struct WaitLink { - imported_blocks: u64, - has_error: bool, - } - - impl WaitLink { - fn new() -> WaitLink { - WaitLink { - imported_blocks: 0, - has_error: false, - } - } - } - - impl Link for WaitLink { - fn blocks_processed( - &mut self, - imported: usize, - _count: usize, - results: Vec<(Result>, BlockImportError>, B::Hash)> - ) { - self.imported_blocks += imported as u64; - - for result in results { - if let (Err(err), hash) = result { - warn!("There was an error importing block with hash {:?}: {:?}", hash, err); - self.has_error = true; - break; - } - } - } - } - - let client = self.client; - let mut queue = self.import_queue; - - let mut io_reader_input = IoReader(input); - let mut count = None::; - let mut read_block_count = 0; - let mut link = WaitLink::new(); - - // Importing blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we read a block from the input or import a bunch of blocks from the import - // queue, the `Future` re-schedules itself and returns `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block imports, - // or to stop the operation completely. - let import = future::poll_fn(move |cx| { - // Start by reading the number of blocks if not done so already. - let count = match count { - Some(c) => c, - None => { - let c: u64 = match Decode::decode(&mut io_reader_input) { - Ok(c) => c, - Err(err) => { - let err = format!("Error reading file: {}", err); - return std::task::Poll::Ready(Err(From::from(err))); - }, - }; - info!("📦 Importing {} blocks", c); - count = Some(c); - c - } - }; - - // Read blocks from the input. - if read_block_count < count { - match SignedBlock::::decode(&mut io_reader_input) { - Ok(signed) => { - let (header, extrinsics) = signed.block.deconstruct(); - let hash = header.hash(); - // import queue handles verification and importing it into the client - queue.import_blocks(BlockOrigin::File, vec![ - IncomingBlock:: { - hash, - header: Some(header), - body: Some(extrinsics), - justification: signed.justification, - origin: None, - allow_missing_state: false, - import_existing: force, - } - ]); - } - Err(e) => { - warn!("Error reading block data at {}: {}", read_block_count, e); - return std::task::Poll::Ready(Ok(())); - } - } - - read_block_count += 1; - if read_block_count % 1000 == 0 { - info!("#{} blocks were added to the queue", read_block_count); - } - - cx.waker().wake_by_ref(); - return std::task::Poll::Pending; - } - - let blocks_before = link.imported_blocks; - queue.poll_actions(cx, &mut link); - - if link.has_error { - info!( - "Stopping after #{} blocks because of an error", - link.imported_blocks, - ); - return std::task::Poll::Ready(Ok(())); - } - - if link.imported_blocks / 1000 != blocks_before / 1000 { - info!( - "#{} blocks were imported (#{} left)", - link.imported_blocks, - count - link.imported_blocks - ); - } - - if link.imported_blocks >= count { - info!("🎉 Imported {} blocks. Best: #{}", read_block_count, client.chain_info().best_number); - return std::task::Poll::Ready(Ok(())); - - } else { - // Polling the import queue will re-schedule the task when ready. - return std::task::Poll::Pending; - } - }); - Box::pin(import) - } - - fn export_blocks( - self, - mut output: impl Write + 'static, - from: NumberFor, - to: Option>, - binary: bool - ) -> Pin>>> { - let client = self.client; - let mut block = from; - - let last = match to { - Some(v) if v.is_zero() => One::one(), - Some(v) => v, - None => client.chain_info().best_number, - }; - - let mut wrote_header = false; - - // Exporting blocks is implemented as a future, because we want the operation to be - // interruptible. - // - // Every time we write a block to the output, the `Future` re-schedules itself and returns - // `Poll::Pending`. - // This makes it possible either to interleave other operations in-between the block exports, - // or to stop the operation completely. - let export = future::poll_fn(move |cx| { - if last < block { - return std::task::Poll::Ready(Err("Invalid block range specified".into())); - } - - if !wrote_header { - info!("Exporting blocks from #{} to #{}", block, last); - if binary { - let last_: u64 = last.saturated_into::(); - let block_: u64 = block.saturated_into::(); - let len: u64 = last_ - block_ + 1; - output.write_all(&len.encode())?; - } - wrote_header = true; - } - - match client.block(&BlockId::number(block))? { - Some(block) => { - if binary { - output.write_all(&block.encode())?; - } else { - serde_json::to_writer(&mut output, &block) - .map_err(|e| format!("Error writing JSON: {}", e))?; - } - }, - // Reached end of the chain. - None => return std::task::Poll::Ready(Ok(())), - } - if (block % 10000.into()).is_zero() { - info!("#{}", block); - } - if block == last { - return std::task::Poll::Ready(Ok(())); - } - block += One::one(); - - // Re-schedule the task in order to continue the operation. - cx.waker().wake_by_ref(); - std::task::Poll::Pending - }); - - Box::pin(export) - } - - fn revert_chain( - &self, - blocks: NumberFor - ) -> Result<(), Error> { - let reverted = self.client.revert(blocks)?; - let info = self.client.chain_info(); - - if reverted.is_zero() { - info!("There aren't any non-finalized blocks to revert."); - } else { - info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash); - } - Ok(()) - } - - fn check_block( - self, - block_id: BlockId - ) -> Pin> + Send>> { - match self.client.block(&block_id) { - Ok(Some(block)) => { - let mut buf = Vec::new(); - 1u64.encode_to(&mut buf); - block.encode_to(&mut buf); - let reader = std::io::Cursor::new(buf); - self.import_blocks(reader, true) - } - Ok(None) => Box::pin(future::err("Unknown block".into())), - Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())), - } - } -} diff --git a/client/service/src/check_block.rs b/client/service/src/check_block.rs new file mode 100644 index 0000000000000..c4a101b832734 --- /dev/null +++ b/client/service/src/check_block.rs @@ -0,0 +1,67 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain utilities. + +#![allow(unused_imports)] + +use crate::error; +//use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; +use crate::error::Error; +use sc_chain_spec::ChainSpec; +use log::{warn, info}; +use futures::{future, prelude::*}; +use sp_runtime::traits::{ + Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion +}; +use sp_runtime::generic::{BlockId, SignedBlock}; +use codec::{Decode, Encode, IoReader}; +use sc_client::{Client, LocalCallExecutor}; +use sp_consensus::{ + BlockOrigin, + import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}, +}; +use sc_executor::{NativeExecutor, NativeExecutionDispatch}; + +use std::{io::{Read, Write, Seek}, pin::Pin}; +use sc_client_api::BlockBackend; +use std::sync::Arc; +use crate::import_blocks::import_blocks; + +pub fn check_block( + client: Arc>, + import_queue: IQ, + block_id: BlockId +) -> Pin> + Send>> +where + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: ImportQueue + 'static, + RA: Send + Sync + 'static, +{ + match client.block(&block_id) { + Ok(Some(block)) => { + let mut buf = Vec::new(); + 1u64.encode_to(&mut buf); + block.encode_to(&mut buf); + let reader = std::io::Cursor::new(buf); + import_blocks(client, import_queue, reader, true) + } + Ok(None) => Box::pin(future::err("Unknown block".into())), + Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())), + } +} diff --git a/client/service/src/export_blocks.rs b/client/service/src/export_blocks.rs new file mode 100644 index 0000000000000..e8744e5dd8953 --- /dev/null +++ b/client/service/src/export_blocks.rs @@ -0,0 +1,118 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain utilities. + +#![allow(unused_imports)] + +use crate::error; +//use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; +use crate::error::Error; +use sc_chain_spec::ChainSpec; +use log::{warn, info}; +use futures::{future, prelude::*}; +use sp_runtime::traits::{ + Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion +}; +use sp_runtime::generic::{BlockId, SignedBlock}; +use codec::{Decode, Encode, IoReader}; +use sc_client::{Client, LocalCallExecutor}; +use sp_consensus::{ + BlockOrigin, + import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}, +}; +use sc_executor::{NativeExecutor, NativeExecutionDispatch}; + +use std::{io::{Read, Write, Seek}, pin::Pin}; +use sc_client_api::BlockBackend; +use std::sync::Arc; + +pub fn export_blocks( + client: Arc>, + mut output: impl Write + 'static, + from: NumberFor, + to: Option>, + binary: bool +) -> Pin>>> +where + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, +{ + let client = client; + let mut block = from; + + let last = match to { + Some(v) if v.is_zero() => One::one(), + Some(v) => v, + None => client.chain_info().best_number, + }; + + let mut wrote_header = false; + + // Exporting blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we write a block to the output, the `Future` re-schedules itself and returns + // `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block exports, + // or to stop the operation completely. + let export = future::poll_fn(move |cx| { + if last < block { + return std::task::Poll::Ready(Err("Invalid block range specified".into())); + } + + if !wrote_header { + info!("Exporting blocks from #{} to #{}", block, last); + if binary { + let last_: u64 = last.saturated_into::(); + let block_: u64 = block.saturated_into::(); + let len: u64 = last_ - block_ + 1; + output.write_all(&len.encode())?; + } + wrote_header = true; + } + + match client.block(&BlockId::number(block))? { + Some(block) => { + if binary { + output.write_all(&block.encode())?; + } else { + serde_json::to_writer(&mut output, &block) + .map_err(|e| format!("Error writing JSON: {}", e))?; + } + }, + // Reached end of the chain. + None => return std::task::Poll::Ready(Ok(())), + } + if (block % 10000.into()).is_zero() { + info!("#{}", block); + } + if block == last { + return std::task::Poll::Ready(Ok(())); + } + block += One::one(); + + // Re-schedule the task in order to continue the operation. + cx.waker().wake_by_ref(); + std::task::Poll::Pending + }); + + Box::pin(export) +} + + diff --git a/client/service/src/import_blocks.rs b/client/service/src/import_blocks.rs new file mode 100644 index 0000000000000..e38fa90f9a21a --- /dev/null +++ b/client/service/src/import_blocks.rs @@ -0,0 +1,185 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain utilities. + +#![allow(unused_imports)] + +use crate::error; +//use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; +use crate::error::Error; +use sc_chain_spec::ChainSpec; +use log::{warn, info}; +use futures::{future, prelude::*}; +use sp_runtime::traits::{ + Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion +}; +use sp_runtime::generic::{BlockId, SignedBlock}; +use codec::{Decode, Encode, IoReader}; +use sc_client::{Client, LocalCallExecutor}; +use sp_consensus::{ + BlockOrigin, + import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}, +}; +use sc_executor::{NativeExecutor, NativeExecutionDispatch}; + +use std::{io::{Read, Write, Seek}, pin::Pin}; +use sc_client_api::BlockBackend; +use std::sync::Arc; + +pub fn import_blocks( + client: Arc>, + import_queue: IQ, + input: impl Read + Seek + Send + 'static, + force: bool, +) -> Pin> + Send>> +where + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + IQ: ImportQueue + 'static, + RA: Send + Sync + 'static, +{ + struct WaitLink { + imported_blocks: u64, + has_error: bool, + } + + impl WaitLink { + fn new() -> WaitLink { + WaitLink { + imported_blocks: 0, + has_error: false, + } + } + } + + impl Link for WaitLink { + fn blocks_processed( + &mut self, + imported: usize, + _count: usize, + results: Vec<(Result>, BlockImportError>, B::Hash)> + ) { + self.imported_blocks += imported as u64; + + for result in results { + if let (Err(err), hash) = result { + warn!("There was an error importing block with hash {:?}: {:?}", hash, err); + self.has_error = true; + break; + } + } + } + } + + let client = client; + let mut queue = import_queue; + + let mut io_reader_input = IoReader(input); + let mut count = None::; + let mut read_block_count = 0; + let mut link = WaitLink::new(); + + // Importing blocks is implemented as a future, because we want the operation to be + // interruptible. + // + // Every time we read a block from the input or import a bunch of blocks from the import + // queue, the `Future` re-schedules itself and returns `Poll::Pending`. + // This makes it possible either to interleave other operations in-between the block imports, + // or to stop the operation completely. + let import = future::poll_fn(move |cx| { + // Start by reading the number of blocks if not done so already. + let count = match count { + Some(c) => c, + None => { + let c: u64 = match Decode::decode(&mut io_reader_input) { + Ok(c) => c, + Err(err) => { + let err = format!("Error reading file: {}", err); + return std::task::Poll::Ready(Err(From::from(err))); + }, + }; + info!("📦 Importing {} blocks", c); + count = Some(c); + c + } + }; + + // Read blocks from the input. + if read_block_count < count { + match SignedBlock::::decode(&mut io_reader_input) { + Ok(signed) => { + let (header, extrinsics) = signed.block.deconstruct(); + let hash = header.hash(); + // import queue handles verification and importing it into the client + queue.import_blocks(BlockOrigin::File, vec![ + IncomingBlock:: { + hash, + header: Some(header), + body: Some(extrinsics), + justification: signed.justification, + origin: None, + allow_missing_state: false, + import_existing: force, + } + ]); + } + Err(e) => { + warn!("Error reading block data at {}: {}", read_block_count, e); + return std::task::Poll::Ready(Ok(())); + } + } + + read_block_count += 1; + if read_block_count % 1000 == 0 { + info!("#{} blocks were added to the queue", read_block_count); + } + + cx.waker().wake_by_ref(); + return std::task::Poll::Pending; + } + + let blocks_before = link.imported_blocks; + queue.poll_actions(cx, &mut link); + + if link.has_error { + info!( + "Stopping after #{} blocks because of an error", + link.imported_blocks, + ); + return std::task::Poll::Ready(Ok(())); + } + + if link.imported_blocks / 1000 != blocks_before / 1000 { + info!( + "#{} blocks were imported (#{} left)", + link.imported_blocks, + count - link.imported_blocks + ); + } + + if link.imported_blocks >= count { + info!("🎉 Imported {} blocks. Best: #{}", read_block_count, client.chain_info().best_number); + return std::task::Poll::Ready(Ok(())); + + } else { + // Polling the import queue will re-schedule the task when ready. + return std::task::Poll::Pending; + } + }); + Box::pin(import) +} diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 16b1d0d718f87..8475bdedfe6b4 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -28,7 +28,17 @@ mod metrics; mod builder; mod status_sinks; mod task_manager; - +mod revert_chain; +mod check_block; +mod import_blocks; +mod export_blocks; + +pub use revert_chain::*; +pub use check_block::*; +pub use import_blocks::*; +pub use export_blocks::*; +pub use sp_consensus::import_queue::ImportQueue; +pub use builder::*; use std::{borrow::Cow, io, pin::Pin}; use std::marker::PhantomData; use std::net::SocketAddr; @@ -38,7 +48,7 @@ use wasm_timer::Instant; use std::task::{Poll, Context}; use parking_lot::Mutex; -use sc_client::Client; +pub use sc_client::Client; use futures::{ Future, FutureExt, Stream, StreamExt, compat::*, @@ -54,11 +64,13 @@ use parity_util_mem::MallocSizeOf; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; pub use self::error::Error; +/* pub use self::builder::{ new_full_client, ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor, }; +*/ pub use config::{Configuration, Role, PruningMode, DatabaseConfig}; pub use sc_chain_spec::{ ChainSpec, GenericChainSpec, Properties, RuntimeGenesis, Extension as ChainSpecExtension, diff --git a/client/service/src/revert_chain.rs b/client/service/src/revert_chain.rs new file mode 100644 index 0000000000000..064ea2acc6075 --- /dev/null +++ b/client/service/src/revert_chain.rs @@ -0,0 +1,62 @@ +// Copyright 2017-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Chain utilities. + +#![allow(unused_imports)] + +use crate::error; +//use crate::builder::{ServiceBuilderCommand, ServiceBuilder}; +use crate::error::Error; +use sc_chain_spec::ChainSpec; +use log::{warn, info}; +use futures::{future, prelude::*}; +use sp_runtime::traits::{ + Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion +}; +use sp_runtime::generic::{BlockId, SignedBlock}; +use codec::{Decode, Encode, IoReader}; +use sc_client::{Client, LocalCallExecutor}; +use sp_consensus::{ + BlockOrigin, + import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue}, +}; +use sc_executor::{NativeExecutor, NativeExecutionDispatch}; + +use std::{io::{Read, Write, Seek}, pin::Pin}; +use sc_client_api::BlockBackend; +use std::sync::Arc; + +pub fn revert_chain( + client: Arc>, + blocks: NumberFor +) -> Result<(), Error> +where + B: BlockT, + BA: sc_client_api::backend::Backend + 'static, + CE: sc_client_api::call_executor::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, +{ + let reverted = client.revert(blocks)?; + let info = client.chain_info(); + + if reverted.is_zero() { + info!("There aren't any non-finalized blocks to revert."); + } else { + info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash); + } + Ok(()) +}