diff --git a/Cargo.lock b/Cargo.lock index afdfb5e81cc9a..b3dd853538676 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d0864d84b8e07b145449be9a8537db86bf9de5ce03b913214694643b4743502" dependencies = [ "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1040,7 +1040,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47c5e5ac752e18207b12e16b10631ae5f7f68f8805f335f9b817ead83d9ffce1" dependencies = [ "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1080,7 +1080,7 @@ checksum = "e2323f3f47db9a0e77ce7a300605d8d2098597fc451ed1a97bb1f6411bb550a7" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1182,7 +1182,7 @@ checksum = "2ed9afacaea0301eefb738c9deea725e6d53938004597cdc518a8cf9a7aa2f03" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1335,7 +1335,7 @@ checksum = "030a733c8287d6213886dd487564ff5c8f6aae10278b3588ed177f9d18f8d231" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "synstructure", ] @@ -1526,7 +1526,7 @@ dependencies = [ "frame-support-procedural-tools", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1537,7 +1537,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1546,7 +1546,7 @@ version = "2.0.0-rc4" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -1763,7 +1763,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -2309,7 +2309,7 @@ checksum = "7ef5550a42e3740a0e71f909d4c861056a284060af885ae7aa6242820f920d9d" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -2448,7 +2448,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -2740,7 +2740,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f09548626b737ed64080fde595e06ce1117795b8b9fc4d2629fa36561c583171" dependencies = [ "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -4635,7 +4635,7 @@ dependencies = [ "proc-macro2", "quote 1.0.6", "sp-runtime", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -4869,7 +4869,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -4919,7 +4919,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f557c32c6d268a07c921471619c0295f5efad3a0e76d4f97a05c091a51d110b2" dependencies = [ "proc-macro2", - "syn 1.0.17", + "syn 1.0.33", "synstructure", ] @@ -5007,7 +5007,7 @@ dependencies = [ "proc-macro-hack", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -5071,7 +5071,7 @@ checksum = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -5210,7 +5210,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "version_check", ] @@ -5222,7 +5222,7 @@ checksum = "4f5444ead4e9935abd7f27dc51f7e852a0569ac888096d5ec2499470794e2e53" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "syn-mid", "version_check", ] @@ -5241,9 +5241,9 @@ checksum = 
"8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" [[package]] name = "proc-macro2" -version = "1.0.10" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3" +checksum = "beae6331a816b1f65d04c45b078fd8e6c93e8071771f41b8163255bbd8d7c8fa" dependencies = [ "unicode-xid 0.2.0", ] @@ -5315,7 +5315,7 @@ dependencies = [ "itertools 0.8.2", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -5683,7 +5683,7 @@ checksum = "602eb59cda66fcb9aec25841fb76bc01d2b34282dcdd705028da297db6f3eec8" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -5763,7 +5763,7 @@ checksum = "475e68978dc5b743f2f40d8e0a8fdc83f1c5e78cbf4b8fa5e74e73beebc340de" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -5888,7 +5888,7 @@ checksum = "b3bba175698996010c4f6dce5e7f173b6eb781fce25d2cfc45e27091ce0b79f6" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -6021,7 +6021,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -6048,6 +6048,7 @@ dependencies = [ "sc-service", "sc-telemetry", "sc-tracing", + "serde", "serde_json", "sp-blockchain", "sp-core", @@ -7076,7 +7077,7 @@ checksum = "f8584eea9b9ff42825b46faf46a8c24d2cff13ec152fa2a50df788b87c07ee28" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7166,22 +7167,22 @@ checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" [[package]] name = "serde" -version = "1.0.110" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" +checksum = "5317f7588f0a5078ee60ef675ef96735a1442132dc645eb1d12c018620ed8cd3" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.110" +version = "1.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984" +checksum = "2a0be94b04690fbaed37cddffc5c134bf537c8e3329d53e982fe04c374978f8e" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7295,7 +7296,7 @@ checksum = "a945ec7f7ce853e89ffa36be1e27dce9a43e82ff9093bf3461c30d5da74ed11b" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7393,7 +7394,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7657,7 +7658,7 @@ version = "2.0.0-rc4" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7756,7 +7757,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -7848,7 +7849,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -8149,7 +8150,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -8170,7 +8171,7 @@ dependencies = [ "heck", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -8538,9 +8539,9 @@ dependencies = [ [[package]] name = "syn" -version = "1.0.17" +version = "1.0.33" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03" +checksum = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd" dependencies = [ "proc-macro2", "quote 1.0.6", @@ -8555,7 +8556,7 @@ checksum = "7be3539f6c128a931cf19dcee741c1af532c7fd387baa739c03dd2e96479338a" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -8575,7 +8576,7 @@ checksum = "67656ea1dc1b41b1451851562ea232ec2e5a80242139f7e679ceccfb5d61f545" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "unicode-xid 0.2.0", ] @@ -8638,7 +8639,7 @@ dependencies = [ "lazy_static", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "version_check", ] @@ -8668,7 +8669,7 @@ checksum = "ca972988113b7715266f91250ddb98070d033c62a011fa0fcc57434a649310dd" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -8874,7 +8875,7 @@ checksum = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -9075,7 +9076,7 @@ checksum = "99bbad0de3fd923c9c3232ead88510b783e5a4d16a6154adffa3d53308de984c" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", ] [[package]] @@ -9427,7 +9428,7 @@ dependencies = [ "log", "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "wasm-bindgen-shared", ] @@ -9461,7 +9462,7 @@ checksum = "8eb197bd3a47553334907ffd2f16507b4f4f01bbec3ac921a7719e0decdfe72a" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9782,7 +9783,7 @@ checksum = "de251eec69fc7c1bc3923403d18ececb929380e016afe103da75f396704f8ca2" dependencies = [ "proc-macro2", "quote 1.0.6", - "syn 1.0.17", + "syn 1.0.33", "synstructure", ] diff --git a/bin/node-template/node/src/command.rs b/bin/node-template/node/src/command.rs index 4f2fd3aad6fd3..1bc436a063beb 100644 --- a/bin/node-template/node/src/command.rs +++ b/bin/node-template/node/src/command.rs @@ -71,7 +71,10 @@ pub fn run() -> sc_cli::Result<()> { match &cli.subcommand { Some(subcommand) => { let runner = cli.create_runner(subcommand)?; - runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0)) + runner.run_subcommand(subcommand, |config| { + let (builder, _, _) = new_full_start!(config); + Ok(builder.to_chain_ops_parts()) + }) } None => { let runner = cli.create_runner(&cli.run)?; diff --git a/bin/node/cli/src/command.rs b/bin/node/cli/src/command.rs index b07e0cdc907e0..4ac796370c6f8 100644 --- a/bin/node/cli/src/command.rs +++ b/bin/node/cli/src/command.rs @@ -97,8 +97,10 @@ pub fn run() -> Result<()> { } Some(Subcommand::Base(subcommand)) => { let runner = cli.create_runner(subcommand)?; - - runner.run_subcommand(subcommand, |config| Ok(new_full_start!(config).0)) + runner.run_subcommand(subcommand, |config| { + let (builder, _, _, _) = new_full_start!(config); + Ok(builder.to_chain_ops_parts()) + }) } } } diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 616b4f3481324..6ebf2f9bf8982 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -43,6 +43,7 @@ structopt = "0.3.8" sc-tracing = { version = "2.0.0-rc4", path = "../tracing" } chrono = "0.4.10" parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] } +serde = "1.0.111" [target.'cfg(not(target_os = "unknown"))'.dependencies] 
rpassword = "4.0.1" diff --git a/client/cli/src/commands/build_spec_cmd.rs b/client/cli/src/commands/build_spec_cmd.rs index 23626359ff131..616c5139f64f0 100644 --- a/client/cli/src/commands/build_spec_cmd.rs +++ b/client/cli/src/commands/build_spec_cmd.rs @@ -22,7 +22,7 @@ use crate::params::SharedParams; use crate::CliConfiguration; use log::info; use sc_network::config::build_multiaddr; -use sc_service::{config::MultiaddrWithPeerId, Configuration}; +use sc_service::{config::{MultiaddrWithPeerId, NetworkConfiguration}, ChainSpec}; use structopt::StructOpt; use std::io::Write; @@ -51,13 +51,16 @@ pub struct BuildSpecCmd { impl BuildSpecCmd { /// Run the build-spec command - pub fn run(&self, config: Configuration) -> error::Result<()> { + pub fn run( + &self, + mut spec: Box, + network_config: NetworkConfiguration, + ) -> error::Result<()> { info!("Building chain spec"); - let mut spec = config.chain_spec; let raw_output = self.raw; if spec.boot_nodes().is_empty() && !self.disable_default_bootnode { - let keys = config.network.node_key.into_keypair()?; + let keys = network_config.node_key.into_keypair()?; let peer_id = keys.public().into_peer_id(); let addr = MultiaddrWithPeerId { multiaddr: build_multiaddr![Ip4([127, 0, 0, 1]), Tcp(30333u16)], diff --git a/client/cli/src/commands/check_block_cmd.rs b/client/cli/src/commands/check_block_cmd.rs index c000ea7fb11ee..b536d4f26bb6c 100644 --- a/client/cli/src/commands/check_block_cmd.rs +++ b/client/cli/src/commands/check_block_cmd.rs @@ -19,9 +19,9 @@ use crate::{ CliConfiguration, error, params::{ImportParams, SharedParams, BlockNumberOrHash}, }; -use sc_service::{Configuration, ServiceBuilderCommand}; -use sp_runtime::traits::{Block as BlockT, NumberFor}; -use std::{fmt::Debug, str::FromStr}; +use sc_client_api::{BlockBackend, UsageProvider}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; +use std::{fmt::Debug, str::FromStr, sync::Arc}; use structopt::StructOpt; /// The `check-block` command used to validate blocks. 
@@ -48,21 +48,21 @@ pub struct CheckBlockCmd {
 
 impl CheckBlockCmd {
     /// Run the check-block command
-    pub async fn run<B, BC, BB>(
+    pub async fn run<B, C, IQ>(
         &self,
-        config: Configuration,
-        builder: B,
+        client: Arc<C>,
+        import_queue: IQ,
     ) -> error::Result<()>
     where
-        B: FnOnce(Configuration) -> Result<BC, sc_service::Error>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: BlockT + Debug,
-        <NumberFor<BB> as FromStr>::Err: std::fmt::Debug,
-        BB::Hash: FromStr,
-        <BB::Hash as FromStr>::Err: std::fmt::Debug,
+        B: BlockT + for<'de> serde::Deserialize<'de>,
+        C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
+        IQ: sc_service::ImportQueue<B> + 'static,
+        B::Hash: FromStr,
+        <B::Hash as FromStr>::Err: Debug,
+        <<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
     {
         let start = std::time::Instant::now();
-        builder(config)?.check_block(self.input.parse()?).await?;
+        sc_service::chain_ops::check_block(client, import_queue, self.input.parse()?).await?;
         println!("Completed in {} ms.", start.elapsed().as_millis());
 
         Ok(())
diff --git a/client/cli/src/commands/export_blocks_cmd.rs b/client/cli/src/commands/export_blocks_cmd.rs
index 7c523c0555d55..118832a79d29d 100644
--- a/client/cli/src/commands/export_blocks_cmd.rs
+++ b/client/cli/src/commands/export_blocks_cmd.rs
@@ -21,13 +21,16 @@ use crate::params::{BlockNumber, DatabaseParams, PruningParams, SharedParams};
 use crate::CliConfiguration;
 use log::info;
 use sc_service::{
-    config::DatabaseConfig, Configuration, ServiceBuilderCommand,
+    config::DatabaseConfig, chain_ops::export_blocks,
 };
+use sc_client_api::{BlockBackend, UsageProvider};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 use std::fmt::Debug;
 use std::fs;
 use std::io;
 use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
 use structopt::StructOpt;
 
 /// The `export-blocks` command used to export blocks.
@@ -68,19 +71,17 @@ pub struct ExportBlocksCmd {
 
 impl ExportBlocksCmd {
     /// Run the export-blocks command
-    pub async fn run<B, BC, BB>(
+    pub async fn run<B, C>(
         &self,
-        config: Configuration,
-        builder: B,
+        client: Arc<C>,
+        database_config: DatabaseConfig,
     ) -> error::Result<()>
     where
-        B: FnOnce(Configuration) -> Result<BC, sc_service::Error>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: sp_runtime::traits::Block + Debug,
-        <<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
-        <BB as BlockT>::Hash: std::str::FromStr,
+        B: BlockT,
+        C: BlockBackend<B> + UsageProvider<B> + 'static,
+        <<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
     {
-        if let DatabaseConfig::RocksDb { ref path, .. } = &config.database {
+        if let DatabaseConfig::RocksDb { ref path, .. } = database_config {
             info!("DB path: {}", path.display());
         }
 
@@ -94,8 +95,7 @@ impl ExportBlocksCmd {
             None => Box::new(io::stdout()),
         };
 
-        builder(config)?
-            .export_blocks(file, from.into(), to, binary)
+        export_blocks(client, file, from.into(), to, binary)
             .await
             .map_err(Into::into)
     }
diff --git a/client/cli/src/commands/export_state_cmd.rs b/client/cli/src/commands/export_state_cmd.rs
index 23a43a178abe5..c078db0d8aea9 100644
--- a/client/cli/src/commands/export_state_cmd.rs
+++ b/client/cli/src/commands/export_state_cmd.rs
@@ -20,10 +20,10 @@ use crate::{
     CliConfiguration, error, params::{PruningParams, SharedParams, BlockNumberOrHash},
 };
 use log::info;
-use sc_service::{Configuration, ServiceBuilderCommand};
-use sp_runtime::traits::{Block as BlockT, NumberFor};
-use std::{fmt::Debug, str::FromStr, io::Write};
+use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+use std::{fmt::Debug, str::FromStr, io::Write, sync::Arc};
 use structopt::StructOpt;
+use sc_client_api::{StorageProvider, UsageProvider};
 
 /// The `export-state` command used to export the state of a given block into
 /// a chain spec.
@@ -44,23 +44,22 @@ pub struct ExportStateCmd {
 
 impl ExportStateCmd {
     /// Run the `export-state` command
-    pub fn run<B, BC, BB>(
+    pub async fn run<B, BA, C>(
         &self,
-        config: Configuration,
-        builder: B,
+        client: Arc<C>,
+        mut input_spec: Box<dyn sc_service::ChainSpec>,
     ) -> error::Result<()>
     where
-        B: FnOnce(Configuration) -> Result<BC, sc_service::Error>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: BlockT + Debug,
-        <NumberFor<BB> as FromStr>::Err: std::fmt::Debug,
-        BB::Hash: FromStr,
-        <BB::Hash as FromStr>::Err: std::fmt::Debug,
+        B: BlockT,
+        C: UsageProvider<B> + StorageProvider<B, BA>,
+        BA: sc_client_api::backend::Backend<B>,
+        B::Hash: FromStr,
+        <B::Hash as FromStr>::Err: Debug,
+        <<B::Header as HeaderT>::Number as FromStr>::Err: Debug,
     {
         info!("Exporting raw state...");
-        let mut input_spec = config.chain_spec.cloned_box();
         let block_id = self.input.as_ref().map(|b| b.parse()).transpose()?;
-        let raw_state = builder(config)?.export_raw_state(block_id)?;
+        let raw_state = sc_service::chain_ops::export_raw_state(client, block_id)?;
         input_spec.set_storage(raw_state);
 
         info!("Generating new chain spec...");
diff --git a/client/cli/src/commands/import_blocks_cmd.rs b/client/cli/src/commands/import_blocks_cmd.rs
index 8e178c4b97964..00f8ec43b02fe 100644
--- a/client/cli/src/commands/import_blocks_cmd.rs
+++ b/client/cli/src/commands/import_blocks_cmd.rs
@@ -20,13 +20,15 @@ use crate::error;
 use crate::params::ImportParams;
 use crate::params::SharedParams;
 use crate::CliConfiguration;
-use sc_service::{Configuration, ServiceBuilderCommand};
-use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
+use sc_service::chain_ops::import_blocks;
+use sp_runtime::traits::Block as BlockT;
 use std::fmt::Debug;
 use std::fs;
 use std::io::{self, Read, Seek};
 use std::path::PathBuf;
+use std::sync::Arc;
 use structopt::StructOpt;
+use sc_client_api::UsageProvider;
 
 /// The `import-blocks` command used to import blocks.
 #[derive(Debug, StructOpt)]
@@ -61,17 +63,15 @@ impl<T: Read + Seek> ReadPlusSeek for T {}
 
 impl ImportBlocksCmd {
     /// Run the import-blocks command
-    pub async fn run<B, BC, BB>(
+    pub async fn run<B, C, IQ>(
         &self,
-        config: Configuration,
-        builder: B,
+        client: Arc<C>,
+        import_queue: IQ,
     ) -> error::Result<()>
     where
-        B: FnOnce(Configuration) -> Result<BC, sc_service::Error>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: sp_runtime::traits::Block + Debug,
-        <<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
-        <BB as BlockT>::Hash: std::str::FromStr,
+        C: UsageProvider<B> + Send + Sync + 'static,
+        B: BlockT + for<'de> serde::Deserialize<'de>,
+        IQ: sc_service::ImportQueue<B> + 'static,
     {
         let file: Box<dyn ReadPlusSeek + Send> = match &self.input {
             Some(filename) => Box::new(fs::File::open(filename)?),
@@ -82,8 +82,7 @@ impl ImportBlocksCmd {
             }
         };
 
-        builder(config)?
-            .import_blocks(file, false, self.binary)
+        import_blocks(client, import_queue, file, false, self.binary)
             .await
             .map_err(Into::into)
     }
diff --git a/client/cli/src/commands/purge_chain_cmd.rs b/client/cli/src/commands/purge_chain_cmd.rs
index 053f427309828..9c9c6e91fb241 100644
--- a/client/cli/src/commands/purge_chain_cmd.rs
+++ b/client/cli/src/commands/purge_chain_cmd.rs
@@ -19,7 +19,7 @@ use crate::error;
 use crate::params::{DatabaseParams, SharedParams};
 use crate::CliConfiguration;
-use sc_service::Configuration;
+use sc_service::DatabaseConfig;
 use std::fmt::Debug;
 use std::fs;
 use std::io::{self, Write};
@@ -43,8 +43,8 @@ pub struct PurgeChainCmd {
 
 impl PurgeChainCmd {
     /// Run the purge command
-    pub fn run(&self, config: Configuration) -> error::Result<()> {
-        let db_path = config.database.path()
+    pub fn run(&self, database_config: DatabaseConfig) -> error::Result<()> {
+        let db_path = database_config.path()
             .ok_or_else(||
                 error::Error::Input("Cannot purge custom database implementation".into())
             )?;
diff --git a/client/cli/src/commands/revert_cmd.rs b/client/cli/src/commands/revert_cmd.rs
index 1b5489df708a7..bbfb0d2ff99a7 100644
--- a/client/cli/src/commands/revert_cmd.rs
+++ b/client/cli/src/commands/revert_cmd.rs
@@ -19,10 +19,13 @@ use crate::error;
 use crate::params::{BlockNumber, PruningParams, SharedParams};
 use crate::CliConfiguration;
-use sc_service::{Configuration, ServiceBuilderCommand};
+use sc_service::chain_ops::revert_chain;
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 use std::fmt::Debug;
+use std::str::FromStr;
+use std::sync::Arc;
 use structopt::StructOpt;
+use sc_client_api::{Backend, UsageProvider};
 
 /// The `revert` command used to revert the chain to a previous state.
 #[derive(Debug, StructOpt)]
@@ -42,16 +45,19 @@ pub struct RevertCmd {
 
 impl RevertCmd {
     /// Run the revert command
-    pub fn run<B, BC, BB>(&self, config: Configuration, builder: B) -> error::Result<()>
+    pub async fn run<B, BA, C>(
+        &self,
+        client: Arc<C>,
+        backend: Arc<BA>,
+    ) -> error::Result<()>
     where
-        B: FnOnce(Configuration) -> Result<BC, sc_service::Error>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: sp_runtime::traits::Block + Debug,
-        <<<BB as BlockT>::Header as HeaderT>::Number as std::str::FromStr>::Err: std::fmt::Debug,
-        <BB as BlockT>::Hash: std::str::FromStr,
+        B: BlockT,
+        BA: Backend<B>,
+        C: UsageProvider<B>,
+        <<<B as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
     {
         let blocks = self.num.parse()?;
-        builder(config)?.revert_chain(blocks)?;
+        revert_chain(client, backend, blocks)?;
 
         Ok(())
     }
diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs
index fcc869dc87069..807a5620ec0ce 100644
--- a/client/cli/src/runner.rs
+++ b/client/cli/src/runner.rs
@@ -25,10 +25,11 @@ use futures::pin_mut;
 use futures::select;
 use futures::{future, future::FutureExt, Future};
 use log::info;
-use sc_service::{Configuration, ServiceBuilderCommand, TaskType, TaskManager};
+use sc_service::{Configuration, TaskType, TaskManager};
 use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
 use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL};
-use std::{fmt::Debug, marker::PhantomData, str::FromStr};
+use std::{fmt::Debug, marker::PhantomData, str::FromStr, sync::Arc};
+use sc_client_api::{UsageProvider, BlockBackend, StorageProvider};
 
 #[cfg(target_family = "unix")]
 async fn main<F, E>(func: F) -> std::result::Result<(), Box<dyn std::error::Error>>
@@ -92,7 +93,11 @@ pub fn build_runtime() -> std::result::Result<tokio::runtime::Runtime, std::io:
 
-fn run_until_exit<FUT, ERR>(mut tokio_runtime: tokio::runtime::Runtime, future: FUT) -> Result<()>
+fn run_until_exit<FUT, ERR>(
+    mut tokio_runtime: tokio::runtime::Runtime,
+    future: FUT,
+    mut task_manager: TaskManager,
+) -> Result<()>
 where
     FUT: Future<Output = std::result::Result<(), ERR>> + future::Future,
     ERR: 'static + std::error::Error,
 {
@@ -102,6 +107,9 @@ where
 
     tokio_runtime.block_on(main(f)).map_err(|e| e.to_string())?;
 
+    task_manager.terminate();
+    drop(tokio_runtime);
+
     Ok(())
 }
 
@@ -173,29 +181,47 @@ impl<C: SubstrateCli> Runner<C> {
 
     /// A helper function that runs a future with tokio and stops if the process receives the signal
    /// `SIGTERM` or `SIGINT`.
-    pub fn run_subcommand<B, BC, BB>(self, subcommand: &Subcommand, builder: B) -> Result<()>
+    pub fn run_subcommand<BU, B, BA, IQ, CL>(self, subcommand: &Subcommand, builder: BU)
+        -> Result<()>
     where
-        B: FnOnce(Configuration) -> sc_service::error::Result<BC>,
-        BC: ServiceBuilderCommand<Block = BB> + Unpin,
-        BB: sp_runtime::traits::Block + Debug,
-        <<<BB as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
-        <BB as BlockT>::Hash: FromStr,
-        <<BB as BlockT>::Hash as FromStr>::Err: Debug,
+        BU: FnOnce(Configuration)
+            -> sc_service::error::Result<(Arc<CL>, Arc<BA>, IQ, TaskManager)>,
+        B: BlockT + for<'de> serde::Deserialize<'de>,
+        BA: sc_client_api::backend::Backend<B> + 'static,
+        IQ: sc_service::ImportQueue<B> + 'static,
+        <B as BlockT>::Hash: FromStr,
+        <<B as BlockT>::Hash as FromStr>::Err: Debug,
+        <<<B as BlockT>::Header as HeaderT>::Number as FromStr>::Err: Debug,
+        CL: UsageProvider<B> + BlockBackend<B> + StorageProvider<B, BA> + Send + Sync +
+            'static,
     {
+        let chain_spec = self.config.chain_spec.cloned_box();
+        let network_config = self.config.network.clone();
+        let db_config = self.config.database.clone();
+
         match subcommand {
-            Subcommand::BuildSpec(cmd) => cmd.run(self.config),
+            Subcommand::BuildSpec(cmd) => cmd.run(chain_spec, network_config),
             Subcommand::ExportBlocks(cmd) => {
-                run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
+                let (client, _, _, task_manager) = builder(self.config)?;
+                run_until_exit(self.tokio_runtime, cmd.run(client, db_config), task_manager)
             }
             Subcommand::ImportBlocks(cmd) => {
-                run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
+                let (client, _, import_queue, task_manager) = builder(self.config)?;
+                run_until_exit(self.tokio_runtime, cmd.run(client, import_queue), task_manager)
             }
             Subcommand::CheckBlock(cmd) => {
-                run_until_exit(self.tokio_runtime, cmd.run(self.config, builder))
+                let (client, _, import_queue, task_manager) = builder(self.config)?;
+                run_until_exit(self.tokio_runtime, cmd.run(client, import_queue), task_manager)
            }
-            Subcommand::Revert(cmd) => cmd.run(self.config, builder),
-            Subcommand::PurgeChain(cmd) => cmd.run(self.config),
-            Subcommand::ExportState(cmd) => cmd.run(self.config, builder),
+            Subcommand::Revert(cmd) => {
+                let (client, backend, _, task_manager) = builder(self.config)?;
+                run_until_exit(self.tokio_runtime, cmd.run(client, backend), task_manager)
+            },
+            Subcommand::PurgeChain(cmd) => cmd.run(db_config),
+            Subcommand::ExportState(cmd) => {
+                let (client, _, _, task_manager) = builder(self.config)?;
+                run_until_exit(self.tokio_runtime, cmd.run(client, chain_spec), task_manager)
+            },
         }
     }
 
@@ -221,11 +247,14 @@ impl<C: SubstrateCli> Runner<C> {
 
     /// A helper function that runs a future with tokio and stops if the process receives
     /// the signal SIGTERM or SIGINT
-    pub fn async_run<FUT>(self, runner: impl FnOnce(Configuration) -> FUT) -> Result<()>
+    pub fn async_run<FUT>(
+        self, runner: impl FnOnce(Configuration) -> Result<(FUT, TaskManager)>,
+    ) -> Result<()>
     where
         FUT: Future<Output = Result<()>>,
     {
-        run_until_exit(self.tokio_runtime, runner(self.config))
+        let (future, task_manager) = runner(self.config)?;
+        run_until_exit(self.tokio_runtime, future, task_manager)
     }
 
     /// Get an immutable reference to the node Configuration
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index 234356856b313..3a1c5c85af5da 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -45,15 +45,11 @@ use sc_network::NetworkService;
 use parking_lot::{Mutex, RwLock};
 use sp_runtime::generic::BlockId;
 use sp_runtime::traits::{
-    Block as BlockT, NumberFor, SaturatedConversion, HashFor, Zero, BlockIdTo,
+    Block as BlockT, SaturatedConversion, HashFor, Zero, BlockIdTo,
 };
 use sp_api::{ProvideRuntimeApi, CallApiAt};
 use sc_executor::{NativeExecutor, NativeExecutionDispatch, RuntimeInfo};
-use std::{
-    collections::HashMap,
-    io::{Read, Write, Seek},
-    marker::PhantomData, sync::Arc, pin::Pin
-};
+use std::{collections::HashMap, marker::PhantomData, sync::Arc, pin::Pin};
 use wasm_timer::SystemTime;
 use sc_telemetry::{telemetry, SUBSTRATE_INFO};
 use sp_transaction_pool::{LocalTransactionPool, MaintainedTransactionPool};
@@ -67,7 +63,6 @@ use sc_client_api::{
     proof_provider::ProofProvider,
     execution_extensions::ExecutionExtensions
 };
-use sp_core::storage::Storage;
 use sp_blockchain::{HeaderMetadata, HeaderBackend};
 use crate::{ServiceComponents, TelemetryOnConnectSinks, RpcHandlers, NetworkStatusSinks};
@@ -523,6 +518,11 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
         self.remote_backend.clone()
     }
 
+    /// Consume the builder and return the parts needed for chain operations.
+    pub fn to_chain_ops_parts(self) -> (Arc<TCl>, Arc<Backend>, TImpQu, TaskManager) {
+        (self.client, self.backend, self.import_queue, self.task_manager)
+    }
+
     /// Defines which head-of-chain strategy to use.
     pub fn with_opt_select_chain(
         self,
@@ -840,50 +840,6 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
     }
 }
 
-/// Implemented on `ServiceBuilder`. Allows running block commands, such as import/export/validate
-/// components to the builder.
-pub trait ServiceBuilderCommand {
-    /// Block type this API operates on.
-    type Block: BlockT;
-    /// Native execution dispatch required by some commands.
-    type NativeDispatch: NativeExecutionDispatch + 'static;
-    /// Starts the process of importing blocks.
-    fn import_blocks(
-        self,
-        input: impl Read + Seek + Send + 'static,
-        force: bool,
-        binary: bool,
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
-
-    /// Performs the blocks export.
-    fn export_blocks(
-        self,
-        output: impl Write + 'static,
-        from: NumberFor<Self::Block>,
-        to: Option<NumberFor<Self::Block>>,
-        binary: bool
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>;
-
-    /// Performs a revert of `blocks` blocks.
-    fn revert_chain(
-        &self,
-        blocks: NumberFor<Self::Block>
-    ) -> Result<(), Error>;
-
-    /// Re-validate known block.
-    fn check_block(
-        self,
-        block: BlockId<Self::Block>
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
-
-    /// Export the raw state at the given `block`. If `block` is `None`, the
-    /// best block will be used.
-    fn export_raw_state(
-        &self,
-        block: Option<BlockId<Self::Block>>,
-    ) -> Result<Storage, Error>;
-}
-
 impl<TBl, TRtApi, TBackend, TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc>
 ServiceBuilder<
     TBl,
diff --git a/client/service/src/chain_ops.rs b/client/service/src/chain_ops.rs
deleted file mode 100644
index cb4ed24b60b62..0000000000000
--- a/client/service/src/chain_ops.rs
+++ /dev/null
@@ -1,614 +0,0 @@
-// This file is part of Substrate.
-
-// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
-// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see <https://www.gnu.org/licenses/>.
-
-//! Chain utilities.
-
-use crate::error;
-use crate::builder::{ServiceBuilderCommand, ServiceBuilder};
-use crate::error::Error;
-use sc_chain_spec::ChainSpec;
-use log::{warn, info};
-use futures::{future, prelude::*};
-use sp_runtime::traits::{
-    Block as BlockT, NumberFor, One, Zero, Header, SaturatedConversion, MaybeSerializeDeserialize,
-};
-use sp_runtime::generic::{BlockId, SignedBlock};
-use codec::{Decode, Encode, IoReader as CodecIoReader};
-use crate::client::{Client, LocalCallExecutor};
-use sp_consensus::{
-    BlockOrigin,
-    import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue},
-};
-use sc_executor::{NativeExecutor, NativeExecutionDispatch};
-use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap};
-use sc_client_api::{StorageProvider, BlockBackend, UsageProvider};
-
-use std::{io::{Read, Write, Seek}, pin::Pin, collections::HashMap};
-use std::time::{Duration, Instant};
-use futures_timer::Delay;
-use std::task::Poll;
-use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
-use std::convert::{TryFrom, TryInto};
-use sp_runtime::traits::{CheckedDiv, Saturating};
-
-/// Number of blocks we will add to the queue before waiting for the queue to catch up.
-const MAX_PENDING_BLOCKS: u64 = 1_024;
-
-/// Number of milliseconds to wait until next poll.
-const DELAY_TIME: u64 = 2_000;
-
-/// Number of milliseconds that must have passed between two updates.
-const TIME_BETWEEN_UPDATES: u64 = 3_000;
-
-/// Build a chain spec json
-pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
-    spec.as_json(raw).map_err(Into::into)
-}
-
-
-/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder (from serde_json).
-/// Implements the Iterator Trait, calling `next()` will decode the next SignedBlock and return it.
-enum BlockIter<R, B> where
-    R: std::io::Read + std::io::Seek,
-{
-    Binary {
-        // Total number of blocks we are expecting to decode.
-        num_expected_blocks: u64,
-        // Number of blocks we have decoded thus far.
-        read_block_count: u64,
-        // Reader to the data, used for decoding new blocks.
-        reader: CodecIoReader<R>,
-    },
-    Json {
-        // Nubmer of blocks we have decoded thus far.
-        read_block_count: u64,
-        // Stream to the data, used for decoding new blocks.
-        reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
-    },
-}
-
-impl<R, B> BlockIter<R, B> where
-    R: Read + Seek + 'static,
-    B: BlockT + MaybeSerializeDeserialize,
-{
-    fn new(input: R, binary: bool) -> Result<Self, String> {
-        if binary {
-            let mut reader = CodecIoReader(input);
-            // If the file is encoded in binary format, it is expected to first specify the number
-            // of blocks that are going to be decoded. We read it and add it to our enum struct.
-            let num_expected_blocks: u64 = Decode::decode(&mut reader)
-                .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
-            Ok(BlockIter::Binary {
-                num_expected_blocks,
-                read_block_count: 0,
-                reader,
-            })
-        } else {
-            let stream_deser = Deserializer::from_reader(input)
-                .into_iter::<SignedBlock<B>>();
-            Ok(BlockIter::Json {
-                reader: stream_deser,
-                read_block_count: 0,
-            })
-        }
-    }
-
-    /// Returns the number of blocks read thus far.
-    fn read_block_count(&self) -> u64 {
-        match self {
-            BlockIter::Binary { read_block_count, .. }
-            | BlockIter::Json { read_block_count, .. }
-            => *read_block_count,
-        }
-    }
-
-    /// Returns the total number of blocks to be imported, if possible.
-    fn num_expected_blocks(&self) -> Option<u64> {
-        match self {
-            BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks),
-            BlockIter::Json {..} => None
-        }
-    }
-}
-
-impl<R, B> Iterator for BlockIter<R, B> where
-    R: Read + Seek + 'static,
-    B: BlockT + MaybeSerializeDeserialize,
-{
-    type Item = Result<SignedBlock<B>, String>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        match self {
-            BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
-                if read_block_count < num_expected_blocks {
-                    let block_result: Result<SignedBlock<B>, _> = SignedBlock::<B>::decode(reader)
-                        .map_err(|e| e.to_string());
-                    *read_block_count += 1;
-                    Some(block_result)
-                } else {
-                    // `read_block_count` == `num_expected_blocks` so we've read enough blocks.
-                    None
-                }
-            }
-            BlockIter::Json { reader, read_block_count } => {
-                let res = Some(reader.next()?.map_err(|e| e.to_string()));
-                *read_block_count += 1;
-                res
-            }
-        }
-    }
-}
-
-/// Imports the SignedBlock to the queue.
-fn import_block_to_queue<TBl, TImpQu>(
-    signed_block: SignedBlock<TBl>,
-    queue: &mut TImpQu,
-    force: bool
-) where
-    TBl: BlockT + MaybeSerializeDeserialize,
-    TImpQu: 'static + ImportQueue<TBl>,
-{
-    let (header, extrinsics) = signed_block.block.deconstruct();
-    let hash = header.hash();
-    // import queue handles verification and importing it into the client.
-    queue.import_blocks(BlockOrigin::File, vec![
-        IncomingBlock::<TBl> {
-            hash,
-            header: Some(header),
-            body: Some(extrinsics),
-            justification: signed_block.justification,
-            origin: None,
-            allow_missing_state: false,
-            import_existing: force,
-        }
-    ]);
-}
-
-/// Returns true if we have imported every block we were supposed to import, else returns false.
-fn importing_is_done(
-    num_expected_blocks: Option<u64>,
-    read_block_count: u64,
-    imported_blocks: u64
-) -> bool {
-    if let Some(num_expected_blocks) = num_expected_blocks {
-        imported_blocks >= num_expected_blocks
-    } else {
-        imported_blocks >= read_block_count
-    }
-}
-
-/// Structure used to log the block importing speed.
-struct Speedometer<B: BlockT> {
-    best_number: NumberFor<B>,
-    last_number: Option<NumberFor<B>>,
-    last_update: Instant,
-}
-
-impl<B: BlockT> Speedometer<B> {
-    /// Creates a fresh Speedometer.
-    fn new() -> Self {
-        Self {
-            best_number: NumberFor::<B>::from(0),
-            last_number: None,
-            last_update: Instant::now(),
-        }
-    }
-
-    /// Calculates `(best_number - last_number) / (now - last_update)` and
-    /// logs the speed of import.
-    fn display_speed(&self) {
-        // Number of milliseconds elapsed since last time.
-        let elapsed_ms = {
-            let elapsed = self.last_update.elapsed();
-            let since_last_millis = elapsed.as_secs() * 1000;
-            let since_last_subsec_millis = elapsed.subsec_millis() as u64;
-            since_last_millis + since_last_subsec_millis
-        };
-
-        // Number of blocks that have been imported since last time.
-        let diff = match self.last_number {
-            None => return,
-            Some(n) => self.best_number.saturating_sub(n)
-        };
-
-        if let Ok(diff) = TryInto::<u128>::try_into(diff) {
-            // If the number of blocks can be converted to a regular integer, then it's easy: just
-            // do the math and turn it into a `f64`.
-            let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms))
-                .map_or(0.0, |s| s as f64) / 10.0;
-            info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed);
-        } else {
-            // If the number of blocks can't be converted to a regular integer, then we need a more
-            // algebraic approach and we stay within the realm of integers.
-            let one_thousand = NumberFor::<B>::from(1_000);
-            let elapsed = NumberFor::<B>::from(
-                <u32 as TryFrom<u64>>::try_from(elapsed_ms).unwrap_or(u32::max_value())
-            );
-
-            let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed)
-                .unwrap_or_else(Zero::zero);
-            info!("📦 Current best block: {} ({} bps)", self.best_number, speed)
-        }
-    }
-
-    /// Updates the Speedometer.
-    fn update(&mut self, best_number: NumberFor<B>) {
-        self.last_number = Some(self.best_number);
-        self.best_number = best_number;
-        self.last_update = Instant::now();
-    }
-
-    // If more than TIME_BETWEEN_UPDATES has elapsed since last update,
-    // then print and update the speedometer.
-    fn notify_user(&mut self, best_number: NumberFor<B>) {
-        let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
-        if Instant::now().duration_since(self.last_update) >= delta {
-            self.display_speed();
-            self.update(best_number);
-        }
-    }
-}
-
-/// Different State that the `import_blocks` future could be in.
-enum ImportState<R, B> where
-    R: Read + Seek + 'static,
-    B: BlockT + MaybeSerializeDeserialize,
-{
-    /// We are reading from the BlockIter structure, adding those blocks to the queue if possible.
-    Reading{block_iter: BlockIter<R, B>},
-    /// The queue is full (contains at least MAX_PENDING_BLOCKS blocks) and we are waiting for it to catch up.
-    WaitingForImportQueueToCatchUp{
-        block_iter: BlockIter<R, B>,
-        delay: Delay,
-        block: SignedBlock<B>
-    },
-    // We have added all the blocks to the queue but they are still being processed.
-    WaitingForImportQueueToFinish{
-        num_expected_blocks: Option<u64>,
-        read_block_count: u64,
-        delay: Delay,
-    },
-}
-
-impl<
-    TBl, TRtApi, TBackend,
-    TExecDisp, TFchr, TSc, TImpQu, TFprb, TFpp,
-    TExPool, TRpc, Backend
-> ServiceBuilderCommand for ServiceBuilder<
-    TBl, TRtApi,
-    Client<TBackend, LocalCallExecutor<TBackend, NativeExecutor<TExecDisp>>, TBl, TRtApi>,
-    TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend
-> where
-    TBl: BlockT + MaybeSerializeDeserialize,
-    TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
-    TExecDisp: 'static + NativeExecutionDispatch,
-    TImpQu: 'static + ImportQueue<TBl>,
-    TRtApi: 'static + Send + Sync,
-    Self: Send + 'static,
-{
-    type Block = TBl;
-    type NativeDispatch = TExecDisp;
-
-    fn import_blocks(
-        mut self,
-        input: impl Read + Seek + Send + 'static,
-        force: bool,
-        binary: bool,
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
-        struct WaitLink {
-            imported_blocks: u64,
-            has_error: bool,
-        }
-
-        impl WaitLink {
-            fn new() -> WaitLink {
-                WaitLink {
-                    imported_blocks: 0,
-                    has_error: false,
-                }
-            }
-        }
-
-        impl<B: BlockT> Link<B> for WaitLink {
-            fn blocks_processed(
-                &mut self,
-                imported: usize,
-                _num_expected_blocks: usize,
-                results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
-            ) {
-                self.imported_blocks += imported as u64;
-
-                for result in results {
-                    if let (Err(err), hash) = result {
-                        warn!("There was an error importing block with hash {:?}: {:?}", hash, err);
-                        self.has_error = true;
-                        break;
-                    }
-                }
-            }
-        }
-
-        let mut link = WaitLink::new();
-        let block_iter_res: Result<BlockIter<_, TBl>, String> = BlockIter::new(input, binary);
-
-        let block_iter = match block_iter_res {
-            Ok(block_iter) => block_iter,
-            Err(e) => {
-                // We've encountered an error while creating the block iterator
-                // so we can just return a future that returns an error.
-                return future::ready(Err(Error::Other(e))).boxed()
-            }
-        };
-
-        let mut state = Some(ImportState::Reading{block_iter});
-        let mut speedometer = Speedometer::<TBl>::new();
-
-        // Importing blocks is implemented as a future, because we want the operation to be
-        // interruptible.
-        //
-        // Every time we read a block from the input or import a bunch of blocks from the import
-        // queue, the `Future` re-schedules itself and returns `Poll::Pending`.
-        // This makes it possible either to interleave other operations in-between the block imports,
-        // or to stop the operation completely.
-        let import = future::poll_fn(move |cx| {
-            let client = &self.client;
-            let queue = &mut self.import_queue;
-            match state.take().expect("state should never be None; qed") {
-                ImportState::Reading{mut block_iter} => {
-                    match block_iter.next() {
-                        None => {
-                            // The iterator is over: we now need to wait for the import queue to finish.
-                            let num_expected_blocks = block_iter.num_expected_blocks();
-                            let read_block_count = block_iter.read_block_count();
-                            let delay = Delay::new(Duration::from_millis(DELAY_TIME));
-                            state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
-                        },
-                        Some(block_result) => {
-                            let read_block_count = block_iter.read_block_count();
-                            match block_result {
-                                Ok(block) => {
-                                    if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
-                                        // The queue is full, so do not add this block and simply wait until
-                                        // the queue has made some progress.
-                                        let delay = Delay::new(Duration::from_millis(DELAY_TIME));
-                                        state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
-                                    } else {
-                                        // Queue is not full, we can keep on adding blocks to the queue.
-                                        import_block_to_queue(block, queue, force);
-                                        state = Some(ImportState::Reading{block_iter});
-                                    }
-                                }
-                                Err(e) => {
-                                    return Poll::Ready(
-                                        Err(Error::Other(format!("Error reading block #{}: {}", read_block_count, e))))
-                                }
-                            }
-                        }
-                    }
-                },
-                ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
-                    let read_block_count = block_iter.read_block_count();
-                    if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
-                        // Queue is still full, so wait until there is room to insert our block.
-                        match Pin::new(&mut delay).poll(cx) {
-                            Poll::Pending => {
-                                state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
-                                return Poll::Pending
-                            },
-                            Poll::Ready(_) => {
-                                delay.reset(Duration::from_millis(DELAY_TIME));
-                            },
-                        }
-                        state = Some(ImportState::WaitingForImportQueueToCatchUp{block_iter, delay, block});
-                    } else {
-                        // Queue is no longer full, so we can add our block to the queue.
-                        import_block_to_queue(block, queue, force);
-                        // Switch back to Reading state.
-                        state = Some(ImportState::Reading{block_iter});
-                    }
-                },
-                ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, mut delay} => {
-                    // All the blocks have been added to the queue, which doesn't mean they
-                    // have all been properly imported.
-                    if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
-                        // Importing is done, we can log the result and return.
-                        info!(
-                            "🎉 Imported {} blocks. Best: #{}",
-                            read_block_count, client.chain_info().best_number
-                        );
-                        return Poll::Ready(Ok(()))
-                    } else {
-                        // Importing is not done, we still have to wait for the queue to finish.
-                        // Wait for the delay, because we know the queue is lagging behind.
-                        match Pin::new(&mut delay).poll(cx) {
-                            Poll::Pending => {
-                                state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
-                                return Poll::Pending
-                            },
-                            Poll::Ready(_) => {
-                                delay.reset(Duration::from_millis(DELAY_TIME));
-                            },
-                        }
-
-                        state = Some(ImportState::WaitingForImportQueueToFinish{num_expected_blocks, read_block_count, delay});
-                    }
-                }
-            }
-
-            queue.poll_actions(cx, &mut link);
-
-            let best_number = client.chain_info().best_number;
-            speedometer.notify_user(best_number);
-
-            if link.has_error {
-                return Poll::Ready(Err(
-                    Error::Other(
-                        format!("Stopping after #{} blocks because of an error", link.imported_blocks)
-                    )
-                ))
-            }
-
-            cx.waker().wake_by_ref();
-            Poll::Pending
-        });
-        Box::pin(import)
-    }
-
-    fn export_blocks(
-        self,
-        mut output: impl Write + 'static,
-        from: NumberFor<TBl>,
-        to: Option<NumberFor<TBl>>,
-        binary: bool
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>>>> {
-        let mut block = from;
-
-        let last = match to {
-            Some(v) if v.is_zero() => One::one(),
-            Some(v) => v,
-            None => self.client.chain_info().best_number,
-        };
-
-        let mut wrote_header = false;
-
-        // Exporting blocks is implemented as a future, because we want the operation to be
-        // interruptible.
-        //
-        // Every time we write a block to the output, the `Future` re-schedules itself and returns
-        // `Poll::Pending`.
-        // This makes it possible either to interleave other operations in-between the block exports,
-        // or to stop the operation completely.
-        let export = future::poll_fn(move |cx| {
-            let client = &self.client;
-
-            if last < block {
-                return Poll::Ready(Err("Invalid block range specified".into()));
-            }
-
-            if !wrote_header {
-                info!("Exporting blocks from #{} to #{}", block, last);
-                if binary {
-                    let last_: u64 = last.saturated_into::<u64>();
-                    let block_: u64 = block.saturated_into::<u64>();
-                    let len: u64 = last_ - block_ + 1;
-                    output.write_all(&len.encode())?;
-                }
-                wrote_header = true;
-            }
-
-            match client.block(&BlockId::number(block))? {
-                Some(block) => {
-                    if binary {
-                        output.write_all(&block.encode())?;
-                    } else {
-                        serde_json::to_writer(&mut output, &block)
-                            .map_err(|e| format!("Error writing JSON: {}", e))?;
-                    }
-                },
-                // Reached end of the chain.
-                None => return Poll::Ready(Ok(())),
-            }
-            if (block % 10000.into()).is_zero() {
-                info!("#{}", block);
-            }
-            if block == last {
-                return Poll::Ready(Ok(()));
-            }
-            block += One::one();
-
-            // Re-schedule the task in order to continue the operation.
-            cx.waker().wake_by_ref();
-            Poll::Pending
-        });
-
-        Box::pin(export)
-    }
-
-    fn revert_chain(
-        &self,
-        blocks: NumberFor<TBl>
-    ) -> Result<(), Error> {
-        let reverted = self.client.revert(blocks)?;
-        let info = self.client.chain_info();
-
-        if reverted.is_zero() {
-            info!("There aren't any non-finalized blocks to revert.");
-        } else {
-            info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash);
-        }
-        Ok(())
-    }
-
-    fn check_block(
-        self,
-        block_id: BlockId<TBl>
-    ) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>> {
-        match self.client.block(&block_id) {
-            Ok(Some(block)) => {
-                let mut buf = Vec::new();
-                1u64.encode_to(&mut buf);
-                block.encode_to(&mut buf);
-                let reader = std::io::Cursor::new(buf);
-                self.import_blocks(reader, true, true)
-            }
-            Ok(None) => Box::pin(future::err("Unknown block".into())),
-            Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())),
-        }
-    }
-
-    fn export_raw_state(
-        &self,
-        block: Option<BlockId<TBl>>,
-    ) -> Result<Storage, Error> {
-        let block = block.unwrap_or_else(
-            || BlockId::Hash(self.client.usage_info().chain.best_hash)
-        );
-
-        let empty_key = StorageKey(Vec::new());
-        let mut top_storage = self.client.storage_pairs(&block, &empty_key)?;
-        let mut children_default = HashMap::new();
-
-        // Remove all default child storage roots from the top storage and collect the child storage
-        // pairs.
-        while let Some(pos) = top_storage
-            .iter()
-            .position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) {
-            let (key, _) = top_storage.swap_remove(pos);
-
-            let key = StorageKey(
-                key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(),
-            );
-            let child_info = ChildInfo::new_default(&key.0);
-
-            let keys = self.client.child_storage_keys(&block, &child_info, &empty_key)?;
-            let mut pairs = StorageMap::new();
-            keys.into_iter().try_for_each(|k| {
-                if let Some(value) = self.client.child_storage(&block, &child_info, &k)? {
-                    pairs.insert(k.0, value.0);
-                }
-
-                Ok::<_, Error>(())
-            })?;
-
-            children_default.insert(key.0, StorageChild { child_info, data: pairs });
-        }
-
-        let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect();
-        Ok(Storage { top, children_default })
-    }
-}
diff --git a/client/service/src/chain_ops/check_block.rs b/client/service/src/chain_ops/check_block.rs
new file mode 100644
index 0000000000000..34baeb55445a8
--- /dev/null
+++ b/client/service/src/chain_ops/check_block.rs
@@ -0,0 +1,51 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use crate::error::Error;
+use futures::{future, prelude::*};
+use sp_runtime::traits::Block as BlockT;
+use sp_runtime::generic::BlockId;
+use codec::Encode;
+use sp_consensus::import_queue::ImportQueue;
+use sc_client_api::{BlockBackend, UsageProvider};
+
+use std::pin::Pin;
+use std::sync::Arc;
+use crate::chain_ops::import_blocks;
+
+/// Re-validate known block.
+pub fn check_block<B, C, IQ>(
+    client: Arc<C>,
+    import_queue: IQ,
+    block_id: BlockId<B>
+) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
+where
+    C: BlockBackend<B> + UsageProvider<B> + Send + Sync + 'static,
+    B: BlockT + for<'de> serde::Deserialize<'de>,
+    IQ: ImportQueue<B> + 'static,
+{
+    match client.block(&block_id) {
+        Ok(Some(block)) => {
+            let mut buf = Vec::new();
+            1u64.encode_to(&mut buf);
+            block.encode_to(&mut buf);
+            let reader = std::io::Cursor::new(buf);
+            import_blocks(client, import_queue, reader, true, true)
+        }
+        Ok(None) => Box::pin(future::err("Unknown block".into())),
+        Err(e) => Box::pin(future::err(format!("Error reading block: {:?}", e).into())),
+    }
+}
diff --git a/client/service/src/chain_ops/export_blocks.rs b/client/service/src/chain_ops/export_blocks.rs
new file mode 100644
index 0000000000000..2f32cbf7fbdb7
--- /dev/null
+++ b/client/service/src/chain_ops/export_blocks.rs
@@ -0,0 +1,104 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use crate::error::Error;
+use log::info;
+use futures::{future, prelude::*};
+use sp_runtime::traits::{
+    Block as BlockT, NumberFor, One, Zero, SaturatedConversion
+};
+use sp_runtime::generic::BlockId;
+use codec::Encode;
+
+use std::{io::Write, pin::Pin};
+use sc_client_api::{BlockBackend, UsageProvider};
+use std::sync::Arc;
+use std::task::Poll;
+
+/// Performs the blocks export.
+pub fn export_blocks<B, C>(
+    client: Arc<C>,
+    mut output: impl Write + 'static,
+    from: NumberFor<B>,
+    to: Option<NumberFor<B>>,
+    binary: bool
+) -> Pin<Box<dyn Future<Output = Result<(), Error>>>>
+where
+    C: BlockBackend<B> + UsageProvider<B> + 'static,
+    B: BlockT,
+{
+    let mut block = from;
+
+    let last = match to {
+        Some(v) if v.is_zero() => One::one(),
+        Some(v) => v,
+        None => client.usage_info().chain.best_number,
+    };
+
+    let mut wrote_header = false;
+
+    // Exporting blocks is implemented as a future, because we want the operation to be
+    // interruptible.
+    //
+    // Every time we write a block to the output, the `Future` re-schedules itself and returns
+    // `Poll::Pending`.
+    // This makes it possible either to interleave other operations in-between the block exports,
+    // or to stop the operation completely.
+    let export = future::poll_fn(move |cx| {
+        let client = &client;
+
+        if last < block {
+            return Poll::Ready(Err("Invalid block range specified".into()));
+        }
+
+        if !wrote_header {
+            info!("Exporting blocks from #{} to #{}", block, last);
+            if binary {
+                let last_: u64 = last.saturated_into::<u64>();
+                let block_: u64 = block.saturated_into::<u64>();
+                let len: u64 = last_ - block_ + 1;
+                output.write_all(&len.encode())?;
+            }
+            wrote_header = true;
+        }
+
+        match client.block(&BlockId::number(block))? {
+            Some(block) => {
+                if binary {
+                    output.write_all(&block.encode())?;
+                } else {
+                    serde_json::to_writer(&mut output, &block)
+                        .map_err(|e| format!("Error writing JSON: {}", e))?;
+                }
+            },
+            // Reached end of the chain.
+            None => return Poll::Ready(Ok(())),
+        }
+        if (block % 10000.into()).is_zero() {
+            info!("#{}", block);
+        }
+        if block == last {
+            return Poll::Ready(Ok(()));
+        }
+        block += One::one();
+
+        // Re-schedule the task in order to continue the operation.
+        cx.waker().wake_by_ref();
+        Poll::Pending
+    });
+
+    Box::pin(export)
+}
diff --git a/client/service/src/chain_ops/export_raw_state.rs b/client/service/src/chain_ops/export_raw_state.rs
new file mode 100644
index 0000000000000..3fe44dbdb142d
--- /dev/null
+++ b/client/service/src/chain_ops/export_raw_state.rs
@@ -0,0 +1,71 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use crate::error::Error;
+use sp_runtime::traits::Block as BlockT;
+use sp_runtime::generic::BlockId;
+use sp_core::storage::{StorageKey, well_known_keys, ChildInfo, Storage, StorageChild, StorageMap};
+use sc_client_api::{StorageProvider, UsageProvider};
+
+use std::{collections::HashMap, sync::Arc};
+
+/// Export the raw state at the given `block`. If `block` is `None`, the
+/// best block will be used.
+pub fn export_raw_state<B, BA, C>(
+    client: Arc<C>,
+    block: Option<BlockId<B>>,
+) -> Result<Storage, Error>
+where
+    C: UsageProvider<B> + StorageProvider<B, BA>,
+    B: BlockT,
+    BA: sc_client_api::backend::Backend<B>,
+{
+    let block = block.unwrap_or_else(
+        || BlockId::Hash(client.usage_info().chain.best_hash)
+    );
+
+    let empty_key = StorageKey(Vec::new());
+    let mut top_storage = client.storage_pairs(&block, &empty_key)?;
+    let mut children_default = HashMap::new();
+
+    // Remove all default child storage roots from the top storage and collect the child storage
+    // pairs.
+    while let Some(pos) = top_storage
+        .iter()
+        .position(|(k, _)| k.0.starts_with(well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX)) {
+        let (key, _) = top_storage.swap_remove(pos);
+
+        let key = StorageKey(
+            key.0[well_known_keys::DEFAULT_CHILD_STORAGE_KEY_PREFIX.len()..].to_vec(),
+        );
+        let child_info = ChildInfo::new_default(&key.0);
+
+        let keys = client.child_storage_keys(&block, &child_info, &empty_key)?;
+        let mut pairs = StorageMap::new();
+        keys.into_iter().try_for_each(|k| {
+            if let Some(value) = client.child_storage(&block, &child_info, &k)? {
+                pairs.insert(k.0, value.0);
+            }
+
+            Ok::<_, Error>(())
+        })?;
+
+        children_default.insert(key.0, StorageChild { child_info, data: pairs });
+    }
+
+    let top = top_storage.into_iter().map(|(k, v)| (k.0, v.0)).collect();
+    Ok(Storage { top, children_default })
+}
diff --git a/client/service/src/chain_ops/import_blocks.rs b/client/service/src/chain_ops/import_blocks.rs
new file mode 100644
index 0000000000000..46ad0d0501d93
--- /dev/null
+++ b/client/service/src/chain_ops/import_blocks.rs
@@ -0,0 +1,472 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2017-2020 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::error;
+use crate::error::Error;
+use sc_chain_spec::ChainSpec;
+use log::{warn, info};
+use futures::{future, prelude::*};
+use sp_runtime::traits::{
+    Block as BlockT, NumberFor, Zero, Header, MaybeSerializeDeserialize,
+};
+use sp_runtime::generic::SignedBlock;
+use codec::{Decode, IoReader as CodecIoReader};
+use sp_consensus::{
+    BlockOrigin,
+    import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult, ImportQueue},
+};
+
+use std::{io::{Read, Seek}, pin::Pin};
+use std::time::{Duration, Instant};
+use futures_timer::Delay;
+use std::task::Poll;
+use serde_json::{de::IoRead as JsonIoRead, Deserializer, StreamDeserializer};
+use std::convert::{TryFrom, TryInto};
+use sp_runtime::traits::{CheckedDiv, Saturating};
+use sc_client_api::UsageProvider;
+
+/// Number of blocks we will add to the queue before waiting for the queue to catch up.
+const MAX_PENDING_BLOCKS: u64 = 1_024;
+
+/// Number of milliseconds to wait until next poll.
+const DELAY_TIME: u64 = 2_000;
+
+/// Number of milliseconds that must have passed between two updates.
+const TIME_BETWEEN_UPDATES: u64 = 3_000;
+
+use std::sync::Arc;
+
+/// Build a chain spec json
+pub fn build_spec(spec: &dyn ChainSpec, raw: bool) -> error::Result<String> {
+    spec.as_json(raw).map_err(Into::into)
+}
+
+
+/// Helper enum that wraps either a binary decoder (from parity-scale-codec), or a JSON decoder
+/// (from serde_json). Implements the Iterator Trait, calling `next()` will decode the next
+/// SignedBlock and return it.
+enum BlockIter<R, B> where
+    R: std::io::Read + std::io::Seek,
+{
+    Binary {
+        // Total number of blocks we are expecting to decode.
+        num_expected_blocks: u64,
+        // Number of blocks we have decoded thus far.
+        read_block_count: u64,
+        // Reader to the data, used for decoding new blocks.
+        reader: CodecIoReader<R>,
+    },
+    Json {
+        // Number of blocks we have decoded thus far.
+        read_block_count: u64,
+        // Stream to the data, used for decoding new blocks.
+        reader: StreamDeserializer<'static, JsonIoRead<R>, SignedBlock<B>>,
+    },
+}
+
+impl<R, B> BlockIter<R, B> where
+    R: Read + Seek + 'static,
+    B: BlockT + MaybeSerializeDeserialize,
+{
+    fn new(input: R, binary: bool) -> Result<Self, String> {
+        if binary {
+            let mut reader = CodecIoReader(input);
+            // If the file is encoded in binary format, it is expected to first specify the number
+            // of blocks that are going to be decoded. We read it and add it to our enum struct.
+            let num_expected_blocks: u64 = Decode::decode(&mut reader)
+                .map_err(|e| format!("Failed to decode the number of blocks: {:?}", e))?;
+            Ok(BlockIter::Binary {
+                num_expected_blocks,
+                read_block_count: 0,
+                reader,
+            })
+        } else {
+            let stream_deser = Deserializer::from_reader(input)
+                .into_iter::<SignedBlock<B>>();
+            Ok(BlockIter::Json {
+                reader: stream_deser,
+                read_block_count: 0,
+            })
+        }
+    }
+
+    /// Returns the number of blocks read thus far.
+
+	/// Returns the number of blocks read thus far.
+	fn read_block_count(&self) -> u64 {
+		match self {
+			BlockIter::Binary { read_block_count, .. }
+			| BlockIter::Json { read_block_count, .. }
+			=> *read_block_count,
+		}
+	}
+
+	/// Returns the total number of blocks to be imported, if possible.
+	fn num_expected_blocks(&self) -> Option<u64> {
+		match self {
+			BlockIter::Binary { num_expected_blocks, ..} => Some(*num_expected_blocks),
+			BlockIter::Json {..} => None
+		}
+	}
+}
+
+impl<R, B> Iterator for BlockIter<R, B> where
+	R: Read + Seek + 'static,
+	B: BlockT + MaybeSerializeDeserialize,
+{
+	type Item = Result<SignedBlock<B>, String>;
+
+	fn next(&mut self) -> Option<Self::Item> {
+		match self {
+			BlockIter::Binary { num_expected_blocks, read_block_count, reader } => {
+				if read_block_count < num_expected_blocks {
+					let block_result: Result<SignedBlock<B>, _> = SignedBlock::<B>::decode(reader)
+						.map_err(|e| e.to_string());
+					*read_block_count += 1;
+					Some(block_result)
+				} else {
+					// `read_block_count` == `num_expected_blocks`, so we've read enough blocks.
+					None
+				}
+			}
+			BlockIter::Json { reader, read_block_count } => {
+				let res = Some(reader.next()?.map_err(|e| e.to_string()));
+				*read_block_count += 1;
+				res
+			}
+		}
+	}
+}
+
+/// Imports the `SignedBlock` to the queue.
+fn import_block_to_queue<TBl, TImpQu>(
+	signed_block: SignedBlock<TBl>,
+	queue: &mut TImpQu,
+	force: bool
+) where
+	TBl: BlockT + MaybeSerializeDeserialize,
+	TImpQu: 'static + ImportQueue<TBl>,
+{
+	let (header, extrinsics) = signed_block.block.deconstruct();
+	let hash = header.hash();
+	// The import queue handles verification and importing it into the client.
+	queue.import_blocks(BlockOrigin::File, vec![
+		IncomingBlock::<TBl> {
+			hash,
+			header: Some(header),
+			body: Some(extrinsics),
+			justification: signed_block.justification,
+			origin: None,
+			allow_missing_state: false,
+			import_existing: force,
+		}
+	]);
+}
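+
+// Note: `ImportQueue::import_blocks` only *enqueues* the blocks; verification and
+// the actual import happen asynchronously, and the results are reported back
+// through the `Link` handed to `poll_actions` (see `WaitLink` below). This is why
+// `import_blocks` has to track `imported_blocks` separately from `read_block_count`.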
+
+/// Returns true if we have imported every block we were supposed to import, else returns false.
+fn importing_is_done(
+	num_expected_blocks: Option<u64>,
+	read_block_count: u64,
+	imported_blocks: u64
+) -> bool {
+	if let Some(num_expected_blocks) = num_expected_blocks {
+		imported_blocks >= num_expected_blocks
+	} else {
+		imported_blocks >= read_block_count
+	}
+}
+
+/// Structure used to log the block importing speed.
+struct Speedometer<B: BlockT> {
+	best_number: NumberFor<B>,
+	last_number: Option<NumberFor<B>>,
+	last_update: Instant,
+}
+
+impl<B: BlockT> Speedometer<B> {
+	/// Creates a fresh Speedometer.
+	fn new() -> Self {
+		Self {
+			best_number: NumberFor::<B>::from(0),
+			last_number: None,
+			last_update: Instant::now(),
+		}
+	}
+
+	/// Calculates `(best_number - last_number) / (now - last_update)` and
+	/// logs the speed of import.
+	fn display_speed(&self) {
+		// Number of milliseconds elapsed since last time.
+		let elapsed_ms = {
+			let elapsed = self.last_update.elapsed();
+			let since_last_millis = elapsed.as_secs() * 1000;
+			let since_last_subsec_millis = elapsed.subsec_millis() as u64;
+			since_last_millis + since_last_subsec_millis
+		};
+
+		// Number of blocks that have been imported since last time.
+		let diff = match self.last_number {
+			None => return,
+			Some(n) => self.best_number.saturating_sub(n)
+		};
+
+		if let Ok(diff) = TryInto::<u128>::try_into(diff) {
+			// If the number of blocks can be converted to a regular integer, then it's easy: just
+			// do the math and turn it into a `f64`.
+			let speed = diff.saturating_mul(10_000).checked_div(u128::from(elapsed_ms))
+				.map_or(0.0, |s| s as f64) / 10.0;
+			info!("📦 Current best block: {} ({:4.1} bps)", self.best_number, speed);
+		} else {
+			// If the number of blocks can't be converted to a regular integer, then we need a more
+			// algebraic approach and we stay within the realm of integers.
+			let one_thousand = NumberFor::<B>::from(1_000);
+			let elapsed = NumberFor::<B>::from(
+				<u32 as TryFrom<u64>>::try_from(elapsed_ms).unwrap_or(u32::max_value())
+			);
+
+			let speed = diff.saturating_mul(one_thousand).checked_div(&elapsed)
+				.unwrap_or_else(Zero::zero);
+			info!("📦 Current best block: {} ({} bps)", self.best_number, speed)
+		}
+	}
+
+	/// Updates the Speedometer.
+	fn update(&mut self, best_number: NumberFor<B>) {
+		self.last_number = Some(self.best_number);
+		self.best_number = best_number;
+		self.last_update = Instant::now();
+	}
+
+	/// If more than `TIME_BETWEEN_UPDATES` milliseconds have elapsed since the last update,
+	/// displays the speed and updates the speedometer.
+	fn notify_user(&mut self, best_number: NumberFor<B>) {
+		let delta = Duration::from_millis(TIME_BETWEEN_UPDATES);
+		if Instant::now().duration_since(self.last_update) >= delta {
+			self.display_speed();
+			self.update(best_number);
+		}
+	}
+}
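+
+// Worked example for the `u128` fast path above: importing 300 blocks over 3_000 ms
+// gives `300 * 10_000 / 3_000 = 1_000`, and `1_000 as f64 / 10.0 = 100.0`, i.e.
+// "100.0 bps" with one decimal of precision. The `NumberFor<B>` fallback computes
+// the same blocks-per-second figure, just without the fractional digit.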
+
+/// Different states that the `import_blocks` future can be in.
+enum ImportState<R, B> where
+	R: Read + Seek + 'static,
+	B: BlockT + MaybeSerializeDeserialize,
+{
+	/// We are reading from the `BlockIter` structure, adding those blocks to the queue if
+	/// possible.
+	Reading{block_iter: BlockIter<R, B>},
+	/// The queue is full (contains at least `MAX_PENDING_BLOCKS` blocks) and we are waiting
+	/// for it to catch up.
+	WaitingForImportQueueToCatchUp{
+		block_iter: BlockIter<R, B>,
+		delay: Delay,
+		block: SignedBlock<B>
+	},
+	/// We have added all the blocks to the queue but they are still being processed.
+	WaitingForImportQueueToFinish{
+		num_expected_blocks: Option<u64>,
+		read_block_count: u64,
+		delay: Delay,
+	},
+}
+
+/// Starts the process of importing blocks.
+pub fn import_blocks<B, IQ, C>(
+	client: Arc<C>,
+	mut import_queue: IQ,
+	input: impl Read + Seek + Send + 'static,
+	force: bool,
+	binary: bool,
+) -> Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>
+where
+	C: UsageProvider<B> + Send + Sync + 'static,
+	B: BlockT + for<'de> serde::Deserialize<'de>,
+	IQ: ImportQueue<B> + 'static,
+{
+	struct WaitLink {
+		imported_blocks: u64,
+		has_error: bool,
+	}
+
+	impl WaitLink {
+		fn new() -> WaitLink {
+			WaitLink {
+				imported_blocks: 0,
+				has_error: false,
+			}
+		}
+	}
+
+	impl<B: BlockT> Link<B> for WaitLink {
+		fn blocks_processed(
+			&mut self,
+			imported: usize,
+			_num_expected_blocks: usize,
+			results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
+		) {
+			self.imported_blocks += imported as u64;
+
+			for result in results {
+				if let (Err(err), hash) = result {
+					warn!("There was an error importing block with hash {:?}: {:?}", hash, err);
+					self.has_error = true;
+					break;
+				}
+			}
+		}
+	}
+
+	let mut link = WaitLink::new();
+	let block_iter_res: Result<BlockIter<_, B>, String> = BlockIter::new(input, binary);
+
+	let block_iter = match block_iter_res {
+		Ok(block_iter) => block_iter,
+		Err(e) => {
+			// We've encountered an error while creating the block iterator,
+			// so we can just return a future that returns an error.
+			return future::ready(Err(Error::Other(e))).boxed()
+		}
+	};
+
+	let mut state = Some(ImportState::Reading{block_iter});
+	let mut speedometer = Speedometer::<B>::new();
+
+	// Importing blocks is implemented as a future, because we want the operation to be
+	// interruptible.
+	//
+	// Every time we read a block from the input or import a bunch of blocks from the import
+	// queue, the `Future` re-schedules itself and returns `Poll::Pending`.
+	// This makes it possible either to interleave other operations in-between the block imports,
+	// or to stop the operation completely.
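+	//
+	// The pattern in miniature (illustrative only; the real state machine below
+	// threads `ImportState` through the same shape):
+	//
+	//     let mut n = 0u32;
+	//     let fut = future::poll_fn(move |cx| {
+	//         n += 1;                   // one bounded unit of work per poll
+	//         if n == 10 { return Poll::Ready(()); }
+	//         cx.waker().wake_by_ref(); // re-schedule ourselves
+	//         Poll::Pending
+	//     });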
+	let import = future::poll_fn(move |cx| {
+		let client = &client;
+		let queue = &mut import_queue;
+		match state.take().expect("state should never be None; qed") {
+			ImportState::Reading{mut block_iter} => {
+				match block_iter.next() {
+					None => {
+						// The iterator is over: we now need to wait for the import queue to
+						// finish.
+						let num_expected_blocks = block_iter.num_expected_blocks();
+						let read_block_count = block_iter.read_block_count();
+						let delay = Delay::new(Duration::from_millis(DELAY_TIME));
+						state = Some(ImportState::WaitingForImportQueueToFinish {
+							num_expected_blocks, read_block_count, delay
+						});
+					},
+					Some(block_result) => {
+						let read_block_count = block_iter.read_block_count();
+						match block_result {
+							Ok(block) => {
+								if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
+									// The queue is full, so do not add this block and simply wait
+									// until the queue has made some progress.
+									let delay = Delay::new(Duration::from_millis(DELAY_TIME));
+									state = Some(ImportState::WaitingForImportQueueToCatchUp {
+										block_iter, delay, block
+									});
+								} else {
+									// Queue is not full, we can keep on adding blocks to it.
+									import_block_to_queue(block, queue, force);
+									state = Some(ImportState::Reading{block_iter});
+								}
+							}
+							Err(e) => {
+								return Poll::Ready(
+									Err(Error::Other(
+										format!("Error reading block #{}: {}", read_block_count, e)
+									)))
+							}
+						}
+					}
+				}
+			},
+			ImportState::WaitingForImportQueueToCatchUp{block_iter, mut delay, block} => {
+				let read_block_count = block_iter.read_block_count();
+				if read_block_count - link.imported_blocks >= MAX_PENDING_BLOCKS {
+					// Queue is still full, so wait until there is room to insert our block.
+					match Pin::new(&mut delay).poll(cx) {
+						Poll::Pending => {
+							state = Some(ImportState::WaitingForImportQueueToCatchUp {
+								block_iter, delay, block
+							});
+							return Poll::Pending
+						},
+						Poll::Ready(_) => {
+							delay.reset(Duration::from_millis(DELAY_TIME));
+						},
+					}
+					state = Some(ImportState::WaitingForImportQueueToCatchUp {
+						block_iter, delay, block
+					});
+				} else {
+					// Queue is no longer full, so we can add our block to the queue.
+					import_block_to_queue(block, queue, force);
+					// Switch back to Reading state.
+					state = Some(ImportState::Reading{block_iter});
+				}
+			},
+			ImportState::WaitingForImportQueueToFinish {
+				num_expected_blocks, read_block_count, mut delay
+			} => {
+				// All the blocks have been added to the queue, which doesn't mean they
+				// have all been properly imported.
+				if importing_is_done(num_expected_blocks, read_block_count, link.imported_blocks) {
+					// Importing is done, we can log the result and return.
+					info!(
+						"🎉 Imported {} blocks. Best: #{}",
+						read_block_count, client.usage_info().chain.best_number
+					);
+					return Poll::Ready(Ok(()))
+				} else {
+					// Importing is not done, we still have to wait for the queue to finish.
+					// Wait for the delay, because we know the queue is lagging behind.
+					match Pin::new(&mut delay).poll(cx) {
+						Poll::Pending => {
+							state = Some(ImportState::WaitingForImportQueueToFinish {
+								num_expected_blocks, read_block_count, delay
+							});
+							return Poll::Pending
+						},
+						Poll::Ready(_) => {
+							delay.reset(Duration::from_millis(DELAY_TIME));
+						},
+					}
+
+					state = Some(ImportState::WaitingForImportQueueToFinish {
+						num_expected_blocks, read_block_count, delay
+					});
+				}
+			}
+		}
+
+		queue.poll_actions(cx, &mut link);
+
+		let best_number = client.usage_info().chain.best_number;
+		speedometer.notify_user(best_number);
+
+		if link.has_error {
+			return Poll::Ready(Err(
+				Error::Other(
+					format!("Stopping after #{} blocks because of an error", link.imported_blocks)
+				)
+			))
+		}
+
+		cx.waker().wake_by_ref();
+		Poll::Pending
+	});
+	Box::pin(import)
+}
diff --git a/client/service/src/chain_ops/mod.rs b/client/service/src/chain_ops/mod.rs
new file mode 100644
index 0000000000000..af6e6f632fc06
--- /dev/null
+++ b/client/service/src/chain_ops/mod.rs
@@ -0,0 +1,29 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+//! Chain utilities.
+
+mod check_block;
+mod export_blocks;
+mod export_raw_state;
+mod import_blocks;
+mod revert_chain;
+
+pub use check_block::*;
+pub use export_blocks::*;
+pub use export_raw_state::*;
+pub use import_blocks::*;
+pub use revert_chain::*;
diff --git a/client/service/src/chain_ops/revert_chain.rs b/client/service/src/chain_ops/revert_chain.rs
new file mode 100644
index 0000000000000..129aea0408685
--- /dev/null
+++ b/client/service/src/chain_ops/revert_chain.rs
@@ -0,0 +1,43 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use crate::error::Error;
+use log::info;
+use sp_runtime::traits::{Block as BlockT, NumberFor, Zero};
+use sc_client_api::{Backend, UsageProvider};
+use std::sync::Arc;
+
+/// Performs a revert of `blocks` blocks.
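+///
+/// Only non-finalized blocks can be reverted. An illustrative (hypothetical)
+/// call site, reverting up to 256 blocks:
+///
+/// ```ignore
+/// revert_chain(client.clone(), backend.clone(), 256u32.into())?;
+/// ```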
Best: #{} ({})", reverted, info.best_number, info.best_hash); + } + Ok(()) +} diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c3c8f60e689ad..1d41490956858 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -23,7 +23,6 @@ #![recursion_limit="128"] pub mod config; -#[macro_use] pub mod chain_ops; pub mod error; @@ -55,7 +54,7 @@ use sp_utils::{status_sinks, mpsc::{tracing_unbounded, TracingUnboundedReceiver, pub use self::error::Error; pub use self::builder::{ new_full_client, new_client, - ServiceBuilder, ServiceBuilderCommand, TFullClient, TLightClient, TFullBackend, TLightBackend, + ServiceBuilder, TFullClient, TLightClient, TFullBackend, TLightBackend, TFullCallExecutor, TLightCallExecutor, RpcExtensionBuilder, }; pub use config::{ @@ -79,6 +78,7 @@ pub use sc_network::config::{ pub use sc_tracing::TracingReceiver; pub use task_manager::SpawnTaskHandle; pub use task_manager::TaskManager; +pub use sp_consensus::import_queue::ImportQueue; use sc_client_api::{Backend, BlockchainEvents}; const DEFAULT_PROTOCOL_ID: &str = "sup";