diff --git a/core/cli/src/informant.rs b/core/cli/src/informant.rs index b5a2f03d79546..d8f0471a89f75 100644 --- a/core/cli/src/informant.rs +++ b/core/cli/src/informant.rs @@ -21,22 +21,19 @@ use futures::{Future, Stream}; use futures03::{StreamExt as _, TryStreamExt as _}; use log::{info, warn}; use sr_primitives::{generic::BlockId, traits::Header}; -use service::{Service, Components}; +use service::AbstractService; use tokio::runtime::TaskExecutor; mod display; /// Spawn informant on the event loop #[deprecated(note = "Please use informant::build instead, and then create the task manually")] -pub fn start(service: &Service, exit: ::exit_future::Exit, handle: TaskExecutor) where - C: Components, -{ +pub fn start(service: &impl AbstractService, exit: ::exit_future::Exit, handle: TaskExecutor) { handle.spawn(exit.until(build(service)).map(|_| ())); } /// Creates an informant in the form of a `Future` that must be polled regularly. -pub fn build(service: &Service) -> impl Future -where C: Components { +pub fn build(service: &impl AbstractService) -> impl Future { let client = service.client(); let mut display = display::InformantDisplay::new(); diff --git a/core/cli/src/lib.rs b/core/cli/src/lib.rs index ef5290413166d..b5e5eb0c30cfe 100644 --- a/core/cli/src/lib.rs +++ b/core/cli/src/lib.rs @@ -28,8 +28,9 @@ pub mod informant; use client::ExecutionStrategies; use service::{ - config::Configuration, - ServiceFactory, FactoryFullConfiguration, RuntimeGenesis, + config::Configuration, ServiceFactory, + ServiceBuilderExport, ServiceBuilderImport, ServiceBuilderRevert, + FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, PruningMode, ChainSpec, }; use network::{ @@ -342,6 +343,36 @@ impl<'a> ParseAndPrepareExport<'a> { config, exit.into_exit(), file, from.into(), to.map(Into::into), json ).map_err(Into::into) } + + /// Runs the command and exports from the chain. + pub fn run_with_builder( + self, + builder: F, + spec_factory: S, + exit: E, + ) -> error::Result<()> + where S: FnOnce(&str) -> Result>, String>, + F: FnOnce(Configuration) -> Result, + B: ServiceBuilderExport, + C: Default, + G: RuntimeGenesis, + E: IntoExit + { + let config = create_config_with_db_path(spec_factory, &self.params.shared_params, self.version)?; + + info!("DB path: {}", config.database_path.display()); + let from = self.params.from.unwrap_or(1); + let to = self.params.to; + let json = self.params.json; + + let file: Box = match self.params.output { + Some(filename) => Box::new(File::create(filename)?), + None => Box::new(stdout()), + }; + + builder(config)?.export_blocks(exit.into_exit(), file, from.into(), to.map(Into::into), json)?; + Ok(()) + } } /// Command ready to import the chain. @@ -381,6 +412,41 @@ impl<'a> ParseAndPrepareImport<'a> { tokio::run(fut); Ok(()) } + + /// Runs the command and imports to the chain. 
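For node crates picking this up, a rough sketch of calling the new export entry point. Everything other than `run_with_builder` itself — `cmd`, `new_full_start`, `load_spec`, `exit` — is a placeholder for the node's existing CLI plumbing:

    // `cmd` is the ParseAndPrepareExport value produced by the node's argument parsing.
    cmd.run_with_builder(
        |config| new_full_start(config), // FnOnce(Configuration<..>) -> Result<impl ServiceBuilderExport, _>
        load_spec,                       // chain-spec factory: FnOnce(&str) -> Result<Option<ChainSpec<..>>, String>
        exit,                            // anything implementing IntoExit
    )?;

The import and revert commands below follow the same shape, bounded by `ServiceBuilderImport` and `ServiceBuilderRevert` respectively.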
+ pub fn run_with_builder( + self, + builder: F, + spec_factory: S, + exit: E, + ) -> error::Result<()> + where S: FnOnce(&str) -> Result>, String>, + F: FnOnce(Configuration) -> Result, + B: ServiceBuilderImport, + C: Default, + G: RuntimeGenesis, + E: IntoExit + { + let mut config = create_config_with_db_path(spec_factory, &self.params.shared_params, self.version)?; + config.execution_strategies = ExecutionStrategies { + importing: self.params.execution.into(), + other: self.params.execution.into(), + ..Default::default() + }; + + let file: Box = match self.params.input { + Some(filename) => Box::new(File::open(filename)?), + None => { + let mut buffer = Vec::new(); + stdin().read_to_end(&mut buffer)?; + Box::new(Cursor::new(buffer)) + }, + }; + + let fut = builder(config)?.import_blocks(exit.into_exit(), file)?; + tokio::run(fut); + Ok(()) + } } /// Command ready to purge the chain. @@ -450,6 +516,23 @@ impl<'a> ParseAndPrepareRevert<'a> { let blocks = self.params.num; Ok(service::chain_ops::revert_chain::(config, blocks.into())?) } + + /// Runs the command and reverts the chain. + pub fn run_with_builder( + self, + builder: F, + spec_factory: S + ) -> error::Result<()> + where S: FnOnce(&str) -> Result>, String>, + F: FnOnce(Configuration) -> Result, + B: ServiceBuilderRevert, + C: Default, + G: RuntimeGenesis { + let config = create_config_with_db_path(spec_factory, &self.params.shared_params, self.version)?; + let blocks = self.params.num; + builder(config)?.revert_chain(blocks.into())?; + Ok(()) + } } /// Parse command line interface arguments and executes the desired command. diff --git a/core/service/src/chain_ops.rs b/core/service/src/chain_ops.rs index c801b81186f18..df943d7597a80 100644 --- a/core/service/src/chain_ops.rs +++ b/core/service/src/chain_ops.rs @@ -18,42 +18,27 @@ use std::{self, io::{Read, Write, Seek}}; use futures::prelude::*; -use futures03::TryFutureExt as _; use log::{info, warn}; -use sr_primitives::generic::{SignedBlock, BlockId}; -use sr_primitives::traits::{SaturatedConversion, Zero, One, Block, Header, NumberFor}; -use consensus_common::import_queue::{ImportQueue, IncomingBlock, Link, BlockImportError, BlockImportResult}; -use network::message; +use sr_primitives::generic::BlockId; +use sr_primitives::traits::{SaturatedConversion, Zero, One, Header, NumberFor}; +use consensus_common::import_queue::ImportQueue; -use consensus_common::BlockOrigin; use crate::components::{self, Components, ServiceFactory, FactoryFullConfiguration, FactoryBlockNumber, RuntimeGenesis}; use crate::new_client; use codec::{Decode, Encode, IoReader}; use crate::error; use crate::chain_spec::ChainSpec; -/// Export a range of blocks to a binary stream. -pub fn export_blocks( - config: FactoryFullConfiguration, - exit: E, - mut output: W, - from: FactoryBlockNumber, - to: Option>, - json: bool -) -> error::Result<()> - where - F: ServiceFactory, - E: Future + Send + 'static, - W: Write, -{ - let client = new_client::(&config)?; - let mut block = from; +#[macro_export] +macro_rules! 
export_blocks { +($client:ident, $exit:ident, $output:ident, $from:ident, $to:ident, $json:ident) => {{ + let mut block = $from; - let last = match to { + let last = match $to { Some(v) if v.is_zero() => One::one(), Some(v) => v, - None => client.info().chain.best_number, + None => $client.info().chain.best_number, }; if last < block { @@ -62,28 +47,28 @@ pub fn export_blocks( let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { - let _ = exit.wait(); + let _ = $exit.wait(); let _ = exit_send.send(()); }); info!("Exporting blocks from #{} to #{}", block, last); - if !json { + if !$json { let last_: u64 = last.saturated_into::(); let block_: u64 = block.saturated_into::(); let len: u64 = last_ - block_ + 1; - output.write(&len.encode())?; + $output.write(&len.encode())?; } loop { if exit_recv.try_recv().is_ok() { break; } - match client.block(&BlockId::number(block))? { + match $client.block(&BlockId::number(block))? { Some(block) => { - if json { - serde_json::to_writer(&mut output, &block) + if $json { + serde_json::to_writer(&mut $output, &block) .map_err(|e| format!("Error writing JSON: {}", e))?; } else { - output.write(&block.encode())?; + $output.write(&block.encode())?; } }, None => break, @@ -97,8 +82,37 @@ pub fn export_blocks( block += One::one(); } Ok(()) +}} +} + +/// Export a range of blocks to a binary stream. +pub fn export_blocks( + config: FactoryFullConfiguration, + exit: E, + mut output: W, + from: FactoryBlockNumber, + to: Option>, + json: bool +) -> error::Result<()> + where + F: ServiceFactory, + E: Future + Send + 'static, + W: Write, +{ + let client = new_client::(&config)?; + export_blocks!(client, exit, output, from, to, json) } +#[macro_export] +macro_rules! import_blocks { +($block:ty, $client:ident, $queue:ident, $exit:ident, $input:ident) => {{ + use consensus_common::import_queue::{IncomingBlock, Link, BlockImportError, BlockImportResult}; + use consensus_common::BlockOrigin; + use network::message; + use sr_primitives::generic::SignedBlock; + use sr_primitives::traits::Block; + use futures03::TryFutureExt as _; + struct WaitLink { imported_blocks: u64, has_error: bool, @@ -132,31 +146,13 @@ impl Link for WaitLink { } } -/// Returns a future that import blocks from a binary stream. -pub fn import_blocks( - mut config: FactoryFullConfiguration, - exit: E, - input: R -) -> error::Result> - where F: ServiceFactory, E: Future + Send + 'static, R: Read + Seek, -{ - let client = new_client::(&config)?; - // FIXME #1134 this shouldn't need a mutable config. 
- let select_chain = components::FullComponents::::build_select_chain(&mut config, client.clone())?; - let (mut queue, _) = components::FullComponents::::build_import_queue( - &mut config, - client.clone(), - select_chain, - None, - )?; - let (exit_send, exit_recv) = std::sync::mpsc::channel(); ::std::thread::spawn(move || { - let _ = exit.wait(); + let _ = $exit.wait(); let _ = exit_send.send(()); }); - let mut io_reader_input = IoReader(input); + let mut io_reader_input = IoReader($input); let count: u64 = Decode::decode(&mut io_reader_input) .map_err(|e| format!("Error reading file: {}", e))?; info!("Importing {} blocks", count); @@ -165,11 +161,11 @@ pub fn import_blocks( if exit_recv.try_recv().is_ok() { break; } - match SignedBlock::::decode(&mut io_reader_input) { + match SignedBlock::<$block>::decode(&mut io_reader_input) { Ok(signed) => { let (header, extrinsics) = signed.block.deconstruct(); let hash = header.hash(); - let block = message::BlockData:: { + let block = message::BlockData::<$block> { hash, justification: signed.justification, header: Some(header), @@ -178,8 +174,8 @@ pub fn import_blocks( message_queue: None }; // import queue handles verification and importing it into the client - queue.import_blocks(BlockOrigin::File, vec![ - IncomingBlock:: { + $queue.import_blocks(BlockOrigin::File, vec![ + IncomingBlock::<$block> { hash: block.hash, header: block.header, body: block.body, @@ -208,7 +204,7 @@ pub fn import_blocks( let blocks_before = link.imported_blocks; let _ = futures03::future::poll_fn(|cx| { - queue.poll_actions(cx, &mut link); + $queue.poll_actions(cx, &mut link); std::task::Poll::Pending::> }).compat().poll(); if link.has_error { @@ -226,24 +222,41 @@ pub fn import_blocks( ); } if link.imported_blocks >= count { - info!("Imported {} blocks. Best: #{}", block_count, client.info().chain.best_number); + info!("Imported {} blocks. Best: #{}", block_count, $client.info().chain.best_number); Ok(Async::Ready(())) } else { Ok(Async::NotReady) } })) +}} } -/// Revert the chain. -pub fn revert_chain( - config: FactoryFullConfiguration, - blocks: FactoryBlockNumber -) -> error::Result<()> - where F: ServiceFactory, +/// Returns a future that import blocks from a binary stream. +pub fn import_blocks( + mut config: FactoryFullConfiguration, + exit: E, + input: R +) -> error::Result> + where F: ServiceFactory, E: Future + Send + 'static, R: Read + Seek, { let client = new_client::(&config)?; - let reverted = client.revert(blocks)?; - let info = client.info().chain; + // FIXME #1134 this shouldn't need a mutable config. + let select_chain = components::FullComponents::::build_select_chain(&mut config, client.clone())?; + let (mut queue, _) = components::FullComponents::::build_import_queue( + &mut config, + client.clone(), + select_chain, + None + )?; + + import_blocks!(F::Block, client, queue, exit, input) +} + +#[macro_export] +macro_rules! revert_chain { +($client:ident, $blocks:ident) => {{ + let reverted = $client.revert($blocks)?; + let info = $client.info().chain; if reverted.is_zero() { info!("There aren't any non-finalized blocks to revert."); @@ -251,6 +264,18 @@ pub fn revert_chain( info!("Reverted {} blocks. Best: #{} ({})", reverted, info.best_number, info.best_hash); } Ok(()) +}} +} + +/// Revert the chain. 
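The macros keep the existing on-disk framing: in the non-JSON case, a SCALE-encoded `u64` block count followed by that many SCALE-encoded `SignedBlock`s. As a reading aid, a minimal consumer of that format (same `codec` and `sr_primitives` crates as above; this helper is not part of the change):

    use codec::{Decode, IoReader};
    use sr_primitives::{generic::SignedBlock, traits::Block as BlockT};

    /// Walks an exported file and returns how many blocks it declares and contains.
    fn count_exported_blocks<B: BlockT, R: std::io::Read>(input: R) -> Result<u64, String> {
        let mut reader = IoReader(input);
        let count: u64 = Decode::decode(&mut reader)
            .map_err(|e| format!("Error reading count header: {}", e))?;
        for read in 0..count {
            // Each entry is a full SignedBlock; a decode error usually means a truncated file.
            SignedBlock::<B>::decode(&mut reader)
                .map_err(|e| format!("Error reading block {}: {}", read, e))?;
        }
        Ok(count)
    }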
+pub fn revert_chain( + config: FactoryFullConfiguration, + blocks: FactoryBlockNumber +) -> error::Result<()> + where F: ServiceFactory, +{ + let client = new_client::(&config)?; + revert_chain!(client, blocks) } /// Build a chain spec json diff --git a/core/service/src/components.rs b/core/service/src/components.rs index b88abd4a98b03..967e4439f291d 100644 --- a/core/service/src/components.rs +++ b/core/service/src/components.rs @@ -30,7 +30,7 @@ use network::{ use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; use transaction_pool::txpool::{self, Options as TransactionPoolOptions, Pool as TransactionPool}; use sr_primitives::{ - BuildStorage, traits::{Block as BlockT, Header as HeaderT, ProvideRuntimeApi}, generic::BlockId + BuildStorage, traits::{Block as BlockT, Header as HeaderT, NumberFor, ProvideRuntimeApi}, generic::BlockId }; use crate::config::Configuration; use primitives::{Blake2Hasher, H256, traits::BareCryptoStorePtr}; @@ -192,25 +192,44 @@ impl StartRPC for C where transaction_pool: Arc>, keystore: KeyStorePtr, ) -> rpc::RpcHandler { - let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); - let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); - let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); - let author = rpc::apis::author::Author::new( - client, - transaction_pool, - subscriptions, - keystore, - ); - let system = rpc::apis::system::System::new(rpc_system_info, system_send_back); - rpc::rpc_handler::, ComponentExHash, _, _, _, _>( - state, - chain, - author, - system, - ) + start_rpc(client, system_send_back, rpc_system_info, task_executor, transaction_pool, keystore) } } +pub(crate) fn start_rpc( + client: Arc>, + system_send_back: mpsc::UnboundedSender>, + rpc_system_info: SystemInfo, + task_executor: TaskExecutor, + transaction_pool: Arc>, + keystore: KeyStorePtr, +) -> rpc::RpcHandler +where + Block: BlockT::Out>, + Backend: client::backend::Backend + 'static, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: runtime_api::Metadata + session::SessionKeys, + Api: Send + Sync + 'static, + Executor: client::CallExecutor + Send + Sync + Clone + 'static, + PoolApi: txpool::ChainApi + 'static { + let subscriptions = rpc::apis::Subscriptions::new(task_executor.clone()); + let chain = rpc::apis::chain::Chain::new(client.clone(), subscriptions.clone()); + let state = rpc::apis::state::State::new(client.clone(), subscriptions.clone()); + let author = rpc::apis::author::Author::new( + client, + transaction_pool, + subscriptions, + keystore, + ); + let system = rpc::apis::system::System::new(rpc_system_info, system_send_back); + rpc::rpc_handler::::Hash, _, _, _, _>( + state, + chain, + author, + system, + ) +} + /// Something that can maintain transaction pool on every imported block. 
pub trait MaintainTransactionPool { fn maintain_transaction_pool( @@ -220,7 +239,7 @@ pub trait MaintainTransactionPool { ) -> error::Result<()>; } -fn maintain_transaction_pool( +pub(crate) fn maintain_transaction_pool( id: &BlockId, client: &Client, transaction_pool: &TransactionPool, @@ -288,12 +307,36 @@ impl OffchainWorker for C where network_state: &Arc, is_validator: bool, ) -> error::Result + Send>> { - let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator) - .map(|()| Ok(())); - Ok(Box::new(Compat::new(future))) + offchain_workers(number, offchain, pool, network_state, is_validator) } } +pub(crate) fn offchain_workers( + number: &NumberFor, + offchain: &offchain::OffchainWorkers< + Client, + >::OffchainStorage, + Block + >, + pool: &Arc>, + network_state: &Arc, + is_validator: bool, +) -> error::Result + Send>> +where + Block: BlockT::Out>, + Backend: client::backend::Backend + 'static, + Api: 'static, + >::OffchainStorage: 'static, + Client: ProvideRuntimeApi + Send + Sync, + as ProvideRuntimeApi>::Api: offchain::OffchainWorkerApi, + Executor: client::CallExecutor + 'static, + PoolApi: txpool::ChainApi + 'static, +{ + let future = offchain.on_block_imported(number, pool, network_state.clone(), is_validator) + .map(|()| Ok(())); + Ok(Box::new(Compat::new(future))) +} + /// The super trait that combines all required traits a `Service` needs to implement. pub trait ServiceTrait: Deref> diff --git a/core/service/src/factory.rs b/core/service/src/factory.rs new file mode 100644 index 0000000000000..9868cf23c4019 --- /dev/null +++ b/core/service/src/factory.rs @@ -0,0 +1,607 @@ +// Copyright 2017-2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . 
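The components.rs changes above all follow one pattern: the body of each Components-based default method (`start_rpc`, `maintain_transaction_pool`, `offchain_workers`) is hoisted into a `pub(crate)` free function, so the legacy `Service::new` path and the builder introduced in the new file below can share it. Schematically (signatures heavily simplified, not the real ones):

    // The trait keeps its old surface for Components-based callers ...
    pub trait MaintainTransactionPool {
        fn maintain_transaction_pool(/* id, client, pool */) -> Result<(), ()>;
    }

    // ... while the logic lives in a crate-private free function ...
    pub(crate) fn maintain_transaction_pool(/* id, client, pool */) -> Result<(), ()> {
        Ok(()) // body unchanged from the previous trait impl
    }

    // ... which ServiceBuilder::build simply passes along as a closure:
    // |h, c, tx| maintain_transaction_pool(h, c, tx)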
+ +use crate::{NewService, NetworkStatus, NetworkState, error::Error, DEFAULT_PROTOCOL_ID}; +use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, components::maintain_transaction_pool}; +use crate::{components, TransactionPoolAdapter}; +use crate::config::{Configuration, Roles}; +use client::{BlockchainEvents, Client, runtime_api}; +use codec::{Decode, Encode, IoReader}; +use consensus_common::import_queue::ImportQueue; +use futures::{prelude::*, sync::mpsc}; +use futures03::{StreamExt as _, TryStreamExt as _}; +use keystore::Store as Keystore; +use log::{info, warn}; +use network::{FinalityProofProvider, OnDemand, NetworkService, NetworkStateInfo}; +use network::{config::BoxFinalityProofRequestBuilder, specialization::NetworkSpecialization}; +use parking_lot::{Mutex, RwLock}; +use primitives::{Blake2Hasher, H256, Hasher}; +use sr_primitives::{BuildStorage, generic::BlockId}; +use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi, NumberFor, One, Zero, Header, SaturatedConversion}; +use substrate_executor::{NativeExecutor, NativeExecutionDispatch}; +use serde::{Serialize, de::DeserializeOwned}; +use std::{io::{Read, Write, Seek}, marker::PhantomData, sync::Arc, sync::atomic::AtomicBool}; +use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; +use tel::{telemetry, SUBSTRATE_INFO}; +use transaction_pool::txpool::{ChainApi, Pool as TransactionPool}; + +/// Aggregator for the components required to build a service. +/// +/// # Usage +/// +/// Call [`ServiceBuilder::new_full`] or [`ServiceBuilder::new_light`], then call the various +/// `with_` methods to add the required components that you built yourself: +/// +/// - [`with_select_chain`](ServiceBuilder::with_select_chain) +/// - [`with_import_queue`](ServiceBuilder::with_import_queue) +/// - [`with_network_protocol`](ServiceBuilder::with_network_protocol) +/// - [`with_finality_proof_provider`](ServiceBuilder::with_finality_proof_provider) +/// - [`with_transaction_pool`](ServiceBuilder::with_transaction_pool) +/// +/// After this is done, call [`build`](ServiceBuilder::build) to construct the service. +/// +/// The order in which the `with_*` methods are called doesn't matter, as the correct binding of +/// generics is done when you call `build`. +/// +pub struct ServiceBuilder { + config: Configuration, + client: Arc, + keystore: Arc>, + fetcher: Option, + select_chain: Option, + import_queue: TImpQu, + finality_proof_request_builder: Option, + finality_proof_provider: Option, + network_protocol: TNetP, + transaction_pool: Arc, + marker: PhantomData<(TBl, TRtApi)>, +} + +impl ServiceBuilder<(), (), TCfg, TGen, (), (), (), (), (), (), (), ()> +where TGen: Serialize + DeserializeOwned + BuildStorage { + /// Start the service builder with a configuration. 
+ pub fn new_full, TRtApi, TExecDisp: NativeExecutionDispatch>( + config: Configuration + ) -> Result, + client::LocalCallExecutor, NativeExecutor>, + TBl, + TRtApi + >, + Arc>, + (), + (), + BoxFinalityProofRequestBuilder, + (), + (), + () + >, Error> { + let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; + + let db_settings = client_db::DatabaseSettings { + cache_size: None, + state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), + path: config.database_path.clone(), + pruning: config.pruning.clone(), + }; + + let executor = NativeExecutor::::new(config.default_heap_pages); + + let client = Arc::new(client_db::new_client( + db_settings, + executor, + &config.chain_spec, + config.execution_strategies.clone(), + Some(keystore.clone()), + )?); + + Ok(ServiceBuilder { + config, + client, + keystore, + fetcher: None, + select_chain: None, + import_queue: (), + finality_proof_request_builder: None, + finality_proof_provider: None, + network_protocol: (), + transaction_pool: Arc::new(()), + marker: PhantomData, + }) + } + + /// Start the service builder with a configuration. + pub fn new_light, TRtApi, TExecDisp: NativeExecutionDispatch + 'static>( + config: Configuration + ) -> Result, network::OnDemand, Blake2Hasher>, + client::light::call_executor::RemoteOrLocalCallExecutor< + TBl, + client::light::backend::Backend< + client_db::light::LightStorage, + network::OnDemand, + Blake2Hasher + >, + client::light::call_executor::RemoteCallExecutor< + client::light::blockchain::Blockchain< + client_db::light::LightStorage, + network::OnDemand + >, + network::OnDemand, + >, + client::LocalCallExecutor< + client::light::backend::Backend< + client_db::light::LightStorage, + network::OnDemand, + Blake2Hasher + >, + NativeExecutor + > + >, + TBl, + TRtApi + >, + Arc>, + (), + (), + BoxFinalityProofRequestBuilder, + (), + (), + () + >, Error> { + let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; + + let db_settings = client_db::DatabaseSettings { + cache_size: config.database_cache_size.map(|u| u as usize), + state_cache_size: config.state_cache_size, + state_cache_child_ratio: + config.state_cache_child_ratio.map(|v| (v, 100)), + path: config.database_path.clone(), + pruning: config.pruning.clone(), + }; + + let executor = NativeExecutor::::new(config.default_heap_pages); + + let db_storage = client_db::light::LightStorage::new(db_settings)?; + let light_blockchain = client::light::new_light_blockchain(db_storage); + let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor.clone())); + let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); + let client_backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); + let client = client::light::new_light(client_backend, fetcher.clone(), &config.chain_spec, executor)?; + + Ok(ServiceBuilder { + config, + client: Arc::new(client), + keystore, + fetcher: Some(fetcher), + select_chain: None, + import_queue: (), + finality_proof_request_builder: None, + finality_proof_provider: None, + network_protocol: (), + transaction_pool: Arc::new(()), + marker: PhantomData, + }) + } +} + +impl + ServiceBuilder { + + /// Returns a reference to the client that was stored in this builder. + pub fn client(&self) -> &Arc { + &self.client + } + + /// Returns a reference to the select-chain that was stored in this builder. 
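Note that the two constructors return differently-typed builders (full client vs. light client plus on-demand fetcher), so a node normally keeps separate full and light construction paths rather than branching at runtime. A hedged sketch, where `Block`, `RuntimeApi` and `Executor` stand for the node's own types:

    // Each of these is then completed with the `with_*` calls below and finally `build()`.
    let full_builder = ServiceBuilder::new_full::<Block, RuntimeApi, Executor>(full_config)?;
    let light_builder = ServiceBuilder::new_light::<Block, RuntimeApi, Executor>(light_config)?;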
+ pub fn select_chain(&self) -> Option<&TSc> { + self.select_chain.as_ref() + } + + /// Defines which head-of-chain strategy to use. + pub fn with_opt_select_chain( + mut self, + select_chain_builder: impl FnOnce(&mut Configuration, Arc) -> Result, Error> + ) -> Result, Error> { + let select_chain = select_chain_builder(&mut self.config, self.client.clone())?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + marker: self.marker, + }) + } + + /// Defines which head-of-chain strategy to use. + pub fn with_select_chain( + self, + builder: impl FnOnce(&mut Configuration, Arc) -> Result + ) -> Result, Error> { + self.with_opt_select_chain(|cfg, cl| builder(cfg, cl).map(Option::Some)) + } + + /// Defines which import queue to use. + pub fn with_import_queue( + mut self, + builder: impl FnOnce(&mut Configuration, Arc, Option, Arc) -> Result + ) -> Result, Error> + where TSc: Clone { + let import_queue = builder(&mut self.config, self.client.clone(), self.select_chain.clone(), self.transaction_pool.clone())?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + marker: self.marker, + }) + } + + /// Defines which network specialization protocol to use. + pub fn with_network_protocol( + self, + network_protocol_builder: impl FnOnce(&Configuration) -> Result + ) -> Result, Error> { + let network_protocol = network_protocol_builder(&self.config)?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol, + transaction_pool: self.transaction_pool, + marker: self.marker, + }) + } + + /// Defines which strategy to use for providing finality proofs. + pub fn with_opt_finality_proof_provider( + self, + builder: impl FnOnce(Arc) -> Result>>, Error> + ) -> Result>, + TNetP, + TExPool + >, Error> { + let finality_proof_provider = builder(self.client.clone())?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + marker: self.marker, + }) + } + + /// Defines which strategy to use for providing finality proofs. + pub fn with_finality_proof_provider( + self, + build: impl FnOnce(Arc) -> Result>, Error> + ) -> Result>, + TNetP, + TExPool + >, Error> { + self.with_opt_finality_proof_provider(|client| build(client).map(Option::Some)) + } + + /// Defines which import queue to use. 
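Putting the pieces together, a hedged sketch of the chaining that the builder's doc comment describes. `my_select_chain`, `my_pool`, `my_import_queue`, `MyProtocol` and `my_finality_proof_provider` are stand-ins for node-specific constructors; note that the import-queue closure is handed whatever select chain and transaction pool have been registered on the builder so far:

    let service = ServiceBuilder::new_full::<Block, RuntimeApi, Executor>(config)?
        .with_select_chain(|config, client| my_select_chain(config, client))?
        .with_transaction_pool(|options, client| my_pool(options, client))?
        .with_import_queue(|config, client, select_chain, pool| {
            my_import_queue(config, client, select_chain, pool)
        })?
        .with_network_protocol(|_config| Ok(MyProtocol::new()))?
        .with_finality_proof_provider(|client| my_finality_proof_provider(client))?
        .build()?;
    // `service` is a NewService that implements AbstractService (see core/service/src/lib.rs below).
    // Light clients would use with_import_queue_and_fprb instead, to also register a
    // finality proof request builder.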
+ pub fn with_import_queue_and_opt_fprb( + mut self, + builder: impl FnOnce(&mut Configuration, Arc, Option, Arc) + -> Result<(UImpQu, Option), Error> + ) -> Result, Error> + where TSc: Clone { + let (import_queue, fprb) = builder(&mut self.config, self.client.clone(), self.select_chain.clone(), self.transaction_pool.clone())?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue, + finality_proof_request_builder: fprb, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: self.transaction_pool, + marker: self.marker, + }) + } + + /// Defines which import queue to use. + pub fn with_import_queue_and_fprb( + self, + builder: impl FnOnce(&mut Configuration, Arc, Option, Arc) -> Result<(UImpQu, UFprb), Error> + ) -> Result, Error> + where TSc: Clone { + self.with_import_queue_and_opt_fprb(|cfg, cl, sc, tx| builder(cfg, cl, sc, tx).map(|(q, f)| (q, Some(f)))) + } + + /// Defines which transaction pool to use. + pub fn with_transaction_pool( + self, + transaction_pool_builder: impl FnOnce(transaction_pool::txpool::Options, Arc) -> Result + ) -> Result, Error> { + let transaction_pool = transaction_pool_builder(self.config.transaction_pool.clone(), self.client.clone())?; + + Ok(ServiceBuilder { + config: self.config, + client: self.client, + keystore: self.keystore, + fetcher: self.fetcher, + select_chain: self.select_chain, + import_queue: self.import_queue, + finality_proof_request_builder: self.finality_proof_request_builder, + finality_proof_provider: self.finality_proof_provider, + network_protocol: self.network_protocol, + transaction_pool: Arc::new(transaction_pool), + marker: self.marker, + }) + } +} + +/// Implemented on `ServiceBuilder`. Allows importing blocks once you have given all the required +/// components to the builder. +pub trait ServiceBuilderImport { + /// Starts the process of importing blocks. + fn import_blocks( + self, + exit: impl Future + Send + 'static, + input: impl Read + Seek, + ) -> Result + Send>, Error>; +} + +/// Implemented on `ServiceBuilder`. Allows exporting blocks once you have given all the required +/// components to the builder. +pub trait ServiceBuilderExport { + /// Type of block of the builder. + type Block: BlockT; + + /// Performs the blocks export. + fn export_blocks( + &self, + exit: impl Future + Send + 'static, + output: impl Write, + from: NumberFor, + to: Option>, + json: bool + ) -> Result<(), Error>; +} + +/// Implemented on `ServiceBuilder`. Allows reverting the chain once you have given all the +/// required components to the builder. +pub trait ServiceBuilderRevert { + /// Type of block of the builder. + type Block: BlockT; + + /// Performs a revert of `blocks` bocks. 
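Because these capabilities are expressed as traits, chain operations can be written once against `ServiceBuilderExport` and friends without knowing the concrete builder type. A small sketch (the `Item = (), Error = ()` bound on `exit` is assumed from the pre-existing free functions above):

    use futures::Future;
    use sr_primitives::traits::NumberFor;

    /// Exports `from..=to` (or up to the tip if `to` is None) in the SCALE framing, to any writer.
    fn dump_chain<B: ServiceBuilderExport>(
        builder: &B,
        exit: impl Future<Item = (), Error = ()> + Send + 'static,
        output: impl std::io::Write,
        from: NumberFor<B::Block>,
        to: Option<NumberFor<B::Block>>,
    ) -> Result<(), Error> {
        builder.export_blocks(exit, output, from, to, false)
    }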
+ fn revert_chain( + &self, + blocks: NumberFor + ) -> Result<(), Error>; +} + +impl ServiceBuilderImport for + ServiceBuilder, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool> +where + TBl: BlockT::Out>, + TBackend: 'static + client::backend::Backend + Send, + TExec: 'static + client::CallExecutor + Send + Sync + Clone, + TImpQu: 'static + ImportQueue, + TRtApi: 'static + Send + Sync, +{ + fn import_blocks( + self, + exit: impl Future + Send + 'static, + input: impl Read + Seek, + ) -> Result + Send>, Error> { + let client = self.client; + let mut queue = self.import_queue; + import_blocks!(TBl, client, queue, exit, input) + .map(|f| Box::new(f) as Box<_>) + } +} + +impl ServiceBuilderExport for + ServiceBuilder, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool> +where + TBl: BlockT::Out>, + TBackend: 'static + client::backend::Backend + Send, + TExec: 'static + client::CallExecutor + Send + Sync + Clone +{ + type Block = TBl; + + fn export_blocks( + &self, + exit: impl Future + Send + 'static, + mut output: impl Write, + from: NumberFor, + to: Option>, + json: bool + ) -> Result<(), Error> { + let client = &self.client; + export_blocks!(client, exit, output, from, to, json) + } +} + +impl ServiceBuilderRevert for + ServiceBuilder, TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool> +where + TBl: BlockT::Out>, + TBackend: 'static + client::backend::Backend + Send, + TExec: 'static + client::CallExecutor + Send + Sync + Clone +{ + type Block = TBl; + + fn revert_chain( + &self, + blocks: NumberFor + ) -> Result<(), Error> { + let client = &self.client; + revert_chain!(client, blocks) + } +} + +impl +ServiceBuilder< + TBl, + TRtApi, + TCfg, + TGen, + Client, + Arc>, + TSc, + TImpQu, + BoxFinalityProofRequestBuilder, + Arc>, + TNetP, + TransactionPool +> where + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: + runtime_api::Metadata + offchain::OffchainWorkerApi + runtime_api::TaggedTransactionQueue + session::SessionKeys, + TBl: BlockT::Out>, + TRtApi: 'static + Send + Sync, + TCfg: Default, + TGen: Serialize + DeserializeOwned + BuildStorage, + TBackend: 'static + client::backend::Backend + Send, + TExec: 'static + client::CallExecutor + Send + Sync + Clone, + TSc: Clone, + TImpQu: 'static + ImportQueue, + TNetP: NetworkSpecialization, + TExPoolApi: 'static + ChainApi::Hash>, +{ + /// Builds the service. 
+ pub fn build(self) -> Result, + TBl, + Client, + TSc, + NetworkStatus, + NetworkService::Hash>, + TransactionPool, + offchain::OffchainWorkers< + Client, + TBackend::OffchainStorage, + TBl + >, + >, Error> { + let mut config = self.config; + session::generate_initial_session_keys( + self.client.clone(), + config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default() + )?; + let ( + client, + fetcher, + keystore, + select_chain, + import_queue, + finality_proof_request_builder, + finality_proof_provider, + network_protocol, + transaction_pool + ) = ( + self.client, + self.fetcher, + self.keystore, + self.select_chain, + self.import_queue, + self.finality_proof_request_builder, + self.finality_proof_provider, + self.network_protocol, + self.transaction_pool + ); + + new_impl!( + TBl, + config, + move |_| -> Result<_, Error> { + Ok(( + client, + fetcher, + keystore, + select_chain, + import_queue, + finality_proof_request_builder, + finality_proof_provider, + network_protocol, + transaction_pool + )) + }, + |h, c, tx| maintain_transaction_pool(h, c, tx), + |n, o, p, ns, v| components::offchain_workers(n, o, p, ns, v), + |c, ssb, si, te, tp, ks| components::start_rpc(c, ssb, si, te, tp, ks), + ) + } +} diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index dc34a488535e5..a751de9269409 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -22,25 +22,29 @@ mod components; mod chain_spec; pub mod config; +#[macro_use] pub mod chain_ops; pub mod error; use std::io; +use std::marker::PhantomData; use std::net::SocketAddr; use std::collections::HashMap; use std::sync::atomic::{AtomicBool, Ordering}; +use std::ops::DerefMut; use std::time::{Duration, Instant}; use futures::sync::mpsc; use parking_lot::Mutex; -use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT}; +use client::{BlockchainEvents, backend::Backend, runtime_api::BlockT, Client}; use exit_future::Signal; use futures::prelude::*; use futures03::stream::{StreamExt as _, TryStreamExt as _}; use keystore::Store as Keystore; -use network::{NetworkState, NetworkStateInfo}; +use network::{NetworkService, NetworkState, NetworkStateInfo, specialization::NetworkSpecialization}; use log::{log, info, warn, debug, error, Level}; use codec::{Encode, Decode}; +use primitives::{Blake2Hasher, H256}; use sr_primitives::generic::BlockId; use sr_primitives::traits::{Header, NumberFor, SaturatedConversion}; use substrate_executor::NativeExecutor; @@ -48,6 +52,7 @@ use sysinfo::{get_current_pid, ProcessExt, System, SystemExt}; use tel::{telemetry, SUBSTRATE_INFO}; pub use self::error::Error; +pub use self::factory::{ServiceBuilder, ServiceBuilderExport, ServiceBuilderImport, ServiceBuilderRevert}; pub use config::{Configuration, Roles, PruningMode}; pub use chain_spec::{ChainSpec, Properties}; pub use transaction_pool::txpool::{ @@ -59,7 +64,7 @@ pub use components::{ ServiceFactory, FullBackend, FullExecutor, LightBackend, LightExecutor, Components, PoolApi, ComponentClient, ComponentOffchainStorage, ComponentBlock, FullClient, LightClient, FullComponents, LightComponents, - CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock, + CodeExecutor, NetworkService as ComponentNetworkService, FactoryChainSpec, FactoryBlock, FactoryFullConfiguration, RuntimeGenesis, FactoryGenesis, ComponentExHash, ComponentExtrinsic, FactoryExtrinsic, InitialSessionKeys, }; @@ -75,14 +80,32 @@ const DEFAULT_PROTOCOL_ID: &str = "sup"; /// Substrate service. 
pub struct Service { - client: Arc>, - select_chain: Option, - network: Arc>, + inner: NewService< + FactoryFullConfiguration, + ComponentBlock, + ComponentClient, + Components::SelectChain, + NetworkStatus>, + ComponentNetworkService, + TransactionPool, + offchain::OffchainWorkers< + ComponentClient, + ComponentOffchainStorage, + ComponentBlock + >, + >, +} + +/// Substrate service. +pub struct NewService { + client: Arc, + select_chain: Option, + network: Arc, /// Sinks to propagate network status updates. network_status_sinks: Arc>, NetworkState + TNetStatus, NetworkState )>>>>, - transaction_pool: Arc>, + transaction_pool: Arc, /// A future that resolves when the service has exited, this is useful to /// make sure any internally spawned futures stop when the service does. exit: exit_future::Exit, @@ -100,17 +123,14 @@ pub struct Service { /// The elements must then be polled manually. to_poll: Vec + Send>>, /// Configuration of this Service - config: FactoryFullConfiguration, + config: TCfg, rpc_handlers: rpc::RpcHandler, _rpc: Box, _telemetry: Option, _telemetry_on_connect_sinks: Arc>>>, - _offchain_workers: Option, - ComponentOffchainStorage, - ComponentBlock> - >>, + _offchain_workers: Option>, keystore: keystore::KeyStorePtr, + marker: PhantomData, } /// Creates bare client without any networking. @@ -155,50 +175,37 @@ pub struct TelemetryOnConnect { pub telemetry_connection_sinks: TelemetryOnConnectNotifications, } -impl Service { - /// Creates a new service. - pub fn new( - mut config: FactoryFullConfiguration, - ) -> Result { +macro_rules! new_impl { + ( + $block:ty, + $config:ident, + $build_components:expr, + $maintain_transaction_pool:expr, + $offchain_workers:expr, + $start_rpc:expr, + ) => {{ let (signal, exit) = exit_future::signal(); // List of asynchronous tasks to spawn. We collect them, then spawn them all at once. let (to_spawn_tx, to_spawn_rx) = mpsc::unbounded:: + Send>>(); - // Create client - let executor = NativeExecutor::new(config.default_heap_pages); - - let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; - - let (client, on_demand) = Components::build_client(&config, executor, Some(keystore.clone()))?; - let select_chain = Components::build_select_chain(&mut config, client.clone())?; - - let transaction_pool = Arc::new( - Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())? - ); - let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { - imports_external_transactions: !config.roles.is_light(), - pool: transaction_pool.clone(), - client: client.clone(), - }); - - let (import_queue, finality_proof_request_builder) = Components::build_import_queue( - &mut config, - client.clone(), - select_chain.clone(), - Some(transaction_pool.clone()), - )?; + // Create all the components. 
+ let ( + client, + on_demand, + keystore, + select_chain, + import_queue, + finality_proof_request_builder, + finality_proof_provider, + network_protocol, + transaction_pool + ) = $build_components(&mut $config)?; let import_queue = Box::new(import_queue); - let finality_proof_provider = Components::build_finality_proof_provider(client.clone())?; let chain_info = client.info().chain; - Components::RuntimeServices::generate_initial_session_keys( - client.clone(), - config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), - )?; - - let version = config.full_version(); + let version = $config.full_version(); info!("Highest known block at #{}", chain_info.best_number); telemetry!( SUBSTRATE_INFO; @@ -207,10 +214,14 @@ impl Service { "best" => ?chain_info.best_hash ); - let network_protocol = ::build_network_protocol(&config)?; + let transaction_pool_adapter = Arc::new(TransactionPoolAdapter { + imports_external_transactions: !$config.roles.is_light(), + pool: transaction_pool.clone(), + client: client.clone(), + }); let protocol_id = { - let protocol_id_full = match config.chain_spec.protocol_id() { + let protocol_id_full = match $config.chain_spec.protocol_id() { Some(pid) => pid, None => { warn!("Using default protocol ID {:?} because none is configured in the \ @@ -223,8 +234,8 @@ impl Service { }; let network_params = network::config::Params { - roles: config.roles, - network_config: config.network.clone(), + roles: $config.roles, + network_config: $config.network.clone(), chain: client.clone(), finality_proof_provider, finality_proof_request_builder, @@ -242,7 +253,7 @@ impl Service { #[allow(deprecated)] let offchain_storage = client.backend().offchain_storage(); - let offchain_workers = match (config.offchain_worker, offchain_storage) { + let offchain_workers = match ($config.offchain_worker, offchain_storage) { (true, Some(db)) => { Some(Arc::new(offchain::OffchainWorkers::new(client.clone(), db))) }, @@ -260,7 +271,7 @@ impl Service { let offchain = offchain_workers.as_ref().map(Arc::downgrade); let to_spawn_tx_ = to_spawn_tx.clone(); let network_state_info: Arc = network.clone(); - let is_validator = config.roles.is_authority(); + let is_validator = $config.roles.is_authority(); let events = client.import_notification_stream() .map(|v| Ok::<_, ()>(v)).compat() @@ -268,7 +279,7 @@ impl Service { let number = *notification.header.number(); if let (Some(txpool), Some(client)) = (txpool.upgrade(), wclient.upgrade()) { - Components::RuntimeServices::maintain_transaction_pool( + $maintain_transaction_pool( &BlockId::hash(notification.hash), &*client, &*txpool, @@ -276,7 +287,7 @@ impl Service { } if let (Some(txpool), Some(offchain)) = (txpool.upgrade(), offchain.as_ref().and_then(|o| o.upgrade())) { - let future = Components::RuntimeServices::offchain_workers( + let future = $offchain_workers( &number, &offchain, &txpool, @@ -320,7 +331,7 @@ impl Service { let client_ = client.clone(); let mut sys = System::new(); let self_pid = get_current_pid().ok(); - let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus>, NetworkState)>(); + let (netstat_tx, netstat_rx) = mpsc::unbounded::<(NetworkStatus<_>, NetworkState)>(); network_status_sinks.lock().push(netstat_tx); let tel_task = netstat_rx.for_each(move |(net_status, network_state)| { let info = client_.info(); @@ -373,12 +384,12 @@ impl Service { let (system_rpc_tx, system_rpc_rx) = futures03::channel::mpsc::unbounded(); let gen_handler = || { let system_info = rpc::apis::system::SystemInfo { - chain_name: 
config.chain_spec.name().into(), - impl_name: config.impl_name.into(), - impl_version: config.impl_version.into(), - properties: config.chain_spec.properties(), + chain_name: $config.chain_spec.name().into(), + impl_name: $config.impl_name.into(), + impl_version: $config.impl_version.into(), + properties: $config.chain_spec.properties(), }; - Components::RuntimeServices::start_rpc( + $start_rpc( client.clone(), system_rpc_tx.clone(), system_info.clone(), @@ -388,7 +399,7 @@ impl Service { ) }; let rpc_handlers = gen_handler(); - let rpc = start_rpc_servers(&config, gen_handler)?; + let rpc = start_rpc_servers(&$config, gen_handler)?; let _ = to_spawn_tx.unbounded_send(Box::new(build_network_future( network_mut, @@ -404,17 +415,17 @@ impl Service { let telemetry_connection_sinks: Arc>>> = Default::default(); // Telemetry - let telemetry = config.telemetry_endpoints.clone().map(|endpoints| { - let is_authority = config.roles.is_authority(); + let telemetry = $config.telemetry_endpoints.clone().map(|endpoints| { + let is_authority = $config.roles.is_authority(); let network_id = network.local_peer_id().to_base58(); - let name = config.name.clone(); - let impl_name = config.impl_name.to_owned(); + let name = $config.name.clone(); + let impl_name = $config.impl_name.to_owned(); let version = version.clone(); - let chain_name = config.chain_spec.name().to_owned(); + let chain_name = $config.chain_spec.name().to_owned(); let telemetry_connection_sinks_ = telemetry_connection_sinks.clone(); let telemetry = tel::init_telemetry(tel::TelemetryConfig { endpoints, - wasm_external_transport: config.telemetry_external_transport.take(), + wasm_external_transport: $config.telemetry_external_transport.take(), }); let future = telemetry.clone() .map(|ev| Ok::<_, ()>(ev)) @@ -444,7 +455,7 @@ impl Service { telemetry }); - Ok(Service { + Ok(NewService { client, network, network_status_sinks, @@ -456,56 +467,230 @@ impl Service { to_spawn_tx, to_spawn_rx, to_poll: Vec::new(), - config, + $config, rpc_handlers, _rpc: rpc, _telemetry: telemetry, _offchain_workers: offchain_workers, _telemetry_on_connect_sinks: telemetry_connection_sinks.clone(), keystore, + marker: PhantomData::<$block>, }) + }} +} + +mod factory; + +impl Service { + /// Creates a new service. + pub fn new( + mut config: FactoryFullConfiguration, + ) -> Result { + let inner = new_impl!( + ComponentBlock, + config, + |mut config: &mut FactoryFullConfiguration| -> Result<_, error::Error> { + // Create client + let executor = NativeExecutor::new(config.default_heap_pages); + + let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; + + let (client, on_demand) = Components::build_client(&config, executor, Some(keystore.clone()))?; + let select_chain = Components::build_select_chain(&mut config, client.clone())?; + + let transaction_pool = Arc::new( + Components::build_transaction_pool(config.transaction_pool.clone(), client.clone())? 
+ ); + + let (import_queue, finality_proof_request_builder) = Components::build_import_queue( + &mut config, + client.clone(), + select_chain.clone(), + Some(transaction_pool.clone()), + )?; + let finality_proof_provider = Components::build_finality_proof_provider(client.clone())?; + + Components::RuntimeServices::generate_initial_session_keys( + client.clone(), + config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), + )?; + + let network_protocol = ::build_network_protocol(&config)?; + + Ok(( + client, + on_demand, + keystore, + select_chain, + import_queue, + finality_proof_request_builder, + finality_proof_provider, + network_protocol, + transaction_pool + )) + }, + Components::RuntimeServices::maintain_transaction_pool, + Components::RuntimeServices::offchain_workers, + Components::RuntimeServices::start_rpc, + ); + + inner.map(|inner| Service { inner }) } +} + +/// Abstraction over a Substrate service. +pub trait AbstractService: 'static + Future + + Executor + Send>> + Send { + /// Type of block of this chain. + type Block: BlockT; + /// Backend storage for the client. + type Backend: 'static + client::backend::Backend; + /// How to execute calls towards the runtime. + type Executor: 'static + client::CallExecutor + Send + Sync + Clone; + /// API that the runtime provides. + type RuntimeApi: Send + Sync; + /// Configuration struct of the service. + type Config; + /// Chain selection algorithm. + type SelectChain; + /// API of the transaction pool. + type TransactionPoolApi: ChainApi; + /// Network specialization. + type NetworkSpecialization: NetworkSpecialization; + + /// Get event stream for telemetry connection established events. + fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications; + + /// Returns the configuration passed on construction. + fn config(&self) -> &Self::Config; + + /// Returns the configuration passed on construction. + fn config_mut(&mut self) -> &mut Self::Config; + + /// return a shared instance of Telemetry (if enabled) + fn telemetry(&self) -> Option; + + /// Spawns a task in the background that runs the future passed as parameter. + fn spawn_task(&self, task: impl Future + Send + 'static); + + /// Spawns a task in the background that runs the future passed as + /// parameter. The given task is considered essential, i.e. if it errors we + /// trigger a service exit. + fn spawn_essential_task(&self, task: impl Future + Send + 'static); + + /// Returns a handle for spawning tasks. + fn spawn_task_handle(&self) -> SpawnTaskHandle; + + /// Returns the keystore that stores keys. + fn keystore(&self) -> keystore::KeyStorePtr; + + /// Starts an RPC query. + /// + /// The query is passed as a string and must be a JSON text similar to what an HTTP client + /// would for example send. + /// + /// Returns a `Future` that contains the optional response. + /// + /// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to + /// send back spontaneous events. + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box, Error = ()> + Send>; + + /// Get shared client instance. + fn client(&self) -> Arc>; + + /// Get clone of select chain. + fn select_chain(&self) -> Option; + + /// Get shared network instance. + fn network(&self) -> Arc>; + + /// Returns a receiver that periodically receives a status of the network. + fn network_status(&self) -> mpsc::UnboundedReceiver<(NetworkStatus, NetworkState)>; + + /// Get shared transaction pool instance. 
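With the trait in place, downstream helpers can be written once against `&impl AbstractService` instead of `Service<Components>` — which is exactly how `informant::build` at the top of this diff is now typed. A minimal hedged sketch:

    fn log_chain_head(service: &impl AbstractService) {
        // `client()` and `info().chain` are used the same way as in the informant and
        // in the revert_chain! macro above.
        let info = service.client().info().chain;
        log::info!("Best block: #{} ({})", info.best_number, info.best_hash);
    }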
+ fn transaction_pool(&self) -> Arc>; - /// Returns a reference to the config passed at initialization. - pub fn config(&self) -> &FactoryFullConfiguration { + /// Get a handle to a future that will resolve on exit. + fn on_exit(&self) -> ::exit_future::Exit; +} + +impl Deref for Service +where FactoryFullConfiguration: Send { + type Target = NewService< + FactoryFullConfiguration, + ComponentBlock, + ComponentClient, + Components::SelectChain, + NetworkStatus>, + ComponentNetworkService, + TransactionPool, + offchain::OffchainWorkers< + ComponentClient, + ComponentOffchainStorage, + ComponentBlock + >, + >; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for Service +where FactoryFullConfiguration: Send { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl AbstractService for + NewService, TSc, NetworkStatus, NetworkService, TransactionPool, TOc> +where TCfg: 'static + Send, + TBl: BlockT, + TBackend: 'static + client::backend::Backend, + TExec: 'static + client::CallExecutor + Send + Sync + Clone, + TRtApi: 'static + Send + Sync, + TSc: 'static + Clone + Send, + TExPoolApi: 'static + ChainApi, + TOc: 'static + Send + Sync, + TNetSpec: NetworkSpecialization, +{ + type Block = TBl; + type Backend = TBackend; + type Executor = TExec; + type RuntimeApi = TRtApi; + type Config = TCfg; + type SelectChain = TSc; + type TransactionPoolApi = TExPoolApi; + type NetworkSpecialization = TNetSpec; + + fn config(&self) -> &Self::Config { &self.config } - /// Returns a reference to the config passed at initialization. - /// - /// > **Note**: This method is currently necessary because we extract some elements from the - /// > configuration at the end of the service initialization. It is intended to be - /// > removed. - pub fn config_mut(&mut self) -> &mut FactoryFullConfiguration { + fn config_mut(&mut self) -> &mut Self::Config { &mut self.config } - /// Get event stream for telemetry connection established events. - pub fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications { + fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications { let (sink, stream) = mpsc::unbounded(); self._telemetry_on_connect_sinks.lock().push(sink); stream } - /// Return a shared instance of Telemetry (if enabled) - pub fn telemetry(&self) -> Option { + fn telemetry(&self) -> Option { self._telemetry.as_ref().map(|t| t.clone()) } - /// Returns the keystore instance. - pub fn keystore(&self) -> keystore::KeyStorePtr { + fn keystore(&self) -> keystore::KeyStorePtr { self.keystore.clone() } - /// Spawns a task in the background that runs the future passed as parameter. - pub fn spawn_task(&self, task: impl Future + Send + 'static) { + fn spawn_task(&self, task: impl Future + Send + 'static) { let _ = self.to_spawn_tx.unbounded_send(Box::new(task)); } - /// Spawns a task in the background that runs the future passed as - /// parameter. The given task is considered essential, i.e. if it errors we - /// trigger a service exit. - pub fn spawn_essential_task(&self, task: impl Future + Send + 'static) { + fn spawn_essential_task(&self, task: impl Future + Send + 'static) { let essential_failed = self.essential_failed.clone(); let essential_task = Box::new(task.map_err(move |_| { error!("Essential task failed. Shutting down service."); @@ -515,62 +700,45 @@ impl Service { let _ = self.to_spawn_tx.unbounded_send(essential_task); } - /// Returns a handle for spawning tasks. 
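Background tasks also go through the trait; for example, attaching the informant without the deprecated `informant::start` mirrors the shim at the top of this diff (`service` is any `impl AbstractService`):

    service.spawn_task(
        service.on_exit().until(informant::build(&service)).map(|_| ())
    );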
- pub fn spawn_task_handle(&self) -> SpawnTaskHandle { + fn spawn_task_handle(&self) -> SpawnTaskHandle { SpawnTaskHandle { sender: self.to_spawn_tx.clone(), } } - /// Starts an RPC query. - /// - /// The query is passed as a string and must be a JSON text similar to what an HTTP client - /// would for example send. - /// - /// Returns a `Future` that contains the optional response. - /// - /// If the request subscribes you to events, the `Sender` in the `RpcSession` object is used to - /// send back spontaneous events. - pub fn rpc_query(&self, mem: &RpcSession, request: &str) - -> impl Future, Error = ()> - { - self.rpc_handlers.handle_request(request, mem.metadata.clone()) + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box, Error = ()> + Send> { + Box::new(self.rpc_handlers.handle_request(request, mem.metadata.clone())) } - /// Get shared client instance. - pub fn client(&self) -> Arc> { + fn client(&self) -> Arc> { self.client.clone() } - /// Get clone of select chain. - pub fn select_chain(&self) -> Option<::SelectChain> { + fn select_chain(&self) -> Option { self.select_chain.clone() } - /// Get shared network instance. - pub fn network(&self) -> Arc> { + fn network(&self) -> Arc> { self.network.clone() } - /// Returns a receiver that periodically receives a status of the network. - pub fn network_status(&self) -> mpsc::UnboundedReceiver<(NetworkStatus>, NetworkState)> { + fn network_status(&self) -> mpsc::UnboundedReceiver<(NetworkStatus, NetworkState)> { let (sink, stream) = mpsc::unbounded(); self.network_status_sinks.lock().push(sink); stream } - /// Get shared transaction pool instance. - pub fn transaction_pool(&self) -> Arc> { + fn transaction_pool(&self) -> Arc> { self.transaction_pool.clone() } - /// Get a handle to a future that will resolve on exit. 
- pub fn on_exit(&self) -> ::exit_future::Exit { + fn on_exit(&self) -> ::exit_future::Exit { self.exit.clone() } } -impl Future for Service where Components: components::Components { +impl Future for +NewService { type Item = (); type Error = Error; @@ -601,9 +769,17 @@ impl Future for Service where Components: components::Co } } -impl Executor + Send>> - for Service where Components: components::Components -{ +impl Future for Service where Components: components::Components { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.inner.poll() + } +} + +impl Executor + Send>> for +NewService { fn execute( &self, future: Box + Send> @@ -617,6 +793,91 @@ impl Executor + Send>> } } +impl Executor + Send>> + for Service where Components: components::Components +{ + fn execute( + &self, + future: Box + Send> + ) -> Result<(), futures::future::ExecuteError + Send>>> { + self.inner.execute(future) + } +} + +impl AbstractService for T +where T: 'static + Deref + DerefMut + Future + Send + + Executor + Send>>, + T::Target: AbstractService { + type Block = <::Target as AbstractService>::Block; + type Backend = <::Target as AbstractService>::Backend; + type Executor = <::Target as AbstractService>::Executor; + type RuntimeApi = <::Target as AbstractService>::RuntimeApi; + type Config = <::Target as AbstractService>::Config; + type SelectChain = <::Target as AbstractService>::SelectChain; + type TransactionPoolApi = <::Target as AbstractService>::TransactionPoolApi; + type NetworkSpecialization = <::Target as AbstractService>::NetworkSpecialization; + + fn telemetry_on_connect_stream(&self) -> TelemetryOnConnectNotifications { + (**self).telemetry_on_connect_stream() + } + + fn config(&self) -> &Self::Config { + (**self).config() + } + + fn config_mut(&mut self) -> &mut Self::Config { + (&mut **self).config_mut() + } + + fn telemetry(&self) -> Option { + (**self).telemetry() + } + + fn spawn_task(&self, task: impl Future + Send + 'static) { + (**self).spawn_task(task) + } + + fn spawn_essential_task(&self, task: impl Future + Send + 'static) { + (**self).spawn_essential_task(task) + } + + fn spawn_task_handle(&self) -> SpawnTaskHandle { + (**self).spawn_task_handle() + } + + fn keystore(&self) -> keystore::KeyStorePtr { + (**self).keystore() + } + + fn rpc_query(&self, mem: &RpcSession, request: &str) -> Box, Error = ()> + Send> { + (**self).rpc_query(mem, request) + } + + fn client(&self) -> Arc> { + (**self).client() + } + + fn select_chain(&self) -> Option { + (**self).select_chain() + } + + fn network(&self) -> Arc> { + (**self).network() + } + + fn network_status(&self) -> mpsc::UnboundedReceiver<(NetworkStatus, NetworkState)> { + (**self).network_status() + } + + fn transaction_pool(&self) -> Arc> { + (**self).transaction_pool() + } + + fn on_exit(&self) -> ::exit_future::Exit { + (**self).on_exit() + } +} + /// Builds a never-ending future that continuously polls the network. /// /// The `status_sink` contain a list of senders to send a periodic network status to. 
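A consequence of the blanket impl just above: a node-side wrapper never has to implement `AbstractService` by hand. Sketch (the `InnerService` name is a placeholder for the concrete `NewService<..>` type):

    pub struct MyNodeService { inner: InnerService }

    impl std::ops::Deref for MyNodeService {
        type Target = InnerService;
        fn deref(&self) -> &Self::Target { &self.inner }
    }

    impl std::ops::DerefMut for MyNodeService {
        fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
    }

    // Plus Future and Executor impls that forward to self.inner — exactly what
    // `Service<Components>` does above — after which MyNodeService: AbstractService.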
@@ -744,7 +1005,8 @@ pub struct NetworkStatus { pub average_upload_per_sec: u64, } -impl Drop for Service where Components: components::Components { +impl Drop for +NewService { fn drop(&mut self) { debug!(target: "service", "Substrate service shutdown"); if let Some(signal) = self.signal.take() { diff --git a/core/service/test/src/lib.rs b/core/service/test/src/lib.rs index 1b3c43dae74bb..1fc3a8dcff099 100644 --- a/core/service/test/src/lib.rs +++ b/core/service/test/src/lib.rs @@ -27,32 +27,31 @@ use tempdir::TempDir; use tokio::{runtime::Runtime, prelude::FutureExt}; use tokio::timer::Interval; use service::{ - ServiceFactory, + AbstractService, + ChainSpec, Configuration, - FactoryFullConfiguration, - FactoryChainSpec, Roles, - FactoryExtrinsic, + Error, }; use network::{multiaddr, Multiaddr}; use network::config::{NetworkConfiguration, TransportConfig, NodeKeyConfig, Secret, NonReservedPeerMode}; -use sr_primitives::generic::BlockId; +use sr_primitives::{generic::BlockId, traits::Block as BlockT}; use consensus::{BlockImportParams, BlockImport}; /// Maximum duration of single wait call. const MAX_WAIT_TIME: Duration = Duration::from_secs(60 * 3); -struct TestNet { +struct TestNet { runtime: Runtime, - authority_nodes: Vec<(usize, SyncService, Multiaddr)>, - full_nodes: Vec<(usize, SyncService, Multiaddr)>, - light_nodes: Vec<(usize, SyncService, Multiaddr)>, - chain_spec: FactoryChainSpec, + authority_nodes: Vec<(usize, SyncService, Multiaddr)>, + full_nodes: Vec<(usize, SyncService, Multiaddr)>, + light_nodes: Vec<(usize, SyncService, Multiaddr)>, + chain_spec: ChainSpec, base_port: u16, nodes: usize, } -/// Wraps around an `Arc>` and implements `Future`. +/// Wraps around an `Arc` and implements `Future`. pub struct SyncService(Arc>); impl SyncService { @@ -82,15 +81,17 @@ impl> Future for SyncService { } } -impl TestNet { +impl TestNet +where F: Send + 'static, L: Send +'static +{ pub fn run_until_all_full( &mut self, full_predicate: FP, light_predicate: LP, ) where - FP: Send + Fn(usize, &SyncService) -> bool + 'static, - LP: Send + Fn(usize, &SyncService) -> bool + 'static, + FP: Send + Fn(usize, &SyncService) -> bool + 'static, + LP: Send + Fn(usize, &SyncService) -> bool + 'static, { let full_nodes = self.full_nodes.clone(); let light_nodes = self.light_nodes.clone(); @@ -125,14 +126,14 @@ impl TestNet { } } -fn node_config ( +fn node_config ( index: usize, - spec: &FactoryChainSpec, + spec: &ChainSpec, role: Roles, key_seed: Option, base_port: u16, root: &TempDir, -) -> FactoryFullConfiguration +) -> Configuration<(), G> { let root = root.path().join(format!("node-{}", index)); @@ -194,18 +195,18 @@ fn node_config ( } } -impl TestNet where - F::FullService: Future, - F::LightService: Future, +impl TestNet where + F: AbstractService, + L: AbstractService, { fn new( temp: &TempDir, - spec: FactoryChainSpec, - full: usize, - light: usize, - authorities: Vec, + spec: ChainSpec, + full: impl Iterator) -> Result>, + light: impl Iterator) -> Result>, + authorities: impl Iterator) -> Result)>, base_port: u16 - ) -> TestNet { + ) -> TestNet { let _ = env_logger::try_init(); fdlimit::raise_fd_limit(); let runtime = Runtime::new().expect("Error creating tokio runtime"); @@ -222,70 +223,76 @@ impl TestNet where net } - fn insert_nodes(&mut self, temp: &TempDir, full: usize, light: usize, authorities: Vec) { - let mut nodes = self.nodes; - let base_port = self.base_port; - let spec = &self.chain_spec; + fn insert_nodes( + &mut self, + temp: &TempDir, + full: impl Iterator) -> Result>, + 
light: impl Iterator) -> Result>, + authorities: impl Iterator) -> Result)> + ) { let executor = self.runtime.executor(); - self.authority_nodes.extend(authorities.iter().enumerate().map(|(index, key)| { - let node_config = node_config::( - index, - &spec, + + for (key, authority) in authorities { + let node_config = node_config( + self.nodes, + &self.chain_spec, Roles::AUTHORITY, - Some(key.clone()), - base_port, + Some(key), + self.base_port, &temp, ); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); + let service = SyncService::from(authority(node_config).expect("Error creating test node service")); executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); - ((index + nodes), service, addr) - })); - nodes += authorities.len(); + self.authority_nodes.push((self.nodes, service, addr)); + self.nodes += 1; + } - self.full_nodes.extend((nodes..nodes + full).map(|index| { - let node_config = node_config::(index, &spec, Roles::FULL, None, base_port, &temp); + for full in full { + let node_config = node_config(self.nodes, &self.chain_spec, Roles::FULL, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = SyncService::from(F::new_full(node_config).expect("Error creating test node service")); + let service = SyncService::from(full(node_config).expect("Error creating test node service")); executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); - (index, service, addr) - })); - nodes += full; + self.full_nodes.push((self.nodes, service, addr)); + self.nodes += 1; + } - self.light_nodes.extend((nodes..nodes + light).map(|index| { - let node_config = node_config::(index, &spec, Roles::LIGHT, None, base_port, &temp); + for light in light { + let node_config = node_config(self.nodes, &self.chain_spec, Roles::LIGHT, None, self.base_port, &temp); let addr = node_config.network.listen_addresses.iter().next().unwrap().clone(); - let service = SyncService::from(F::new_light(node_config).expect("Error creating test node service")); + let service = SyncService::from(light(node_config).expect("Error creating test node service")); executor.spawn(service.clone().map_err(|_| ())); let addr = addr.with(multiaddr::Protocol::P2p(service.get().network().local_peer_id().into())); - (index, service, addr) - })); - nodes += light; - - self.nodes = nodes; + self.light_nodes.push((self.nodes, service, addr)); + self.nodes += 1; + } } } -pub fn connectivity(spec: FactoryChainSpec) where - F::FullService: Future, - F::LightService: Future, +pub fn connectivity(spec: ChainSpec, full_builder: Fb, light_builder: Lb) where + Fb: Fn(Configuration<(), G>) -> Result, + F: AbstractService, + Lb: Fn(Configuration<(), G>) -> Result, + L: AbstractService, { const NUM_FULL_NODES: usize = 5; const NUM_LIGHT_NODES: usize = 5; { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); let runtime = { - let mut network = TestNet::::new( + let mut network = TestNet::new( &temp, spec.clone(), - NUM_FULL_NODES, - NUM_LIGHT_NODES, - vec![], + (0..NUM_FULL_NODES).map(|_| { |cfg| full_builder(cfg) }), + (0..NUM_LIGHT_NODES).map(|_| { |cfg| light_builder(cfg) }), + // Note: this iterator is empty but we can't just use `iter::empty()`, 
otherwise + // the type of the closure cannot be inferred. + (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })), 30400, ); info!("Checking star topology"); @@ -311,12 +318,14 @@ pub fn connectivity(spec: FactoryChainSpec) where { let temp = TempDir::new("substrate-connectivity-test").expect("Error creating test dir"); { - let mut network = TestNet::::new( + let mut network = TestNet::new( &temp, spec, - NUM_FULL_NODES, - NUM_LIGHT_NODES, - vec![], + (0..NUM_FULL_NODES).map(|_| { |cfg| full_builder(cfg) }), + (0..NUM_LIGHT_NODES).map(|_| { |cfg| light_builder(cfg) }), + // Note: this iterator is empty but we can't just use `iter::empty()`, otherwise + // the type of the closure cannot be inferred. + (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })), 30400, ); info!("Checking linked topology"); @@ -345,24 +354,27 @@ pub fn connectivity(spec: FactoryChainSpec) where } } -pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrinsic_factory: E) where - F: ServiceFactory, - F::FullService: Future, - F::LightService: Future, - B: FnMut(&SyncService) -> BlockImportParams, - E: FnMut(&SyncService) -> FactoryExtrinsic, +pub fn sync(spec: ChainSpec, full_builder: Fb, light_builder: Lb, mut block_factory: B, mut extrinsic_factory: E) where + Fb: Fn(Configuration<(), G>) -> Result, + F: AbstractService, + Lb: Fn(Configuration<(), G>) -> Result, + L: AbstractService, + B: FnMut(&F) -> BlockImportParams, + E: FnMut(&F) -> ::Extrinsic, { const NUM_FULL_NODES: usize = 10; // FIXME: BABE light client support is currently not working. const NUM_LIGHT_NODES: usize = 0; const NUM_BLOCKS: usize = 512; let temp = TempDir::new("substrate-sync-test").expect("Error creating test dir"); - let mut network = TestNet::::new( + let mut network = TestNet::new( &temp, spec.clone(), - NUM_FULL_NODES, - NUM_LIGHT_NODES, - vec![], + (0..NUM_FULL_NODES).map(|_| { |cfg| full_builder(cfg) }), + (0..NUM_LIGHT_NODES).map(|_| { |cfg| light_builder(cfg) }), + // Note: this iterator is empty but we can't just use `iter::empty()`, otherwise + // the type of the closure cannot be inferred. 
+ (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })), 30500, ); info!("Checking block sync"); @@ -373,7 +385,7 @@ pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrin if i % 128 == 0 { info!("Generating #{}", i); } - let import_data = block_factory(&first_service); + let import_data = block_factory(&first_service.get()); client.import_block(import_data, HashMap::new()).expect("Error importing test block"); } network.full_nodes[0].2.clone() @@ -396,7 +408,7 @@ pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrin info!("Checking extrinsic propagation"); let first_service = network.full_nodes[0].1.clone(); let best_block = BlockId::number(first_service.get().client().info().chain.best_number); - let extrinsic = extrinsic_factory(&first_service); + let extrinsic = extrinsic_factory(&first_service.get()); first_service.get().transaction_pool().submit_one(&best_block, extrinsic).unwrap(); network.run_until_all_full( |_index, service| service.get().transaction_pool().ready().count() == 1, @@ -404,21 +416,22 @@ pub fn sync(spec: FactoryChainSpec, mut block_factory: B, mut extrin ); } -pub fn consensus(spec: FactoryChainSpec, authorities: Vec) where - F: ServiceFactory, - F::FullService: Future, - F::LightService: Future, +pub fn consensus(spec: ChainSpec, full_builder: Fb, light_builder: Lb, authorities: impl IntoIterator) where + Fb: Fn(Configuration<(), G>) -> Result, + F: AbstractService, + Lb: Fn(Configuration<(), G>) -> Result, + L: AbstractService, { const NUM_FULL_NODES: usize = 10; const NUM_LIGHT_NODES: usize = 0; const NUM_BLOCKS: usize = 10; // 10 * 2 sec block production time = ~20 seconds let temp = TempDir::new("substrate-conensus-test").expect("Error creating test dir"); - let mut network = TestNet::::new( + let mut network = TestNet::new( &temp, spec.clone(), - NUM_FULL_NODES / 2, - NUM_LIGHT_NODES / 2, - authorities, + (0..NUM_FULL_NODES / 2).map(|_| { |cfg| full_builder(cfg) }), + (0..NUM_LIGHT_NODES / 2).map(|_| { |cfg| light_builder(cfg) }), + authorities.into_iter().map(|key| (key, { |cfg| full_builder(cfg) })), 30600, ); @@ -441,7 +454,14 @@ pub fn consensus(spec: FactoryChainSpec, authorities: Vec) where ); info!("Adding more peers"); - network.insert_nodes(&temp, NUM_FULL_NODES / 2, NUM_LIGHT_NODES / 2, vec![]); + network.insert_nodes( + &temp, + (0..NUM_FULL_NODES / 2).map(|_| { |cfg| full_builder(cfg) }), + (0..NUM_LIGHT_NODES / 2).map(|_| { |cfg| light_builder(cfg) }), + // Note: this iterator is empty but we can't just use `iter::empty()`, otherwise + // the type of the closure cannot be inferred. + (0..0).map(|_| (String::new(), { |cfg| full_builder(cfg) })), + ); for (_, service, _) in network.full_nodes.iter() { service.get().network().add_reserved_peer(first_address.to_string()).expect("Error adding reserved peer"); } diff --git a/node-template/src/cli.rs b/node-template/src/cli.rs index 4d672491c18e6..d60b59a8c5533 100644 --- a/node-template/src/cli.rs +++ b/node-template/src/cli.rs @@ -4,9 +4,8 @@ use std::cell::RefCell; use tokio::runtime::Runtime; pub use substrate_cli::{VersionInfo, IntoExit, error}; use substrate_cli::{informant, parse_and_prepare, ParseAndPrepare, NoCustom}; -use substrate_service::{ServiceFactory, Roles as ServiceRoles}; +use substrate_service::{AbstractService, Roles as ServiceRoles}; use crate::chain_spec; -use std::ops::Deref; use log::info; /// Parse command line arguments into service configuration. 
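For orientation, a downstream test written against the reworked harness now passes the service constructors as closures instead of a `ServiceFactory` type parameter. The sketch below is a hypothetical node-template test mirroring the call shape used later in this patch; the chain-spec helper name is assumed.

#[test]
#[ignore]
fn template_connectivity() {
    service_test::connectivity(
        // `development_config()` is assumed to exist in the node crate.
        crate::chain_spec::development_config(),
        |config| crate::service::new_full(config),
        |config| crate::service::new_light(config),
    );
}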
@@ -16,7 +15,7 @@ pub fn run(args: I, exit: E, version: VersionInfo) -> error::Result<()> E: IntoExit, { match parse_and_prepare::(&version, "substrate-node", args) { - ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| { + ParseAndPrepare::Run(cmd) => cmd.run::<(), _, _, _, _>(load_spec, exit, |exit, _cli_args, _custom_args, config| { info!("{}", version.name); info!(" version {}", config.full_version()); info!(" by {}, 2017, 2018", version.author); @@ -27,21 +26,21 @@ pub fn run(args: I, exit: E, version: VersionInfo) -> error::Result<()> match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, - service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?, + service::new_light(config).map_err(|e| format!("{:?}", e))?, exit ), _ => run_until_exit( runtime, - service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?, + service::new_full(config).map_err(|e| format!("{:?}", e))?, exit ), }.map_err(|e| format!("{:?}", e)) }), ParseAndPrepare::BuildSpec(cmd) => cmd.run(load_spec), - ParseAndPrepare::ExportBlocks(cmd) => cmd.run::(load_spec, exit), - ParseAndPrepare::ImportBlocks(cmd) => cmd.run::(load_spec, exit), + ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec, exit), + ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec, exit), ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec), - ParseAndPrepare::RevertChain(cmd) => cmd.run::(load_spec), + ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec), ParseAndPrepare::CustomCommand(_) => Ok(()) }?; @@ -55,14 +54,13 @@ fn load_spec(id: &str) -> Result, String> { }) } -fn run_until_exit( +fn run_until_exit( mut runtime: Runtime, service: T, e: E, -) -> error::Result<()> where - T: Deref>, - T: Future + Send + 'static, - C: substrate_service::Components, +) -> error::Result<()> +where + T: AbstractService, E: IntoExit, { let (exit_send, exit) = exit_future::signal(); diff --git a/node-template/src/main.rs b/node-template/src/main.rs index 18e9638833fd2..024efcc7db541 100644 --- a/node-template/src/main.rs +++ b/node-template/src/main.rs @@ -4,6 +4,7 @@ #![warn(unused_extern_crates)] mod chain_spec; +#[macro_use] mod service; mod cli; diff --git a/node-template/src/service.rs b/node-template/src/service.rs index 7f2c80c48b2b8..2a627a0aaf987 100644 --- a/node-template/src/service.rs +++ b/node-template/src/service.rs @@ -2,22 +2,18 @@ #![warn(unused_extern_crates)] -use std::sync::Arc; use transaction_pool::{self, txpool::{Pool as TransactionPool}}; -use node_template_runtime::{self, GenesisConfig, opaque::Block, RuntimeApi, WASM_BINARY}; +use node_template_runtime::{self, GenesisConfig, opaque::Block, WASM_BINARY}; use substrate_service::{ - FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, - FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, - error::{Error as ServiceError}, + Configuration, error::{Error as ServiceError}, AbstractService, }; use basic_authorship::ProposerFactory; -use consensus::{import_queue, start_aura, AuraImportQueue, SlotDuration}; +use consensus::{import_queue, start_aura, SlotDuration}; use futures::prelude::*; -use substrate_client::{self as client, LongestChain}; +use substrate_client as client; use inherents::InherentDataProviders; -use 
network::{config::DummyFinalityProofRequestBuilder, construct_simple_protocol}; +use network::construct_simple_protocol; use substrate_executor::native_executor_instance; -use substrate_service::construct_service_factory; use aura_primitives::sr25519::AuthorityPair as AuraAuthorityPair; pub use substrate_executor::NativeExecutor; @@ -29,113 +25,105 @@ native_executor_instance!( WASM_BINARY ); -#[derive(Default)] -pub struct NodeConfig { - inherent_data_providers: InherentDataProviders, -} - construct_simple_protocol! { /// Demo protocol attachment for substrate. pub struct NodeProtocol where Block = Block { } } -construct_service_factory! { - struct Factory { - Block = Block, - RuntimeApi = RuntimeApi, - NetworkProtocol = NodeProtocol { |config| Ok(NodeProtocol::new()) }, - RuntimeDispatch = Executor, - FullTransactionPoolApi = transaction_pool::ChainApi< - client::Client, FullExecutor, Block, RuntimeApi>, - Block - > { - |config, client| Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) - }, - LightTransactionPoolApi = transaction_pool::ChainApi< - client::Client, LightExecutor, Block, RuntimeApi>, - Block - > { - |config, client| Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) - }, - Genesis = GenesisConfig, - Configuration = NodeConfig, - FullService = FullComponents - { |config: FactoryFullConfiguration| - FullComponents::::new(config) - }, - AuthoritySetup = { - |service: Self::FullService| { - if service.config().roles.is_authority() { - let proposer = ProposerFactory { - client: service.client(), - transaction_pool: service.transaction_pool(), - }; - let client = service.client(); - let select_chain = service.select_chain() - .ok_or_else(|| ServiceError::SelectChainRequired)?; - let aura = start_aura::<_, _, _, _, _, AuraAuthorityPair, _, _, _>( - SlotDuration::get_or_compute(&*client)?, - client.clone(), - select_chain, - client, - proposer, - service.network(), - service.config().custom.inherent_data_providers.clone(), - service.config().force_authoring, - Some(service.keystore()), - )?; - service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(())))); - } - Ok(service) - } - }, - LightService = LightComponents - { |config| >::new(config) }, - FullImportQueue = AuraImportQueue< - Self::Block, - > - { | - config: &mut FactoryFullConfiguration, - client: Arc>, - _select_chain: Self::SelectChain, - transaction_pool: Option>>, - | { - import_queue::<_, _, aura_primitives::sr25519::AuthorityPair, _>( - SlotDuration::get_or_compute(&*client)?, - Box::new(client.clone()), - None, - None, - client, - config.custom.inherent_data_providers.clone(), - transaction_pool, - ).map_err(Into::into) - } - }, - LightImportQueue = AuraImportQueue< - Self::Block, - > - { |config: &mut FactoryFullConfiguration, client: Arc>| { - let fprb = Box::new(DummyFinalityProofRequestBuilder::default()) as Box<_>; - import_queue::<_, _, AuraAuthorityPair, TransactionPool>( - SlotDuration::get_or_compute(&*client)?, - Box::new(client.clone()), - None, - None, - client, - config.custom.inherent_data_providers.clone(), - None, - ).map(|q| (q, fprb)).map_err(Into::into) - } - }, - SelectChain = LongestChain, Self::Block> - { |config: &FactoryFullConfiguration, client: Arc>| { +/// Starts a `ServiceBuilder` for a full service. +/// +/// Use this macro if you don't actually need the full service, but just the builder in order to +/// be able to perform chain operations. +macro_rules! 
new_full_start { + ($config:expr) => {{ + let inherent_data_providers = inherents::InherentDataProviders::new(); + let builder = substrate_service::ServiceBuilder::new_full::< + node_template_runtime::opaque::Block, node_template_runtime::RuntimeApi, crate::service::Executor + >($config)? + .with_select_chain(|_config, client| { #[allow(deprecated)] - Ok(LongestChain::new(client.backend().clone())) - } - }, - FinalityProofProvider = { |_client: Arc>| { - Ok(None) - }}, + Ok(substrate_client::LongestChain::new(client.backend().clone())) + })? + .with_transaction_pool(|config, client| + Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::ChainApi::new(client))) + )? + .with_import_queue(|_config, client, _select_chain, transaction_pool| { + Ok(consensus::import_queue::<_, _, aura_primitives::sr25519::AuthorityPair, _>( + consensus::SlotDuration::get_or_compute(&*client)?, + Box::new(client.clone()), + None, + None, + client, + inherent_data_providers.clone(), + Some(transaction_pool), + )?) + })?; + (builder, inherent_data_providers) + }} +} + +/// Builds a new service for a full client. +pub fn new_full(config: Configuration) +-> Result { + + let (builder, inherent_data_providers) = new_full_start!(config); + let service = builder + .with_opt_finality_proof_provider(|_| Ok(None))? + .with_network_protocol(|_| Ok(NodeProtocol::new()))? + .build()?; + + if service.config().roles.is_authority() { + let proposer = ProposerFactory { + client: service.client(), + transaction_pool: service.transaction_pool(), + }; + let client = service.client(); + let select_chain = service.select_chain() + .ok_or_else(|| ServiceError::SelectChainRequired)?; + let aura = start_aura::<_, _, _, _, _, AuraAuthorityPair, _, _, _>( + SlotDuration::get_or_compute(&*client)?, + client.clone(), + select_chain, + client, + proposer, + service.network(), + inherent_data_providers.clone(), + service.config().force_authoring, + Some(service.keystore()), + )?; + service.spawn_task(Box::new(aura.select(service.on_exit()).then(|_| Ok(())))); } + + Ok(service) +} + +/// Builds a new service for a light client. +pub fn new_light(config: Configuration) +-> Result { + let inherent_data_providers = InherentDataProviders::new(); + Ok(substrate_service::ServiceBuilder::new_light::< + node_template_runtime::opaque::Block, node_template_runtime::RuntimeApi, Executor + >(config)? + .with_select_chain(|_config, client| { + #[allow(deprecated)] + Ok(client::LongestChain::new(client.backend().clone())) + })? + .with_transaction_pool(|config, client| + Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) + )? + .with_import_queue(|_config, client, _select_chain, transaction_pool| { + Ok(import_queue::<_, _, aura_primitives::sr25519::AuthorityPair, _>( + SlotDuration::get_or_compute(&*client)?, + Box::new(client.clone()), + None, + None, + client, + inherent_data_providers.clone(), + Some(transaction_pool) + )?) + })? + .with_opt_finality_proof_provider(|_| Ok(None))? + .with_network_protocol(|_| Ok(NodeProtocol::new()))? + .build()?) 
} diff --git a/node/cli/src/chain_spec.rs b/node/cli/src/chain_spec.rs index acedb5c14bb42..b0ae48afcd835 100644 --- a/node/cli/src/chain_spec.rs +++ b/node/cli/src/chain_spec.rs @@ -348,8 +348,8 @@ pub fn local_testnet_config() -> ChainSpec { #[cfg(test)] pub(crate) mod tests { use super::*; + use crate::service::{new_full, new_light}; use service_test; - use crate::service::Factory; fn local_testnet_genesis_instant_single() -> GenesisConfig { testnet_genesis( @@ -393,6 +393,10 @@ pub(crate) mod tests { #[test] #[ignore] fn test_connectivity() { - service_test::connectivity::(integration_test_config_with_two_authorities()); + service_test::connectivity( + integration_test_config_with_two_authorities(), + |config| new_full(config), + |config| new_light(config), + ); } } diff --git a/node/cli/src/lib.rs b/node/cli/src/lib.rs index 4e3cfa7f01092..5008061c86bc2 100644 --- a/node/cli/src/lib.rs +++ b/node/cli/src/lib.rs @@ -21,14 +21,14 @@ pub use cli::error; pub mod chain_spec; +#[macro_use] mod service; mod factory_impl; use tokio::prelude::Future; use tokio::runtime::{Builder as RuntimeBuilder, Runtime}; pub use cli::{VersionInfo, IntoExit, NoCustom, SharedParams, ExecutionStrategyParam}; -use substrate_service::{ServiceFactory, Roles as ServiceRoles}; -use std::ops::Deref; +use substrate_service::{AbstractService, Roles as ServiceRoles}; use log::info; use structopt::{StructOpt, clap::App}; use cli::{AugmentClap, GetLogFilter, parse_and_prepare, ParseAndPrepare}; @@ -159,7 +159,7 @@ pub fn run(args: I, exit: E, version: cli::VersionInfo) -> error::Resul E: IntoExit, { match parse_and_prepare::(&version, "substrate-node", args) { - ParseAndPrepare::Run(cmd) => cmd.run(load_spec, exit, |exit, _cli_args, _custom_args, config| { + ParseAndPrepare::Run(cmd) => cmd.run::<(), _, _, _, _>(load_spec, exit, |exit, _cli_args, _custom_args, config| { info!("{}", version.name); info!(" version {}", config.full_version()); info!(" by Parity Technologies, 2017-2019"); @@ -171,23 +171,23 @@ pub fn run(args: I, exit: E, version: cli::VersionInfo) -> error::Resul match config.roles { ServiceRoles::LIGHT => run_until_exit( runtime, - service::Factory::new_light(config).map_err(|e| format!("{:?}", e))?, + service::new_light(config).map_err(|e| format!("{:?}", e))?, exit ), _ => run_until_exit( runtime, - service::Factory::new_full(config).map_err(|e| format!("{:?}", e))?, + service::new_full(config).map_err(|e| format!("{:?}", e))?, exit ), }.map_err(|e| format!("{:?}", e)) }), ParseAndPrepare::BuildSpec(cmd) => cmd.run(load_spec), - ParseAndPrepare::ExportBlocks(cmd) => cmd.run::(load_spec, exit), - ParseAndPrepare::ImportBlocks(cmd) => cmd.run::(load_spec, exit), + ParseAndPrepare::ExportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec, exit), + ParseAndPrepare::ImportBlocks(cmd) => cmd.run_with_builder::<(), _, _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec, exit), ParseAndPrepare::PurgeChain(cmd) => cmd.run(load_spec), - ParseAndPrepare::RevertChain(cmd) => cmd.run::(load_spec), + ParseAndPrepare::RevertChain(cmd) => cmd.run_with_builder::<(), _, _, _, _>(|config| Ok(new_full_start!(config).0), load_spec), ParseAndPrepare::CustomCommand(CustomSubcommands::Factory(cli_args)) => { - let mut config = cli::create_config_with_db_path( + let mut config = cli::create_config_with_db_path::<(), _, _>( load_spec, &cli_args.shared_params, &version, @@ -209,9 +209,13 @@ pub fn run(args: I, exit: E, version: cli::VersionInfo) -> error::Resul 
cli_args.num, cli_args.rounds, ); - transaction_factory::factory::>( + + let service_builder = new_full_start!(config).0; + transaction_factory::factory::, _, _, _, _, _>( factory_state, - config, + service_builder.client(), + service_builder.select_chain() + .expect("The select_chain is always initialized by new_full_start!; QED") ).map_err(|e| format!("Error in transaction factory: {}", e))?; Ok(()) @@ -219,14 +223,13 @@ pub fn run(args: I, exit: E, version: cli::VersionInfo) -> error::Resul } } -fn run_until_exit( +fn run_until_exit( mut runtime: Runtime, service: T, e: E, -) -> error::Result<()> where - T: Deref>, - T: Future + Send + 'static, - C: substrate_service::Components, +) -> error::Result<()> +where + T: AbstractService, E: IntoExit, { let (exit_send, exit) = exit_future::signal(); diff --git a/node/cli/src/service.rs b/node/cli/src/service.rs index 90c76eda84cbe..75ff1cb712006 100644 --- a/node/cli/src/service.rs +++ b/node/cli/src/service.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use std::time::Duration; -use babe::{import_queue, start_babe, BabeImportQueue, Config}; +use babe::{import_queue, start_babe, Config}; use client::{self, LongestChain}; use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use node_executor; @@ -29,14 +29,11 @@ use futures::prelude::*; use node_primitives::Block; use node_runtime::{GenesisConfig, RuntimeApi}; use substrate_service::{ - FactoryFullConfiguration, LightComponents, FullComponents, FullBackend, - FullClient, LightClient, LightBackend, FullExecutor, LightExecutor, - error::{Error as ServiceError}, + AbstractService, ServiceBuilder, config::Configuration, error::{Error as ServiceError}, }; use transaction_pool::{self, txpool::{Pool as TransactionPool}}; use inherents::InherentDataProviders; use network::construct_simple_protocol; -use substrate_service::construct_service_factory; use substrate_service::TelemetryOnConnect; construct_simple_protocol! { @@ -44,243 +41,202 @@ construct_simple_protocol! { pub struct NodeProtocol where Block = Block { } } -type BabeBlockImportForService = babe::BabeBlockImport< - FullBackend, - FullExecutor, - ::Block, - grandpa::BlockImportForService, - ::RuntimeApi, - client::Client< - FullBackend, - FullExecutor, - ::Block, - ::RuntimeApi - >, ->; - -/// Node specific configuration -pub struct NodeConfig { - /// GRANDPA and BABE connection to import block. - // FIXME #1134 rather than putting this on the config, let's have an actual intermediate setup state - pub import_setup: Option<( - BabeBlockImportForService, - grandpa::LinkHalfForService, - babe::BabeLink, - )>, - /// Tasks that were created by previous setup steps and should be spawned. - pub tasks_to_spawn: Option + Send>>>, - inherent_data_providers: InherentDataProviders, -} - -impl Default for NodeConfig where F: substrate_service::ServiceFactory { - fn default() -> NodeConfig { - NodeConfig { - import_setup: None, - inherent_data_providers: InherentDataProviders::new(), - tasks_to_spawn: None, - } - } -} - -construct_service_factory! 
{ - struct Factory { - Block = Block, - RuntimeApi = RuntimeApi, - NetworkProtocol = NodeProtocol { |config| Ok(NodeProtocol::new()) }, - RuntimeDispatch = node_executor::Executor, - FullTransactionPoolApi = - transaction_pool::ChainApi< - client::Client, FullExecutor, Block, RuntimeApi>, - Block - > { - |config, client| - Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) - }, - LightTransactionPoolApi = - transaction_pool::ChainApi< - client::Client, LightExecutor, Block, RuntimeApi>, - Block - > { - |config, client| - Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) - }, - Genesis = GenesisConfig, - Configuration = NodeConfig, - FullService = FullComponents { - |config: FactoryFullConfiguration| FullComponents::::new(config) - }, - AuthoritySetup = { - |mut service: Self::FullService| { - let (block_import, link_half, babe_link) = - service.config_mut().custom.import_setup.take() - .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); - - // spawn any futures that were created in the previous setup steps - if let Some(tasks) = service.config_mut().custom.tasks_to_spawn.take() { - for task in tasks { - service.spawn_task( - task.select(service.on_exit()) - .map(|_| ()) - .map_err(|_| ()) - ); - } - } - - if service.config().roles.is_authority() { - let proposer = substrate_basic_authorship::ProposerFactory { - client: service.client(), - transaction_pool: service.transaction_pool(), - }; - - let client = service.client(); - let select_chain = service.select_chain() - .ok_or(ServiceError::SelectChainRequired)?; - - let babe_config = babe::BabeParams { - config: Config::get_or_compute(&*client)?, - keystore: service.keystore(), - client, - select_chain, - block_import, - env: proposer, - sync_oracle: service.network(), - inherent_data_providers: service.config() - .custom.inherent_data_providers.clone(), - force_authoring: service.config().force_authoring, - time_source: babe_link, - }; - - let babe = start_babe(babe_config)?; - let select = babe.select(service.on_exit()).then(|_| Ok(())); - - // the BABE authoring task is considered infallible, i.e. if it - // fails we take down the service with it. - service.spawn_essential_task(select); - } - - let config = grandpa::Config { - // FIXME #1578 make this available through chainspec - gossip_duration: Duration::from_millis(333), - justification_period: 4096, - name: Some(service.config().name.clone()), - keystore: Some(service.keystore()), - }; - - match (service.config().roles.is_authority(), service.config().disable_grandpa) { - (false, false) => { - // start the lightweight GRANDPA observer - service.spawn_task(Box::new(grandpa::run_grandpa_observer( - config, - link_half, - service.network(), - service.on_exit(), - )?)); - }, - (true, false) => { - // start the full GRANDPA voter - let telemetry_on_connect = TelemetryOnConnect { - telemetry_connection_sinks: service.telemetry_on_connect_stream(), - }; - let grandpa_config = grandpa::GrandpaParams { - config: config, - link: link_half, - network: service.network(), - inherent_data_providers: - service.config().custom.inherent_data_providers.clone(), - on_exit: service.on_exit(), - telemetry_on_connect: Some(telemetry_on_connect), - }; - - // the GRANDPA voter task is considered infallible, i.e. - // if it fails we take down the service with it. 
- service.spawn_essential_task(grandpa::run_grandpa_voter(grandpa_config)?); - }, - (_, true) => { - grandpa::setup_disabled_grandpa( - service.client(), - &service.config().custom.inherent_data_providers, - service.network(), - )?; - }, - } - - Ok(service) - } - }, - LightService = LightComponents - { |config| >::new(config) }, - FullImportQueue = BabeImportQueue - { - | - config: &mut FactoryFullConfiguration, - client: Arc>, - select_chain: Self::SelectChain, - transaction_pool: Option>>, - | - { +/// Starts a `ServiceBuilder` for a full service. +/// +/// Use this macro if you don't actually need the full service, but just the builder in order to +/// be able to perform chain operations. +macro_rules! new_full_start { + ($config:expr) => {{ + let mut import_setup = None; + let inherent_data_providers = inherents::InherentDataProviders::new(); + let mut tasks_to_spawn = None; + + let builder = substrate_service::ServiceBuilder::new_full::< + node_primitives::Block, node_runtime::RuntimeApi, node_executor::Executor + >($config)? + .with_select_chain(|_config, client| { + #[allow(deprecated)] + Ok(client::LongestChain::new(client.backend().clone())) + })? + .with_transaction_pool(|config, client| + Ok(transaction_pool::txpool::Pool::new(config, transaction_pool::ChainApi::new(client))) + )? + .with_import_queue(|_config, client, mut select_chain, transaction_pool| { + let select_chain = select_chain.take() + .ok_or_else(|| substrate_service::Error::SelectChainRequired)?; let (block_import, link_half) = - grandpa::block_import::<_, _, _, RuntimeApi, FullClient, _>( + grandpa::block_import::<_, _, _, node_runtime::RuntimeApi, _, _>( client.clone(), client.clone(), select_chain )?; let justification_import = block_import.clone(); - let (import_queue, babe_link, babe_block_import, pruning_task) = import_queue( - Config::get_or_compute(&*client)?, + let (import_queue, babe_link, babe_block_import, pruning_task) = babe::import_queue( + babe::Config::get_or_compute(&*client)?, block_import, Some(Box::new(justification_import)), None, client.clone(), client, - config.custom.inherent_data_providers.clone(), - transaction_pool, + inherent_data_providers.clone(), + Some(transaction_pool) )?; - config.custom.import_setup = Some((babe_block_import.clone(), link_half, babe_link)); - config.custom.tasks_to_spawn = Some(vec![Box::new(pruning_task)]); + import_setup = Some((babe_block_import.clone(), link_half, babe_link)); + tasks_to_spawn = Some(vec![Box::new(pruning_task)]); Ok(import_queue) - }}, - LightImportQueue = BabeImportQueue - { |config: &FactoryFullConfiguration, client: Arc>| { - #[allow(deprecated)] - let fetch_checker = client.backend().blockchain().fetcher() - .upgrade() - .map(|fetcher| fetcher.checker().clone()) - .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; - let block_import = grandpa::light_block_import::<_, _, _, RuntimeApi, LightClient>( - client.clone(), Arc::new(fetch_checker), client.clone() - )?; + })?; + + (builder, import_setup, inherent_data_providers, tasks_to_spawn) + }} +} - let finality_proof_import = block_import.clone(); - let finality_proof_request_builder = - finality_proof_import.create_finality_proof_request_builder(); +/// Builds a new service for a full client. +pub fn new_full(config: Configuration) +-> Result { - // FIXME: pruning task isn't started since light client doesn't do `AuthoritySetup`. - let (import_queue, ..) 
= import_queue::<_, _, _, _, _, _, TransactionPool>( - Config::get_or_compute(&*client)?, - block_import, - None, - Some(Box::new(finality_proof_import)), - client.clone(), - client, - config.custom.inherent_data_providers.clone(), - None, - )?; + let (builder, mut import_setup, inherent_data_providers, mut tasks_to_spawn) = new_full_start!(config); - Ok((import_queue, finality_proof_request_builder)) - }}, - SelectChain = LongestChain, Self::Block> - { |config: &FactoryFullConfiguration, client: Arc>| { - #[allow(deprecated)] - Ok(LongestChain::new(client.backend().clone())) - } + let service = builder.with_network_protocol(|_| Ok(NodeProtocol::new()))? + .with_finality_proof_provider(|client| + Ok(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _) + )? + .build()?; + + let (block_import, link_half, babe_link) = import_setup.take() + .expect("Link Half and Block Import are present for Full Services or setup failed before. qed"); + + // spawn any futures that were created in the previous setup steps + if let Some(tasks) = tasks_to_spawn.take() { + for task in tasks { + service.spawn_task( + task.select(service.on_exit()) + .map(|_| ()) + .map_err(|_| ()) + ); + } + } + + if service.config().roles.is_authority() { + let proposer = substrate_basic_authorship::ProposerFactory { + client: service.client(), + transaction_pool: service.transaction_pool(), + }; + + let client = service.client(); + let select_chain = service.select_chain() + .ok_or(ServiceError::SelectChainRequired)?; + + let babe_config = babe::BabeParams { + config: Config::get_or_compute(&*client)?, + keystore: service.keystore(), + client, + select_chain, + block_import, + env: proposer, + sync_oracle: service.network(), + inherent_data_providers: inherent_data_providers.clone(), + force_authoring: service.config().force_authoring, + time_source: babe_link, + }; + + let babe = start_babe(babe_config)?; + let select = babe.select(service.on_exit()).then(|_| Ok(())); + service.spawn_task(Box::new(select)); + } + + let config = grandpa::Config { + // FIXME #1578 make this available through chainspec + gossip_duration: Duration::from_millis(333), + justification_period: 4096, + name: Some(service.config().name.clone()), + keystore: Some(service.keystore()), + }; + + match (service.config().roles.is_authority(), service.config().disable_grandpa) { + (false, false) => { + // start the lightweight GRANDPA observer + service.spawn_task(Box::new(grandpa::run_grandpa_observer( + config, + link_half, + service.network(), + service.on_exit(), + )?)); + }, + (true, false) => { + // start the full GRANDPA voter + let telemetry_on_connect = TelemetryOnConnect { + telemetry_connection_sinks: service.telemetry_on_connect_stream(), + }; + let grandpa_config = grandpa::GrandpaParams { + config: config, + link: link_half, + network: service.network(), + inherent_data_providers: inherent_data_providers.clone(), + on_exit: service.on_exit(), + telemetry_on_connect: Some(telemetry_on_connect), + }; + service.spawn_task(Box::new(grandpa::run_grandpa_voter(grandpa_config)?)); + }, + (_, true) => { + grandpa::setup_disabled_grandpa( + service.client(), + &inherent_data_providers, + service.network(), + )?; }, - FinalityProofProvider = { |client: Arc>| { - Ok(Some(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _)) - }}, } + + Ok(service) } +/// Builds a new service for a light client. 
+pub fn new_light(config: Configuration) +-> Result { + let inherent_data_providers = InherentDataProviders::new(); + + ServiceBuilder::new_light::(config)? + .with_select_chain(|_config, client| { + #[allow(deprecated)] + Ok(LongestChain::new(client.backend().clone())) + })? + .with_transaction_pool(|config, client| + Ok(TransactionPool::new(config, transaction_pool::ChainApi::new(client))) + )? + .with_import_queue_and_fprb(|_config, client, _select_chain, transaction_pool| { + #[allow(deprecated)] + let fetch_checker = client.backend().blockchain().fetcher() + .upgrade() + .map(|fetcher| fetcher.checker().clone()) + .ok_or_else(|| "Trying to start light import queue without active fetch checker")?; + let block_import = grandpa::light_block_import::<_, _, _, RuntimeApi, _>( + client.clone(), Arc::new(fetch_checker), client.clone() + )?; + + let finality_proof_import = block_import.clone(); + let finality_proof_request_builder = + finality_proof_import.create_finality_proof_request_builder(); + + // FIXME: pruning task isn't started since light client doesn't do `AuthoritySetup`. + let (import_queue, ..) = import_queue( + Config::get_or_compute(&*client)?, + block_import, + None, + Some(Box::new(finality_proof_import)), + client.clone(), + client, + inherent_data_providers.clone(), + Some(transaction_pool) + )?; + + Ok((import_queue, finality_proof_request_builder)) + })? + .with_network_protocol(|_| Ok(NodeProtocol::new()))? + .with_finality_proof_provider(|client| + Ok(Arc::new(GrandpaFinalityProofProvider::new(client.clone(), client)) as _) + )? + .build() +} #[cfg(test)] mod tests { @@ -301,9 +257,9 @@ mod tests { use timestamp; use finality_tracker; use keyring::AccountKeyring; - use substrate_service::ServiceFactory; + use substrate_service::AbstractService; use service_test::SyncService; - use crate::service::Factory; + use crate::service::{new_full, new_light}; #[cfg(feature = "rhd")] fn test_sync() { @@ -358,14 +314,16 @@ mod tests { let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); OpaqueExtrinsic(v) }; - service_test::sync::( + service_test::sync( chain_spec::integration_test_config(), + |config| new_full(config), + |config| new_light(config), block_factory, extrinsic_factory, ); } - #[test] + /*#[test] #[ignore] fn test_sync() { let keystore_path = tempfile::tempdir().expect("Creates keystore path"); @@ -376,127 +334,127 @@ mod tests { let chain_spec = crate::chain_spec::tests::integration_test_config_with_single_authority(); + // For the block factory let mut slot_num = 1u64; - let block_factory = |service: &SyncService<::FullService>| { - let service = service.get(); - let mut inherent_data = service - .config() - .custom - .inherent_data_providers - .create_inherent_data() - .expect("Creates inherent data."); - inherent_data.replace_data(finality_tracker::INHERENT_IDENTIFIER, &1u64); - - let parent_id = BlockId::number(service.client().info().chain.best_number); - let parent_header = service.client().header(&parent_id).unwrap().unwrap(); - let mut proposer_factory = substrate_basic_authorship::ProposerFactory { - client: service.client(), - transaction_pool: service.transaction_pool(), - }; - - let mut digest = Digest::::default(); - - // even though there's only one authority some slots might be empty, - // so we must keep trying the next slots until we can claim one. 
- let babe_pre_digest = loop { - inherent_data.replace_data(timestamp::INHERENT_IDENTIFIER, &(slot_num * SLOT_DURATION)); - if let Some(babe_pre_digest) = babe::test_helpers::claim_slot( - &*service.client(), - &parent_id, - slot_num, - (278, 1000), - &keystore, - ) { - break babe_pre_digest; - } - - slot_num += 1; - }; - - digest.push(::babe_pre_digest(babe_pre_digest)); - - let mut proposer = proposer_factory.init(&parent_header).unwrap(); - let new_block = futures03::executor::block_on(proposer.propose( - inherent_data, - digest, - std::time::Duration::from_secs(1), - )).expect("Error making test block"); - - let (new_header, new_body) = new_block.deconstruct(); - let pre_hash = new_header.hash(); - // sign the pre-sealed hash of the block and then - // add it to a digest item. - let to_sign = pre_hash.encode(); - let signature = alice.sign(&to_sign[..]); - let item = ::babe_seal( - signature.into(), - ); - slot_num += 1; - - BlockImportParams { - origin: BlockOrigin::File, - header: new_header, - justification: None, - post_digests: vec![item], - body: Some(new_body), - finalized: true, - auxiliary: Vec::new(), - fork_choice: ForkChoiceStrategy::LongestChain, - } - }; + // For the extrinsics factory let bob = Arc::new(AccountKeyring::Bob.pair()); let charlie = Arc::new(AccountKeyring::Charlie.pair()); - let mut index = 0; - let extrinsic_factory = |service: &SyncService<::FullService>| { - let amount = 5 * CENTS; - let to = AddressPublic::from_raw(bob.public().0); - let from = AddressPublic::from_raw(charlie.public().0); - let genesis_hash = service.get().client().block_hash(0).unwrap().unwrap(); - let signer = charlie.clone(); - - let function = Call::Balances(BalancesCall::transfer(to.into(), amount)); - - let check_genesis = system::CheckGenesis::new(); - let check_era = system::CheckEra::from(Era::Immortal); - let check_nonce = system::CheckNonce::from(index); - let check_weight = system::CheckWeight::new(); - let take_fees = balances::TakeFees::from(0); - let extra = (check_genesis, check_era, check_nonce, check_weight, take_fees); - - let raw_payload = (function, extra.clone(), genesis_hash, genesis_hash); - let signature = raw_payload.using_encoded(|payload| if payload.len() > 256 { - signer.sign(&blake2_256(payload)[..]) - } else { - signer.sign(payload) - }); - let xt = UncheckedExtrinsic::new_signed( - raw_payload.0, - from.into(), - signature.into(), - extra, - ).encode(); - let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); - index += 1; - OpaqueExtrinsic(v) - }; - - service_test::sync::( + service_test::sync( chain_spec, - block_factory, - extrinsic_factory, + |config| new_full(config), + |config| new_light(config), + |service| { + let mut inherent_data = service + .config() + .custom + .inherent_data_providers + .create_inherent_data() + .expect("Creates inherent data."); + inherent_data.replace_data(finality_tracker::INHERENT_IDENTIFIER, &1u64); + + let parent_id = BlockId::number(service.client().info().chain.best_number); + let parent_header = service.client().header(&parent_id).unwrap().unwrap(); + let mut proposer_factory = substrate_basic_authorship::ProposerFactory { + client: service.client(), + transaction_pool: service.transaction_pool(), + }; + + let mut digest = Digest::::default(); + + // even though there's only one authority some slots might be empty, + // so we must keep trying the next slots until we can claim one. 
+ let babe_pre_digest = loop { + inherent_data.replace_data(timestamp::INHERENT_IDENTIFIER, &(slot_num * SLOT_DURATION)); + if let Some(babe_pre_digest) = babe::test_helpers::claim_slot( + &*service.client(), + &parent_id, + slot_num, + (278, 1000), + &keystore, + ) { + break babe_pre_digest; + } + + slot_num += 1; + }; + + digest.push(::babe_pre_digest(babe_pre_digest)); + + let mut proposer = proposer_factory.init(&parent_header).unwrap(); + let new_block = futures03::executor::block_on(proposer.propose( + inherent_data, + digest, + std::time::Duration::from_secs(1), + )).expect("Error making test block"); + + let (new_header, new_body) = new_block.deconstruct(); + let pre_hash = new_header.hash(); + // sign the pre-sealed hash of the block and then + // add it to a digest item. + let to_sign = pre_hash.encode(); + let signature = alice.sign(&to_sign[..]); + let item = ::babe_seal( + signature.into(), + ); + slot_num += 1; + + BlockImportParams { + origin: BlockOrigin::File, + header: new_header, + justification: None, + post_digests: vec![item], + body: Some(new_body), + finalized: true, + auxiliary: Vec::new(), + fork_choice: ForkChoiceStrategy::LongestChain, + } + }, + |service| { + let amount = 5 * CENTS; + let to = AddressPublic::from_raw(bob.public().0); + let from = AddressPublic::from_raw(charlie.public().0); + let genesis_hash = service.client().block_hash(0).unwrap().unwrap(); + let signer = charlie.clone(); + + let function = Call::Balances(BalancesCall::transfer(to.into(), amount)); + + let check_genesis = system::CheckGenesis::new(); + let check_era = system::CheckEra::from(Era::Immortal); + let check_nonce = system::CheckNonce::from(index); + let check_weight = system::CheckWeight::new(); + let take_fees = balances::TakeFees::from(0); + let extra = (check_genesis, check_era, check_nonce, check_weight, take_fees); + + let raw_payload = (function, extra.clone(), genesis_hash, genesis_hash); + let signature = raw_payload.using_encoded(|payload| if payload.len() > 256 { + signer.sign(&blake2_256(payload)[..]) + } else { + signer.sign(payload) + }); + let xt = UncheckedExtrinsic::new_signed( + raw_payload.0, + from.into(), + signature.into(), + extra, + ).encode(); + let v: Vec = Decode::decode(&mut xt.as_slice()).unwrap(); + + index += 1; + OpaqueExtrinsic(v) + }, ); - } + }*/ #[test] #[ignore] fn test_consensus() { - use super::Factory; - - service_test::consensus::( + service_test::consensus( crate::chain_spec::tests::integration_test_config_with_two_authorities(), + |config| new_full(config), + |config| new_light(config), vec![ "//Alice".into(), "//Bob".into(), diff --git a/test-utils/transaction-factory/src/complex_mode.rs b/test-utils/transaction-factory/src/complex_mode.rs index 25170f8c1888b..81077506703d8 100644 --- a/test-utils/transaction-factory/src/complex_mode.rs +++ b/test-utils/transaction-factory/src/complex_mode.rs @@ -41,28 +41,29 @@ use std::sync::Arc; use log::info; +use client::Client; use client::block_builder::api::BlockBuilder; use client::runtime_api::ConstructRuntimeApi; +use primitives::{Blake2Hasher, Hasher}; use sr_primitives::generic::BlockId; use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi, One, Zero}; -use substrate_service::{ - FactoryBlock, FullClient, ServiceFactory, ComponentClient, FullComponents -}; use crate::{RuntimeAdapter, create_block}; -pub fn next( +pub fn next( factory_state: &mut RA, - client: &Arc>>, + client: &Arc>, genesis_hash: ::Hash, prior_block_hash: ::Hash, - prior_block_id: BlockId, -) -> Option<::Block> + 
prior_block_id: BlockId, +) -> Option where - F: ServiceFactory, - F::RuntimeApi: ConstructRuntimeApi, FullClient>, - FullClient: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: BlockBuilder>, + Block: BlockT::Out>, + Exec: client::CallExecutor + Send + Sync + Clone, + Backend: client::backend::Backend + Send, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: BlockBuilder, + RtApi: ConstructRuntimeApi> + Send + Sync, RA: RuntimeAdapter, { let total = factory_state.start_number() + factory_state.num() * factory_state.rounds(); @@ -100,7 +101,7 @@ where let inherents = client.runtime_api().inherent_extrinsics(&prior_block_id, inherents) .expect("Failed to create inherent extrinsics"); - let block = create_block::(&client, transfer, inherents); + let block = create_block::(&client, transfer, inherents); info!( "Created block {} with hash {}. Transferring {} from {} to {}.", factory_state.block_no() + RA::Number::one(), diff --git a/test-utils/transaction-factory/src/lib.rs b/test-utils/transaction-factory/src/lib.rs index ab7dfb8ceab22..0e1ae330b793c 100644 --- a/test-utils/transaction-factory/src/lib.rs +++ b/test-utils/transaction-factory/src/lib.rs @@ -26,22 +26,19 @@ use std::fmt::Display; use log::info; -use client::block_builder::api::BlockBuilder; -use client::runtime_api::ConstructRuntimeApi; +use client::{Client, block_builder::api::BlockBuilder, runtime_api::ConstructRuntimeApi}; use consensus_common::{ BlockOrigin, BlockImportParams, InherentData, ForkChoiceStrategy, SelectChain }; use consensus_common::block_import::BlockImport; use codec::{Decode, Encode}; +use primitives::{Blake2Hasher, Hasher}; use sr_primitives::generic::BlockId; use sr_primitives::traits::{ Block as BlockT, Header as HeaderT, ProvideRuntimeApi, SimpleArithmetic, One, Zero, }; -use substrate_service::{ - FactoryBlock, FactoryFullConfiguration, FullClient, new_client, - ServiceFactory, ComponentClient, FullComponents}; pub use crate::modes::Mode; pub mod modes; @@ -94,15 +91,19 @@ pub trait RuntimeAdapter { /// Manufactures transactions. The exact amount depends on /// `mode`, `num` and `rounds`. -pub fn factory( +pub fn factory( mut factory_state: RA, - mut config: FactoryFullConfiguration, + client: &Arc>, + select_chain: &Sc, ) -> cli::error::Result<()> where - F: ServiceFactory, - F::RuntimeApi: ConstructRuntimeApi, FullClient>, - FullClient: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: BlockBuilder>, + Block: BlockT::Out>, + Exec: client::CallExecutor + Send + Sync + Clone, + Backend: client::backend::Backend + Send, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: BlockBuilder, + RtApi: ConstructRuntimeApi> + Send + Sync, + Sc: SelectChain, RA: RuntimeAdapter, <::Block as BlockT>::Hash: From, { @@ -111,26 +112,22 @@ where return Err(cli::error::Error::Input(msg)); } - let client = new_client::(&config)?; - - let select_chain = F::build_select_chain(&mut config, client.clone())?; - - let best_header: Result<::Header, cli::error::Error> = + let best_header: Result<::Header, cli::error::Error> = select_chain.best_chain().map_err(|e| format!("{:?}", e).into()); let mut best_hash = best_header?.hash(); - let best_block_id = BlockId::::hash(best_hash); + let best_block_id = BlockId::::hash(best_hash); let genesis_hash = client.block_hash(Zero::zero())? 
.expect("Genesis block always exists; qed").into(); while let Some(block) = match factory_state.mode() { - Mode::MasterToNToM => complex_mode::next::( + Mode::MasterToNToM => complex_mode::next::( &mut factory_state, &client, genesis_hash, best_hash.into(), best_block_id, ), - _ => simple_modes::next::( + _ => simple_modes::next::( &mut factory_state, &client, genesis_hash, @@ -139,7 +136,7 @@ where ), } { best_hash = block.header().hash(); - import_block::(&client, block); + import_block(&client, block); info!("Imported block at {}", factory_state.block_no()); } @@ -148,16 +145,18 @@ where } /// Create a baked block from a transfer extrinsic and timestamp inherent. -pub fn create_block( - client: &Arc>>, +pub fn create_block( + client: &Arc>, transfer: ::Extrinsic, - inherent_extrinsics: Vec<::Extrinsic>, -) -> ::Block + inherent_extrinsics: Vec<::Extrinsic>, +) -> Block where - F: ServiceFactory, - FullClient: ProvideRuntimeApi, - F::RuntimeApi: ConstructRuntimeApi, FullClient>, - as ProvideRuntimeApi>::Api: BlockBuilder>, + Block: BlockT::Out>, + Exec: client::CallExecutor + Send + Sync + Clone, + Backend: client::backend::Backend + Send, + Client: ProvideRuntimeApi, + RtApi: ConstructRuntimeApi> + Send + Sync, + as ProvideRuntimeApi>::Api: BlockBuilder, RA: RuntimeAdapter, { let mut block = client.new_block(Default::default()).expect("Failed to create new block"); @@ -173,10 +172,13 @@ where block.bake().expect("Failed to bake block") } -fn import_block( - client: &Arc>>, - block: ::Block -) -> () where F: ServiceFactory +fn import_block( + client: &Arc>, + block: Block +) -> () where + Block: BlockT::Out>, + Exec: client::CallExecutor + Send + Sync + Clone, + Backend: client::backend::Backend + Send, { let import = BlockImportParams { origin: BlockOrigin::File, diff --git a/test-utils/transaction-factory/src/simple_modes.rs b/test-utils/transaction-factory/src/simple_modes.rs index 0554678fbbd0d..141d5616a3a08 100644 --- a/test-utils/transaction-factory/src/simple_modes.rs +++ b/test-utils/transaction-factory/src/simple_modes.rs @@ -36,28 +36,29 @@ use std::sync::Arc; use log::info; +use client::Client; use client::block_builder::api::BlockBuilder; use client::runtime_api::ConstructRuntimeApi; +use primitives::{Blake2Hasher, Hasher}; use sr_primitives::traits::{Block as BlockT, ProvideRuntimeApi, One}; use sr_primitives::generic::BlockId; -use substrate_service::{ - FactoryBlock, FullClient, ServiceFactory, ComponentClient, FullComponents -}; use crate::{Mode, RuntimeAdapter, create_block}; -pub fn next( +pub fn next( factory_state: &mut RA, - client: &Arc>>, + client: &Arc>, genesis_hash: ::Hash, prior_block_hash: ::Hash, - prior_block_id: BlockId, -) -> Option<::Block> + prior_block_id: BlockId, +) -> Option where - F: ServiceFactory, - F::RuntimeApi: ConstructRuntimeApi, FullClient>, - FullClient: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: BlockBuilder>, + Block: BlockT::Out>, + Exec: client::CallExecutor + Send + Sync + Clone, + Backend: client::backend::Backend + Send, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: BlockBuilder, + RtApi: ConstructRuntimeApi> + Send + Sync, RA: RuntimeAdapter, { if factory_state.block_no() >= factory_state.num() { @@ -91,7 +92,7 @@ where let inherents = client.runtime_api().inherent_extrinsics(&prior_block_id, inherents) .expect("Failed to create inherent extrinsics"); - let block = create_block::(&client, transfer, inherents); + let block = create_block::(&client, transfer, inherents); 
factory_state.set_block_no(factory_state.block_no() + RA::Number::one());
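A rough usage sketch of the reworked transaction factory, following the node/cli wiring earlier in this patch: only the service builder is constructed (no full service), and its client and `SelectChain` are handed to `factory` directly. `config` and `factory_state` are assumed to be prepared by the caller, as in the CLI code above.

// Build just the `ServiceBuilder` and drive the transaction factory with it.
let service_builder = new_full_start!(config).0;
transaction_factory::factory(
    factory_state,
    service_builder.client(),
    service_builder.select_chain()
        .expect("select_chain is initialized by new_full_start!; qed"),
).map_err(|e| format!("Error in transaction factory: {}", e))?;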