diff --git a/Cargo.lock b/Cargo.lock index 7cdf58413b3de..b714f8935956b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4494,6 +4494,7 @@ dependencies = [ "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", "hash-db 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4984,6 +4985,7 @@ dependencies = [ "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "futures-preview 0.3.0-alpha.17 (registry+https://github.com/rust-lang/crates.io-index)", + "hash-db 0.15.2 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-pubsub 13.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/client/Cargo.toml b/core/client/Cargo.toml index 9edb9b1c85d35..76daf1dc60dbe 100644 --- a/core/client/Cargo.toml +++ b/core/client/Cargo.toml @@ -10,7 +10,8 @@ fnv = { version = "1.0", optional = true } log = { version = "0.4", optional = true } parking_lot = { version = "0.9.0", optional = true } hex = { package = "hex-literal", version = "0.2", optional = true } -futures-preview = { version = "=0.3.0-alpha.17", optional = true } +futures = { version = "0.1", optional = true } +futures03 = { package = "futures-preview", version = "=0.3.0-alpha.17", features = ["compat"], optional = true } consensus = { package = "substrate-consensus-common", path = "../consensus/common", optional = true } executor = { package = "substrate-executor", path = "../executor", optional = true } state-machine = { package = "substrate-state-machine", path = "../state-machine", optional = true } @@ -49,7 +50,8 @@ std = [ "fnv", "log", "hex", - "futures-preview", + "futures", + "futures03", "executor", "state-machine", "keyring", diff --git a/core/client/src/backend.rs b/core/client/src/backend.rs index 07bb6d6c91287..a42fdcff240b9 100644 --- a/core/client/src/backend.rs +++ b/core/client/src/backend.rs @@ -16,8 +16,10 @@ //! Substrate Client data backend +use std::sync::Arc; use std::collections::HashMap; use crate::error; +use crate::light::blockchain::RemoteBlockchain; use primitives::ChangesTrieConfiguration; use sr_primitives::{generic::BlockId, Justification, StorageOverlay, ChildrenStorageOverlay}; use sr_primitives::traits::{Block as BlockT, NumberFor}; @@ -303,4 +305,7 @@ where { /// Returns true if the state for given block is available locally. fn is_local_state_available(&self, block: &BlockId) -> bool; + /// Returns reference to blockchain backend that either resolves blockchain data + /// locally, or prepares request to fetch that data from remote node. 
+ fn remote_blockchain(&self) -> Arc>; } diff --git a/core/client/src/client.rs b/core/client/src/client.rs index 41dae58970544..be11c5d4f2de9 100644 --- a/core/client/src/client.rs +++ b/core/client/src/client.rs @@ -21,7 +21,7 @@ use std::{ panic::UnwindSafe, result, cell::RefCell, rc::Rc, }; use log::{info, trace, warn}; -use futures::channel::mpsc; +use futures03::channel::mpsc; use parking_lot::{Mutex, RwLock}; use codec::{Encode, Decode}; use hash_db::{Hasher, Prefix}; diff --git a/core/client/src/in_mem.rs b/core/client/src/in_mem.rs index f450ceebe2a91..58bfa05a36166 100644 --- a/core/client/src/in_mem.rs +++ b/core/client/src/in_mem.rs @@ -718,6 +718,10 @@ where .map(|num| num.is_zero()) .unwrap_or(false) } + + fn remote_blockchain(&self) -> Arc> { + unimplemented!() + } } /// Prunable in-memory changes trie storage. diff --git a/core/client/src/light/backend.rs b/core/client/src/light/backend.rs index 888c9d2033f67..b3dc6c225766c 100644 --- a/core/client/src/light/backend.rs +++ b/core/client/src/light/backend.rs @@ -231,8 +231,8 @@ impl ClientBackend for Backend where impl RemoteBackend for Backend where Block: BlockT, - S: BlockchainStorage, - F: Fetcher, + S: BlockchainStorage + 'static, + F: Fetcher + 'static, H: Hasher, H::Out: Ord, { @@ -242,6 +242,10 @@ where .map(|num| num.is_zero()) .unwrap_or(false) } + + fn remote_blockchain(&self) -> Arc> { + self.blockchain.clone() + } } impl BlockImportOperation for ImportOperation @@ -358,7 +362,7 @@ where *self.cached_header.write() = Some(cached_header); } - futures::executor::block_on( + futures03::executor::block_on( self.fetcher.upgrade().ok_or(ClientError::NotAvailableOnLightClient)? .remote_read(RemoteReadRequest { block: self.block, diff --git a/core/client/src/light/blockchain.rs b/core/client/src/light/blockchain.rs index a2c2fe72baf93..726d2abdc6f2f 100644 --- a/core/client/src/light/blockchain.rs +++ b/core/client/src/light/blockchain.rs @@ -17,6 +17,7 @@ //! Light client blockchain backend. Only stores headers and justifications of recent //! blocks. CHT roots are stored for headers of ancient blocks. +use std::future::Future; use std::{sync::{Weak, Arc}, collections::HashMap}; use parking_lot::Mutex; @@ -72,6 +73,27 @@ pub trait Storage: AuxStore + BlockchainHeaderBackend { fn cache(&self) -> Option>>; } +/// Remote header. +#[derive(Debug)] +pub enum LocalOrRemote { + /// When data is available locally, it is returned. + Local(Data), + /// When data is unavailable locally, the request to fetch it from remote node is returned. + Remote(Request), + /// When data is unknown. + Unknown, +} + +/// Futures-based blockchain backend that either resolves blockchain data +/// locally, or fetches required data from remote node. +pub trait RemoteBlockchain: Send + Sync { + /// Get block header. + fn header(&self, id: BlockId) -> ClientResult, + >>; +} + /// Light client blockchain. pub struct Blockchain { fetcher: Mutex>, @@ -105,32 +127,10 @@ impl Blockchain { impl BlockchainHeaderBackend for Blockchain where Block: BlockT, S: Storage, F: Fetcher { fn header(&self, id: BlockId) -> ClientResult> { - match self.storage.header(id)? { - Some(header) => Ok(Some(header)), - None => { - let number = match id { - BlockId::Hash(hash) => match self.storage.number(hash)? { - Some(number) => number, - None => return Ok(None), - }, - BlockId::Number(number) => number, - }; - - // if the header is from future or genesis (we never prune genesis) => return - if number.is_zero() || self.storage.status(BlockId::Number(number))? 
== BlockStatus::Unknown { - return Ok(None); - } - - futures::executor::block_on( - self.fetcher().upgrade() - .ok_or(ClientError::NotAvailableOnLightClient)? - .remote_header(RemoteHeaderRequest { - cht_root: self.storage.header_cht_root(cht::size(), number)?, - block: number, - retry_count: None, - }) - ).map(Some) - } + match RemoteBlockchain::header(self, id)? { + LocalOrRemote::Local(header) => Ok(Some(header)), + LocalOrRemote::Remote(_) => Err(ClientError::NotAvailableOnLightClient), + LocalOrRemote::Unknown => Ok(None), } } @@ -153,12 +153,12 @@ impl BlockchainHeaderBackend for Blockchain where Bloc impl BlockchainBackend for Blockchain where Block: BlockT, S: Storage, F: Fetcher { fn body(&self, id: BlockId) -> ClientResult>> { - let header = match self.header(id)? { + let header = match BlockchainHeaderBackend::header(self, id)? { Some(header) => header, None => return Ok(None), }; - futures::executor::block_on( + futures03::executor::block_on( self.fetcher().upgrade().ok_or(ClientError::NotAvailableOnLightClient)? .remote_body(RemoteBodyRequest { header, @@ -194,6 +194,62 @@ impl, F, Block: BlockT> ProvideCache for Blockchain RemoteBlockchain for Blockchain + where + S: Storage, + F: Fetcher + Send + Sync, +{ + fn header(&self, id: BlockId) -> ClientResult, + >> { + // first, try to read header from local storage + if let Some(local_header) = self.storage.header(id)? { + return Ok(LocalOrRemote::Local(local_header)); + } + + // we need to know block number to check if it's a part of CHT + let number = match id { + BlockId::Hash(hash) => match self.storage.number(hash)? { + Some(number) => number, + None => return Ok(LocalOrRemote::Unknown), + }, + BlockId::Number(number) => number, + }; + + // if the header is genesis (never pruned), non-canonical, or from future => return + if number.is_zero() || self.storage.status(BlockId::Number(number))? == BlockStatus::Unknown { + return Ok(LocalOrRemote::Unknown); + } + + Ok(LocalOrRemote::Remote(RemoteHeaderRequest { + cht_root: self.storage.header_cht_root(cht::size(), number)?, + block: number, + retry_count: None, + })) + } +} + +/// Returns future that resolves header either locally, or remotely. 
+pub fn future_header>( + blockchain: &dyn RemoteBlockchain, + fetcher: &F, + id: BlockId, +) -> impl Future, ClientError>> { + use futures03::future::{ready, Either, FutureExt}; + + match blockchain.header(id) { + Ok(LocalOrRemote::Remote(request)) => Either::Left( + fetcher + .remote_header(request) + .then(|header| ready(header.map(Some))) + ), + Ok(LocalOrRemote::Unknown) => Either::Right(ready(Ok(None))), + Ok(LocalOrRemote::Local(local_header)) => Either::Right(ready(Ok(Some(local_header)))), + Err(err) => Either::Right(ready(Err(err))), + } +} + #[cfg(test)] pub mod tests { use std::collections::HashMap; diff --git a/core/client/src/light/call_executor.rs b/core/client/src/light/call_executor.rs index 746f36069d69e..f41c14fd42992 100644 --- a/core/client/src/light/call_executor.rs +++ b/core/client/src/light/call_executor.rs @@ -99,7 +99,7 @@ where let block_hash = self.blockchain.expect_block_hash_from_id(id)?; let block_header = self.blockchain.expect_header(id.clone())?; - futures::executor::block_on(self.fetcher.remote_call(RemoteCallRequest { + futures03::executor::block_on(self.fetcher.remote_call(RemoteCallRequest { block: block_hash, header: block_header, method: method.into(), diff --git a/core/client/src/light/fetcher.rs b/core/client/src/light/fetcher.rs index 8502a19ba64c5..2c63cfc434e84 100644 --- a/core/client/src/light/fetcher.rs +++ b/core/client/src/light/fetcher.rs @@ -144,15 +144,15 @@ pub struct RemoteBodyRequest { /// is correct (see FetchedDataChecker) and return already checked data. pub trait Fetcher: Send + Sync { /// Remote header future. - type RemoteHeaderResult: Future>; + type RemoteHeaderResult: Future> + Send + 'static; /// Remote storage read future. - type RemoteReadResult: Future>, ClientError>>; + type RemoteReadResult: Future>, ClientError>> + Send + 'static; /// Remote call result future. - type RemoteCallResult: Future, ClientError>>; + type RemoteCallResult: Future, ClientError>> + Send + 'static; /// Remote changes result future. - type RemoteChangesResult: Future, u32)>, ClientError>>; + type RemoteChangesResult: Future, u32)>, ClientError>> + Send + 'static; /// Remote block body result future. - type RemoteBodyResult: Future, ClientError>>; + type RemoteBodyResult: Future, ClientError>> + Send + 'static; /// Fetch remote header. fn remote_header(&self, request: RemoteHeaderRequest) -> Self::RemoteHeaderResult; @@ -493,7 +493,7 @@ impl<'a, H, Number, Hash> ChangesTrieRootsStorage for RootsStorage<'a #[cfg(test)] pub mod tests { - use futures::future::Ready; + use futures03::future::Ready; use parking_lot::Mutex; use codec::Decode; use crate::client::tests::prepare_client_with_key_changes; @@ -521,7 +521,7 @@ pub mod tests { where E: std::convert::From<&'static str>, { - futures::future::ready(Err("Not implemented on test node".into())) + futures03::future::ready(Err("Not implemented on test node".into())) } impl Fetcher for OkCallFetcher { @@ -544,7 +544,7 @@ pub mod tests { } fn remote_call(&self, _request: RemoteCallRequest
) -> Self::RemoteCallResult { - futures::future::ready(Ok((*self.lock()).clone())) + futures03::future::ready(Ok((*self.lock()).clone())) } fn remote_changes(&self, _request: RemoteChangesRequest
) -> Self::RemoteChangesResult { diff --git a/core/client/src/light/mod.rs b/core/client/src/light/mod.rs index 89d3c60ddc372..c53a2eef2be78 100644 --- a/core/client/src/light/mod.rs +++ b/core/client/src/light/mod.rs @@ -67,8 +67,8 @@ pub fn new_light( >, B, RA>> where B: BlockT, - S: BlockchainStorage, - F: Fetcher, + S: BlockchainStorage + 'static, + F: Fetcher + 'static, GS: BuildStorage, E: CodeExecutor + RuntimeInfo, { diff --git a/core/client/src/notifications.rs b/core/client/src/notifications.rs index 0ddc4c72cdb55..37f90dcc0ba64 100644 --- a/core/client/src/notifications.rs +++ b/core/client/src/notifications.rs @@ -22,7 +22,7 @@ use std::{ }; use fnv::{FnvHashSet, FnvHashMap}; -use futures::channel::mpsc; +use futures03::channel::mpsc; use primitives::storage::{StorageKey, StorageData}; use sr_primitives::traits::Block as BlockT; @@ -347,7 +347,7 @@ mod tests { // given let mut notifications = StorageNotifications::::default(); let child_filter = [(StorageKey(vec![4]), None)]; - let mut recv = futures::executor::block_on_stream( + let mut recv = futures03::executor::block_on_stream( notifications.listen(None, Some(&child_filter[..])) ); @@ -382,13 +382,13 @@ mod tests { // given let mut notifications = StorageNotifications::::default(); let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let mut recv1 = futures::executor::block_on_stream( + let mut recv1 = futures03::executor::block_on_stream( notifications.listen(Some(&[StorageKey(vec![1])]), None) ); - let mut recv2 = futures::executor::block_on_stream( + let mut recv2 = futures03::executor::block_on_stream( notifications.listen(Some(&[StorageKey(vec![2])]), None) ); - let mut recv3 = futures::executor::block_on_stream( + let mut recv3 = futures03::executor::block_on_stream( notifications.listen(Some(&[]), Some(&child_filter)) ); @@ -429,16 +429,16 @@ mod tests { let mut notifications = StorageNotifications::::default(); { let child_filter = [(StorageKey(vec![4]), Some(vec![StorageKey(vec![5])]))]; - let _recv1 = futures::executor::block_on_stream( + let _recv1 = futures03::executor::block_on_stream( notifications.listen(Some(&[StorageKey(vec![1])]), None) ); - let _recv2 = futures::executor::block_on_stream( + let _recv2 = futures03::executor::block_on_stream( notifications.listen(Some(&[StorageKey(vec![2])]), None) ); - let _recv3 = futures::executor::block_on_stream( + let _recv3 = futures03::executor::block_on_stream( notifications.listen(None, None) ); - let _recv4 = futures::executor::block_on_stream( + let _recv4 = futures03::executor::block_on_stream( notifications.listen(None, Some(&child_filter)) ); assert_eq!(notifications.listeners.len(), 2); @@ -465,7 +465,7 @@ mod tests { // given let mut recv = { let mut notifications = StorageNotifications::::default(); - let recv = futures::executor::block_on_stream(notifications.listen(None, None)); + let recv = futures03::executor::block_on_stream(notifications.listen(None, None)); // when let changeset = vec![]; diff --git a/core/rpc/Cargo.toml b/core/rpc/Cargo.toml index 1cd423c88f847..25f0a467ee7c1 100644 --- a/core/rpc/Cargo.toml +++ b/core/rpc/Cargo.toml @@ -21,6 +21,7 @@ state_machine = { package = "substrate-state-machine", path = "../state-machine" substrate-executor = { path = "../executor" } substrate-keystore = { path = "../keystore" } transaction_pool = { package = "substrate-transaction-pool", path = "../transaction-pool" } +hash-db = { version = "0.15.0", default-features = false } [dev-dependencies] assert_matches = "1.1" diff --git 
a/core/rpc/api/src/chain/error.rs b/core/rpc/api/src/chain/error.rs index 7c093789fad5a..eccb7f4f1b0f0 100644 --- a/core/rpc/api/src/chain/error.rs +++ b/core/rpc/api/src/chain/error.rs @@ -23,6 +23,9 @@ use jsonrpc_core as rpc; /// Chain RPC Result type. pub type Result = std::result::Result; +/// State RPC future Result type. +pub type FutureResult = Box + Send>; + /// Chain RPC errors. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { diff --git a/core/rpc/api/src/chain/mod.rs b/core/rpc/api/src/chain/mod.rs index 6dc4a60e88995..c0076d5987a37 100644 --- a/core/rpc/api/src/chain/mod.rs +++ b/core/rpc/api/src/chain/mod.rs @@ -23,7 +23,7 @@ use jsonrpc_core::Result as RpcResult; use jsonrpc_core::futures::Future; use jsonrpc_derive::rpc; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; -use self::error::Result; +use self::error::{FutureResult, Result}; pub use self::gen_client::Client as ChainClient; @@ -35,11 +35,11 @@ pub trait ChainApi { /// Get header of a relay chain block. #[rpc(name = "chain_getHeader")] - fn header(&self, hash: Option) -> Result>; + fn header(&self, hash: Option) -> FutureResult>; /// Get header and body of a relay chain block. #[rpc(name = "chain_getBlock")] - fn block(&self, hash: Option) -> Result>; + fn block(&self, hash: Option) -> FutureResult>; /// Get hash of the n-th block in the canon chain. /// diff --git a/core/rpc/api/src/state/error.rs b/core/rpc/api/src/state/error.rs index f5e9112b94454..553a06e896f3d 100644 --- a/core/rpc/api/src/state/error.rs +++ b/core/rpc/api/src/state/error.rs @@ -22,6 +22,9 @@ use jsonrpc_core as rpc; /// State RPC Result type. pub type Result = std::result::Result; +/// State RPC future Result type. +pub type FutureResult = Box + Send>; + /// State RPC errors. #[derive(Debug, derive_more::Display, derive_more::From)] pub enum Error { diff --git a/core/rpc/api/src/state/mod.rs b/core/rpc/api/src/state/mod.rs index f7cff7e3975de..0d06092ca1659 100644 --- a/core/rpc/api/src/state/mod.rs +++ b/core/rpc/api/src/state/mod.rs @@ -25,7 +25,7 @@ use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; use primitives::Bytes; use primitives::storage::{StorageKey, StorageData, StorageChangeSet}; use runtime_version::RuntimeVersion; -use self::error::Result; +use self::error::FutureResult; pub use self::gen_client::Client as StateClient; @@ -37,23 +37,23 @@ pub trait StateApi { /// Call a contract at a block's state. #[rpc(name = "state_call", alias("state_callAt"))] - fn call(&self, name: String, bytes: Bytes, hash: Option) -> Result; + fn call(&self, name: String, bytes: Bytes, hash: Option) -> FutureResult; /// Returns the keys with prefix, leave empty to get all the keys #[rpc(name = "state_getKeys")] - fn storage_keys(&self, prefix: StorageKey, hash: Option) -> Result>; + fn storage_keys(&self, prefix: StorageKey, hash: Option) -> FutureResult>; /// Returns a storage entry at a specific block's state. #[rpc(name = "state_getStorage", alias("state_getStorageAt"))] - fn storage(&self, key: StorageKey, hash: Option) -> Result>; + fn storage(&self, key: StorageKey, hash: Option) -> FutureResult>; /// Returns the hash of a storage entry at a block's state. #[rpc(name = "state_getStorageHash", alias("state_getStorageHashAt"))] - fn storage_hash(&self, key: StorageKey, hash: Option) -> Result>; + fn storage_hash(&self, key: StorageKey, hash: Option) -> FutureResult>; /// Returns the size of a storage entry at a block's state. 
#[rpc(name = "state_getStorageSize", alias("state_getStorageSizeAt"))] - fn storage_size(&self, key: StorageKey, hash: Option) -> Result>; + fn storage_size(&self, key: StorageKey, hash: Option) -> FutureResult>; /// Returns the keys with prefix from a child storage, leave empty to get all the keys #[rpc(name = "state_getChildKeys")] @@ -62,7 +62,7 @@ pub trait StateApi { child_storage_key: StorageKey, prefix: StorageKey, hash: Option - ) -> Result>; + ) -> FutureResult>; /// Returns a child storage entry at a specific block's state. #[rpc(name = "state_getChildStorage")] @@ -71,7 +71,7 @@ pub trait StateApi { child_storage_key: StorageKey, key: StorageKey, hash: Option - ) -> Result>; + ) -> FutureResult>; /// Returns the hash of a child storage entry at a block's state. #[rpc(name = "state_getChildStorageHash")] @@ -80,7 +80,7 @@ pub trait StateApi { child_storage_key: StorageKey, key: StorageKey, hash: Option - ) -> Result>; + ) -> FutureResult>; /// Returns the size of a child storage entry at a block's state. #[rpc(name = "state_getChildStorageSize")] @@ -89,15 +89,15 @@ pub trait StateApi { child_storage_key: StorageKey, key: StorageKey, hash: Option - ) -> Result>; + ) -> FutureResult>; /// Returns the runtime metadata as an opaque blob. #[rpc(name = "state_getMetadata")] - fn metadata(&self, hash: Option) -> Result; + fn metadata(&self, hash: Option) -> FutureResult; /// Get the runtime version. #[rpc(name = "state_getRuntimeVersion", alias("chain_getRuntimeVersion"))] - fn runtime_version(&self, hash: Option) -> Result; + fn runtime_version(&self, hash: Option) -> FutureResult; /// Query historical storage entries (by key) starting from a block given as the second parameter. /// @@ -109,7 +109,7 @@ pub trait StateApi { keys: Vec, block: Hash, hash: Option - ) -> Result>>; + ) -> FutureResult>>; /// New runtime version subscription #[pubsub( diff --git a/core/rpc/src/chain/chain_full.rs b/core/rpc/src/chain/chain_full.rs new file mode 100644 index 0000000000000..ad359a9300cdd --- /dev/null +++ b/core/rpc/src/chain/chain_full.rs @@ -0,0 +1,79 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Blockchain API backend for full nodes. + +use std::sync::Arc; +use rpc::futures::future::result; + +use api::Subscriptions; +use client::{backend::Backend, CallExecutor, Client}; +use primitives::{H256, Blake2Hasher}; +use sr_primitives::{ + generic::{BlockId, SignedBlock}, + traits::{Block as BlockT}, +}; + +use super::{ChainBackend, client_err, error::FutureResult}; + +/// Blockchain API backend for full nodes. Reads all the data from local database. +pub struct FullChain { + /// Substrate client. + client: Arc>, + /// Current subscriptions. + subscriptions: Subscriptions, +} + +impl FullChain { + /// Create new Chain API RPC handler. 
+ pub fn new(client: Arc>, subscriptions: Subscriptions) -> Self { + Self { + client, + subscriptions, + } + } +} + +impl ChainBackend for FullChain where + Block: BlockT + 'static, + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, +{ + fn client(&self) -> &Arc> { + &self.client + } + + fn subscriptions(&self) -> &Subscriptions { + &self.subscriptions + } + + fn header(&self, hash: Option) -> FutureResult> { + Box::new(result(self.client + .header(&BlockId::Hash(self.unwrap_or_best(hash))) + .map_err(client_err) + )) + } + + fn block(&self, hash: Option) + -> FutureResult>> + { + Box::new(result(self.client + .block(&BlockId::Hash(self.unwrap_or_best(hash))) + .map_err(client_err) + )) + } +} diff --git a/core/rpc/src/chain/chain_light.rs b/core/rpc/src/chain/chain_light.rs new file mode 100644 index 0000000000000..d969d6ca93702 --- /dev/null +++ b/core/rpc/src/chain/chain_light.rs @@ -0,0 +1,123 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Blockchain API backend for light nodes. + +use std::sync::Arc; +use futures03::{future::ready, FutureExt, TryFutureExt}; +use rpc::futures::future::{result, Future, Either}; + +use api::Subscriptions; +use client::{ + self, Client, + light::{ + fetcher::{Fetcher, RemoteBodyRequest}, + blockchain::RemoteBlockchain, + }, +}; +use primitives::{H256, Blake2Hasher}; +use sr_primitives::{ + generic::{BlockId, SignedBlock}, + traits::{Block as BlockT}, +}; + +use super::{ChainBackend, client_err, error::FutureResult}; + +/// Blockchain API backend for light nodes. Reads all the data from local +/// database, if available, or fetches it from remote node otherwise. +pub struct LightChain { + /// Substrate client. + client: Arc>, + /// Current subscriptions. + subscriptions: Subscriptions, + /// Remote blockchain reference + remote_blockchain: Arc>, + /// Remote fetcher reference. + fetcher: Arc, +} + +impl> LightChain { + /// Create new Chain API RPC handler. 
+ pub fn new( + client: Arc>, + subscriptions: Subscriptions, + remote_blockchain: Arc>, + fetcher: Arc, + ) -> Self { + Self { + client, + subscriptions, + remote_blockchain, + fetcher, + } + } +} + +impl ChainBackend for LightChain where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, + F: Fetcher + Send + Sync + 'static, +{ + fn client(&self) -> &Arc> { + &self.client + } + + fn subscriptions(&self) -> &Subscriptions { + &self.subscriptions + } + + fn header(&self, hash: Option) -> FutureResult> { + let hash = self.unwrap_or_best(hash); + + let fetcher = self.fetcher.clone(); + let maybe_header = client::light::blockchain::future_header( + &*self.remote_blockchain, + &*fetcher, + BlockId::Hash(hash), + ); + + Box::new(maybe_header.then(move |result| + ready(result.map_err(client_err)), + ).boxed().compat()) + } + + fn block(&self, hash: Option) + -> FutureResult>> + { + let fetcher = self.fetcher.clone(); + let block = self.header(hash) + .and_then(move |header| match header { + Some(header) => Either::A(fetcher + .remote_body(RemoteBodyRequest { + header: header.clone(), + retry_count: Default::default(), + }) + .boxed() + .compat() + .map(move |body| Some(SignedBlock { + block: Block::new(header, body), + justification: None, + })) + .map_err(client_err) + ), + None => Either::B(result(Ok(None))), + }); + + Box::new(block) + } +} diff --git a/core/rpc/src/chain/mod.rs b/core/rpc/src/chain/mod.rs index dde54f62783d1..4c59d227c2dc4 100644 --- a/core/rpc/src/chain/mod.rs +++ b/core/rpc/src/chain/mod.rs @@ -16,95 +16,181 @@ //! Substrate blockchain API. +mod chain_full; +mod chain_light; + #[cfg(test)] mod tests; use std::sync::Arc; use futures03::{future, StreamExt as _, TryStreamExt as _}; +use log::warn; +use rpc::{ + Result as RpcResult, + futures::{stream, Future, Sink, Stream}, +}; -use client::{self, Client, BlockchainEvents}; -use rpc::Result as RpcResult; -use rpc::futures::{stream, Future, Sink, Stream}; use api::Subscriptions; +use client::{ + self, Client, BlockchainEvents, + light::{fetcher::Fetcher, blockchain::RemoteBlockchain}, +}; use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; -use log::warn; use primitives::{H256, Blake2Hasher}; -use sr_primitives::generic::{BlockId, SignedBlock}; -use sr_primitives::traits::{Block as BlockT, Header, NumberFor}; -use self::error::{Error, Result}; +use sr_primitives::{ + generic::{BlockId, SignedBlock}, + traits::{Block as BlockT, Header, NumberFor}, +}; + +use self::error::{Result, Error, FutureResult}; pub use api::chain::*; -/// Chain API with subscriptions support. -pub struct Chain { - /// Substrate client. - client: Arc>, - /// Current subscriptions. - subscriptions: Subscriptions, -} +/// Blockchain backend API +trait ChainBackend: Send + Sync + 'static + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, +{ + /// Get client reference. + fn client(&self) -> &Arc>; -impl Chain { - /// Create new Chain API RPC handler. - pub fn new(client: Arc>, subscriptions: Subscriptions) -> Self { - Self { - client, - subscriptions, + /// Get subscriptions reference. + fn subscriptions(&self) -> &Subscriptions; + + /// Tries to unwrap passed block hash, or uses best block hash otherwise. 
+ fn unwrap_or_best(&self, hash: Option) -> Block::Hash { + match hash.into() { + None => self.client().info().chain.best_hash, + Some(hash) => hash, } } -} -impl Chain where - Block: BlockT + 'static, - B: client::backend::Backend + Send + Sync + 'static, - E: client::CallExecutor + Send + Sync + 'static, - RA: Send + Sync + 'static -{ - fn unwrap_or_best(&self, hash: Option) -> Result { - Ok(match hash.into() { - None => self.client.info().chain.best_hash, - Some(hash) => hash, + /// Get header of a relay chain block. + fn header(&self, hash: Option) -> FutureResult>; + + /// Get header and body of a relay chain block. + fn block(&self, hash: Option) -> FutureResult>>; + + /// Get hash of the n-th block in the canon chain. + /// + /// By default returns latest block hash. + fn block_hash( + &self, + number: Option>>, + ) -> Result> { + Ok(match number { + None => Some(self.client().info().chain.best_hash), + Some(num_or_hex) => self.client() + .header(&BlockId::number(num_or_hex.to_number()?)) + .map_err(client_err)? + .map(|h| h.hash()), }) } - fn subscribe_headers( + /// Get hash of the last finalized block in the canon chain. + fn finalized_head(&self) -> Result { + Ok(self.client().info().chain.finalized_hash) + } + + /// New head subscription + fn subscribe_new_heads( &self, + _metadata: crate::metadata::Metadata, subscriber: Subscriber, - best_block_hash: G, - stream: F, - ) where - F: FnOnce() -> S, - G: FnOnce() -> Result>, - ERR: ::std::fmt::Debug, - S: Stream + Send + 'static, - { - self.subscriptions.add(subscriber, |sink| { - // send current head right at the start. - let header = best_block_hash() - .and_then(|hash| self.header(hash.into())) - .and_then(|header| { - header.ok_or_else(|| "Best header missing.".to_owned().into()) - }) - .map_err(Into::into); - - // send further subscriptions - let stream = stream() - .map(|res| Ok(res)) - .map_err(|e| warn!("Block notification stream error: {:?}", e)); - - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all( - stream::iter_result(vec![Ok(header)]) - .chain(stream) - ) - // we ignore the resulting Stream (if the first stream is over we are unsubscribed) - .map(|_| ()) - }); + ) { + subscribe_headers( + self.client(), + self.subscriptions(), + subscriber, + || self.client().info().chain.best_hash, + || self.client().import_notification_stream() + .filter(|notification| future::ready(notification.is_new_best)) + .map(|notification| Ok::<_, ()>(notification.header)) + .compat(), + ) + } + + /// Unsubscribe from new head subscription. + fn unsubscribe_new_heads( + &self, + _metadata: Option, + id: SubscriptionId, + ) -> RpcResult { + Ok(self.subscriptions().cancel(id)) + } + + /// New head subscription + fn subscribe_finalized_heads( + &self, + _metadata: crate::metadata::Metadata, + subscriber: Subscriber, + ) { + subscribe_headers( + self.client(), + self.subscriptions(), + subscriber, + || self.client().info().chain.finalized_hash, + || self.client().finality_notification_stream() + .map(|notification| Ok::<_, ()>(notification.header)) + .compat(), + ) + } + + /// Unsubscribe from new head subscription. + fn unsubscribe_finalized_heads( + &self, + _metadata: Option, + id: SubscriptionId, + ) -> RpcResult { + Ok(self.subscriptions().cancel(id)) } } -fn client_error(err: client::error::Error) -> Error { - Error::Client(Box::new(err)) +/// Create new state API that works on full node. 
+pub fn new_full( + client: Arc>, + subscriptions: Subscriptions, +) -> Chain + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, +{ + Chain { + backend: Box::new(self::chain_full::FullChain::new(client, subscriptions)), + } +} + +/// Create new state API that works on light node. +pub fn new_light>( + client: Arc>, + subscriptions: Subscriptions, + remote_blockchain: Arc>, + fetcher: Arc, +) -> Chain + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, + F: Send + Sync + 'static, +{ + Chain { + backend: Box::new(self::chain_light::LightChain::new( + client, + subscriptions, + remote_blockchain, + fetcher, + )), + } +} + +/// Chain API with subscriptions support. +pub struct Chain { + backend: Box>, } impl ChainApi, Block::Hash, Block::Header, SignedBlock> for Chain where @@ -115,58 +201,81 @@ impl ChainApi, Block::Hash, Block::Header, Sig { type Metadata = crate::metadata::Metadata; - fn header(&self, hash: Option) -> Result> { - let hash = self.unwrap_or_best(hash)?; - Ok(self.client.header(&BlockId::Hash(hash)).map_err(client_error)?) + fn header(&self, hash: Option) -> FutureResult> { + self.backend.header(hash) } - fn block(&self, hash: Option) - -> Result>> + fn block(&self, hash: Option) -> FutureResult>> { - let hash = self.unwrap_or_best(hash)?; - Ok(self.client.block(&BlockId::Hash(hash)).map_err(client_error)?) + self.backend.block(hash) } fn block_hash(&self, number: Option>>) -> Result> { - Ok(match number { - None => Some(self.client.info().chain.best_hash), - Some(num_or_hex) => self.client - .header(&BlockId::number(num_or_hex.to_number()?)) - .map_err(client_error)? 
- .map(|h| h.hash()), - }) + self.backend.block_hash(number) } fn finalized_head(&self) -> Result { - Ok(self.client.info().chain.finalized_hash) + self.backend.finalized_head() } - fn subscribe_new_heads(&self, _metadata: Self::Metadata, subscriber: Subscriber) { - self.subscribe_headers( - subscriber, - || self.block_hash(None.into()), - || self.client.import_notification_stream() - .filter(|notification| future::ready(notification.is_new_best)) - .map(|notification| Ok::<_, ()>(notification.header)) - .compat(), - ) + fn subscribe_new_heads(&self, metadata: Self::Metadata, subscriber: Subscriber) { + self.backend.subscribe_new_heads(metadata, subscriber) } - fn unsubscribe_new_heads(&self, _metadata: Option, id: SubscriptionId) -> RpcResult { - Ok(self.subscriptions.cancel(id)) + fn unsubscribe_new_heads(&self, metadata: Option, id: SubscriptionId) -> RpcResult { + self.backend.unsubscribe_new_heads(metadata, id) } - fn subscribe_finalized_heads(&self, _meta: Self::Metadata, subscriber: Subscriber) { - self.subscribe_headers( - subscriber, - || Ok(Some(self.client.info().chain.finalized_hash)), - || self.client.finality_notification_stream() - .map(|notification| Ok::<_, ()>(notification.header)) - .compat(), - ) + fn subscribe_finalized_heads(&self, metadata: Self::Metadata, subscriber: Subscriber) { + self.backend.subscribe_finalized_heads(metadata, subscriber) } - fn unsubscribe_finalized_heads(&self, _metadata: Option, id: SubscriptionId) -> RpcResult { - Ok(self.subscriptions.cancel(id)) + fn unsubscribe_finalized_heads(&self, metadata: Option, id: SubscriptionId) -> RpcResult { + self.backend.unsubscribe_finalized_heads(metadata, id) } } + +/// Subscribe to new headers. +fn subscribe_headers( + client: &Arc>, + subscriptions: &Subscriptions, + subscriber: Subscriber, + best_block_hash: G, + stream: F, +) where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + F: FnOnce() -> S, + G: FnOnce() -> Block::Hash, + ERR: ::std::fmt::Debug, + S: Stream + Send + 'static, +{ + subscriptions.add(subscriber, |sink| { + // send current head right at the start. 
+ let header = client.header(&BlockId::Hash(best_block_hash())) + .map_err(client_err) + .and_then(|header| { + header.ok_or_else(|| "Best header missing.".to_owned().into()) + }) + .map_err(Into::into); + + // send further subscriptions + let stream = stream() + .map(|res| Ok(res)) + .map_err(|e| warn!("Block notification stream error: {:?}", e)); + + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all( + stream::iter_result(vec![Ok(header)]) + .chain(stream) + ) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); +} + +fn client_err(err: client::error::Error) -> Error { + Error::Client(Box::new(err)) +} diff --git a/core/rpc/src/chain/tests.rs b/core/rpc/src/chain/tests.rs index af00f220e45fb..8b46befee6520 100644 --- a/core/rpc/src/chain/tests.rs +++ b/core/rpc/src/chain/tests.rs @@ -27,13 +27,11 @@ fn should_return_header() { let core = ::tokio::runtime::Runtime::new().unwrap(); let remote = core.executor(); - let client = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); assert_matches!( - client.header(Some(client.client.genesis_hash()).into()), + api.header(Some(client.genesis_hash()).into()).wait(), Ok(Some(ref x)) if x == &Header { parent_hash: H256::from_low_u64_be(0), number: 0, @@ -44,7 +42,7 @@ fn should_return_header() { ); assert_matches!( - client.header(None.into()), + api.header(None.into()).wait(), Ok(Some(ref x)) if x == &Header { parent_hash: H256::from_low_u64_be(0), number: 0, @@ -55,7 +53,7 @@ fn should_return_header() { ); assert_matches!( - client.header(Some(H256::from_low_u64_be(5)).into()), + api.header(Some(H256::from_low_u64_be(5)).into()).wait(), Ok(None) ); } @@ -65,26 +63,24 @@ fn should_return_a_block() { let core = ::tokio::runtime::Runtime::new().unwrap(); let remote = core.executor(); - let api = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); - let block = api.client.new_block(Default::default()).unwrap().bake().unwrap(); + let block = client.new_block(Default::default()).unwrap().bake().unwrap(); let block_hash = block.hash(); - api.client.import(BlockOrigin::Own, block).unwrap(); + client.import(BlockOrigin::Own, block).unwrap(); // Genesis block is not justified assert_matches!( - api.block(Some(api.client.genesis_hash()).into()), + api.block(Some(client.genesis_hash()).into()).wait(), Ok(Some(SignedBlock { justification: None, .. 
})) ); assert_matches!( - api.block(Some(block_hash).into()), + api.block(Some(block_hash).into()).wait(), Ok(Some(ref x)) if x.block == Block { header: Header { - parent_hash: api.client.genesis_hash(), + parent_hash: client.genesis_hash(), number: 1, state_root: x.block.header.state_root.clone(), extrinsics_root: "03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314".parse().unwrap(), @@ -95,10 +91,10 @@ fn should_return_a_block() { ); assert_matches!( - api.block(None.into()), + api.block(None.into()).wait(), Ok(Some(ref x)) if x.block == Block { header: Header { - parent_hash: api.client.genesis_hash(), + parent_hash: client.genesis_hash(), number: 1, state_root: x.block.header.state_root.clone(), extrinsics_root: "03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314".parse().unwrap(), @@ -109,7 +105,7 @@ fn should_return_a_block() { ); assert_matches!( - api.block(Some(H256::from_low_u64_be(5)).into()), + api.block(Some(H256::from_low_u64_be(5)).into()).wait(), Ok(None) ); } @@ -119,40 +115,38 @@ fn should_return_block_hash() { let core = ::tokio::runtime::Runtime::new().unwrap(); let remote = core.executor(); - let client = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); assert_matches!( - client.block_hash(None.into()), - Ok(Some(ref x)) if x == &client.client.genesis_hash() + api.block_hash(None.into()), + Ok(Some(ref x)) if x == &client.genesis_hash() ); assert_matches!( - client.block_hash(Some(0u64.into()).into()), - Ok(Some(ref x)) if x == &client.client.genesis_hash() + api.block_hash(Some(0u64.into()).into()), + Ok(Some(ref x)) if x == &client.genesis_hash() ); assert_matches!( - client.block_hash(Some(1u64.into()).into()), + api.block_hash(Some(1u64.into()).into()), Ok(None) ); - let block = client.client.new_block(Default::default()).unwrap().bake().unwrap(); - client.client.import(BlockOrigin::Own, block.clone()).unwrap(); + let block = client.new_block(Default::default()).unwrap().bake().unwrap(); + client.import(BlockOrigin::Own, block.clone()).unwrap(); assert_matches!( - client.block_hash(Some(0u64.into()).into()), - Ok(Some(ref x)) if x == &client.client.genesis_hash() + api.block_hash(Some(0u64.into()).into()), + Ok(Some(ref x)) if x == &client.genesis_hash() ); assert_matches!( - client.block_hash(Some(1u64.into()).into()), + api.block_hash(Some(1u64.into()).into()), Ok(Some(ref x)) if x == &block.hash() ); assert_matches!( - client.block_hash(Some(::primitives::U256::from(1u64).into()).into()), + api.block_hash(Some(::primitives::U256::from(1u64).into()).into()), Ok(Some(ref x)) if x == &block.hash() ); } @@ -163,30 +157,28 @@ fn should_return_finalized_hash() { let core = ::tokio::runtime::Runtime::new().unwrap(); let remote = core.executor(); - let client = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); assert_matches!( - client.finalized_head(), - Ok(ref x) if x == &client.client.genesis_hash() + api.finalized_head(), + Ok(ref x) if x == &client.genesis_hash() ); // import new block - let builder = client.client.new_block(Default::default()).unwrap(); - client.client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + let builder = 
client.new_block(Default::default()).unwrap(); + client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); // no finalization yet assert_matches!( - client.finalized_head(), - Ok(ref x) if x == &client.client.genesis_hash() + api.finalized_head(), + Ok(ref x) if x == &client.genesis_hash() ); // finalize - client.client.finalize_block(BlockId::number(1), None).unwrap(); + client.finalize_block(BlockId::number(1), None).unwrap(); assert_matches!( - client.finalized_head(), - Ok(ref x) if x == &client.client.block_hash(1).unwrap().unwrap() + api.finalized_head(), + Ok(ref x) if x == &client.block_hash(1).unwrap().unwrap() ); } @@ -197,18 +189,16 @@ fn should_notify_about_latest_block() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); api.subscribe_new_heads(Default::default(), subscriber); // assert id assigned assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); - let builder = api.client.new_block(Default::default()).unwrap(); - api.client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + let builder = client.new_block(Default::default()).unwrap(); + client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); } // assert initial head sent. @@ -228,19 +218,17 @@ fn should_notify_about_finalized_block() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = Chain { - client: Arc::new(test_client::new()), - subscriptions: Subscriptions::new(Arc::new(remote)), - }; + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); api.subscribe_finalized_heads(Default::default(), subscriber); // assert id assigned assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); - let builder = api.client.new_block(Default::default()).unwrap(); - api.client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); - api.client.finalize_block(BlockId::number(1), None).unwrap(); + let builder = client.new_block(Default::default()).unwrap(); + client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + client.finalize_block(BlockId::number(1), None).unwrap(); } // assert initial head sent. diff --git a/core/rpc/src/state/mod.rs b/core/rpc/src/state/mod.rs index 8e44275f1e96e..390f95ab41dff 100644 --- a/core/rpc/src/state/mod.rs +++ b/core/rpc/src/state/mod.rs @@ -16,357 +16,216 @@ //! Substrate state API. 
+mod state_full; +mod state_light; + #[cfg(test)] mod tests; -use std::{ - collections::{BTreeMap, HashMap}, - ops::Range, - sync::Arc, -}; +use std::sync::Arc; use futures03::{future, StreamExt as _, TryStreamExt as _}; +use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; +use log::warn; +use rpc::{ + Result as RpcResult, + futures::{stream, Future, Sink, Stream}, +}; -use client::{self, Client, CallExecutor, BlockchainEvents, runtime_api::Metadata}; -use rpc::Result as RpcResult; -use rpc::futures::{stream, Future, Sink, Stream}; use api::Subscriptions; -use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; -use log::{warn, trace}; -use primitives::hexdisplay::HexDisplay; -use primitives::storage::{self, StorageKey, StorageData, StorageChangeSet}; -use primitives::{H256, Blake2Hasher, Bytes}; -use sr_primitives::generic::BlockId; -use sr_primitives::traits::{ - Block as BlockT, Header, ProvideRuntimeApi, NumberFor, - SaturatedConversion +use client::{ + BlockchainEvents, Client, CallExecutor, + runtime_api::Metadata, + light::{blockchain::RemoteBlockchain, fetcher::Fetcher}, +}; +use primitives::{ + Blake2Hasher, Bytes, H256, + storage::{well_known_keys, StorageKey, StorageData, StorageChangeSet}, }; use runtime_version::RuntimeVersion; -use self::error::{Error, Result}; -use state_machine::{self, ExecutionStrategy}; - -pub use api::state::*; - -/// State API with subscriptions support. -pub struct State { - /// Substrate client. - client: Arc>, - /// Current subscriptions. - subscriptions: Subscriptions, -} +use sr_primitives::{ + generic::BlockId, + traits::{Block as BlockT, ProvideRuntimeApi}, +}; -/// Ranges to query in state_queryStorage. -struct QueryStorageRange { - /// Hashes of all the blocks in the range. - pub hashes: Vec, - /// Number of the first block in the range. - pub first_number: NumberFor, - /// Blocks subrange ([begin; end) indices within `hashes`) where we should read keys at - /// each state to get changes. - pub unfiltered_range: Range, - /// Blocks subrange ([begin; end) indices within `hashes`) where we could pre-filter - /// blocks-with-changes by using changes tries. - pub filtered_range: Option>, -} +use self::error::{Error, FutureResult}; -fn client_err(err: client::error::Error) -> Error { - Error::Client(Box::new(err)) -} +pub use api::state::*; -impl State where - Block: BlockT, - B: client::backend::Backend, - E: CallExecutor, +/// State backend API. +pub trait StateBackend: Send + Sync + 'static + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: client::CallExecutor + Send + Sync + 'static, + RA: Send + Sync + 'static, { - /// Create new State API RPC handler. - pub fn new(client: Arc>, subscriptions: Subscriptions) -> Self { - Self { - client, - subscriptions, - } - } + /// Get client reference. + fn client(&self) -> &Arc>; - /// Splits the `query_storage` block range into 'filtered' and 'unfiltered' subranges. - /// Blocks that contain changes within filtered subrange could be filtered using changes tries. - /// Blocks that contain changes within unfiltered subrange must be filtered manually. 
- fn split_query_storage_range( - &self, - from: Block::Hash, - to: Option - ) -> Result> { - let to = self.unwrap_or_best(to)?; - let from_hdr = self.client.header(&BlockId::hash(from)).map_err(client_err)?; - let to_hdr = self.client.header(&BlockId::hash(to)).map_err(client_err)?; - match (from_hdr, to_hdr) { - (Some(ref from), Some(ref to)) if from.number() <= to.number() => { - // check if we can get from `to` to `from` by going through parent_hashes. - let from_number = *from.number(); - let blocks = { - let mut blocks = vec![to.hash()]; - let mut last = to.clone(); - while *last.number() > from_number { - let hdr = self.client - .header(&BlockId::hash(*last.parent_hash())) - .map_err(client_err)?; - if let Some(hdr) = hdr { - blocks.push(hdr.hash()); - last = hdr; - } else { - return Err(invalid_block_range( - Some(from), - Some(to), - format!("Parent of {} ({}) not found", last.number(), last.hash()), - )) - } - } - if last.hash() != from.hash() { - return Err(invalid_block_range( - Some(from), - Some(to), - format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()), - )) - } - blocks.reverse(); - blocks - }; - // check if we can filter blocks-with-changes from some (sub)range using changes tries - let changes_trie_range = self.client - .max_key_changes_range(from_number, BlockId::Hash(to.hash())) - .map_err(client_err)?; - let filtered_range_begin = changes_trie_range.map(|(begin, _)| (begin - from_number).saturated_into::()); - let (unfiltered_range, filtered_range) = split_range(blocks.len(), filtered_range_begin); - Ok(QueryStorageRange { - hashes: blocks, - first_number: from_number, - unfiltered_range, - filtered_range, - }) - }, - (from, to) => Err( - invalid_block_range(from.as_ref(), to.as_ref(), "Invalid range or unknown block".into()) - ), - } - } + /// Get subscriptions reference. + fn subscriptions(&self) -> &Subscriptions; - /// Iterates through range.unfiltered_range and check each block for changes of keys' values. - fn query_storage_unfiltered( + /// Call runtime method at given block. + fn call( &self, - range: &QueryStorageRange, - keys: &[StorageKey], - last_values: &mut HashMap>, - changes: &mut Vec>, - ) -> Result<()> { - for block in range.unfiltered_range.start..range.unfiltered_range.end { - let block_hash = range.hashes[block].clone(); - let mut block_changes = StorageChangeSet { block: block_hash.clone(), changes: Vec::new() }; - let id = BlockId::hash(block_hash); - for key in keys { - let (has_changed, data) = { - let curr_data = self.client.storage(&id, key) - .map_err(client_err)?; - match last_values.get(key) { - Some(prev_data) => (curr_data != *prev_data, curr_data), - None => (true, curr_data), - } - }; - if has_changed { - block_changes.changes.push((key.clone(), data.clone())); - } - last_values.insert(key.clone(), data); - } - if !block_changes.changes.is_empty() { - changes.push(block_changes); - } - } - Ok(()) - } + block: Option, + method: String, + call_data: Bytes, + ) -> FutureResult; - /// Iterates through all blocks that are changing keys within range.filtered_range and collects these changes. - fn query_storage_filtered( + /// Returns the keys with prefix, leave empty to get all the keys. 
+ fn storage_keys( &self, - range: &QueryStorageRange, - keys: &[StorageKey], - last_values: &HashMap>, - changes: &mut Vec>, - ) -> Result<()> { - let (begin, end) = match range.filtered_range { - Some(ref filtered_range) => ( - range.first_number + filtered_range.start.saturated_into(), - BlockId::Hash(range.hashes[filtered_range.end - 1].clone()) - ), - None => return Ok(()), - }; - let mut changes_map: BTreeMap, StorageChangeSet> = BTreeMap::new(); - for key in keys { - let mut last_block = None; - let mut last_value = last_values.get(key).cloned().unwrap_or_default(); - for (block, _) in self.client.key_changes(begin, end, key).map_err(client_err)?.into_iter().rev() { - if last_block == Some(block) { - continue; - } - - let block_hash = range.hashes[(block - range.first_number).saturated_into::()].clone(); - let id = BlockId::Hash(block_hash); - let value_at_block = self.client.storage(&id, key).map_err(client_err)?; - if last_value == value_at_block { - continue; - } - - changes_map.entry(block) - .or_insert_with(|| StorageChangeSet { block: block_hash, changes: Vec::new() }) - .changes.push((key.clone(), value_at_block.clone())); - last_block = Some(block); - last_value = value_at_block; - } - } - if let Some(additional_capacity) = changes_map.len().checked_sub(changes.len()) { - changes.reserve(additional_capacity); - } - changes.extend(changes_map.into_iter().map(|(_, cs)| cs)); - Ok(()) - } -} + block: Option, + prefix: StorageKey, + ) -> FutureResult>; -impl State where - Block: BlockT, - B: client::backend::Backend, - E: CallExecutor, -{ - fn unwrap_or_best(&self, hash: Option) -> Result { - crate::helpers::unwrap_or_else(|| Ok(self.client.info().chain.best_hash), hash) - } -} - -impl StateApi for State where - Block: BlockT + 'static, - B: client::backend::Backend + Send + Sync + 'static, - E: CallExecutor + Send + Sync + 'static + Clone, - RA: Send + Sync + 'static, - Client: ProvideRuntimeApi, - as ProvideRuntimeApi>::Api: Metadata -{ - type Metadata = crate::metadata::Metadata; - - fn call(&self, method: String, data: Bytes, block: Option) -> Result { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data.0)); - let return_data = self.client - .executor() - .call( - &BlockId::Hash(block), - &method, &data.0, ExecutionStrategy::NativeElseWasm, state_machine::NeverOffchainExt::new(), - ) - .map_err(client_err)?; - Ok(Bytes(return_data)) - } - - fn storage_keys(&self, key_prefix: StorageKey, block: Option) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Querying storage keys at {:?}", block); - Ok(self.client.storage_keys(&BlockId::Hash(block), &key_prefix).map_err(client_err)?) - } - - fn storage(&self, key: StorageKey, block: Option) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Querying storage at {:?} for key {}", block, HexDisplay::from(&key.0)); - Ok(self.client.storage(&BlockId::Hash(block), &key).map_err(client_err)?) - } - - fn storage_hash(&self, key: StorageKey, block: Option) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Querying storage hash at {:?} for key {}", block, HexDisplay::from(&key.0)); - Ok(self.client.storage_hash(&BlockId::Hash(block), &key).map_err(client_err)?) - } + /// Returns a storage entry at a specific block's state. 
+ fn storage( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult>; - fn storage_size(&self, key: StorageKey, block: Option) -> Result> { - Ok(self.storage(key, block)?.map(|x| x.0.len() as u64)) - } + /// Returns the hash of a storage entry at a block's state. + fn storage_hash( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult>; - fn child_storage( + /// Returns the size of a storage entry at a block's state. + fn storage_size( &self, - child_storage_key: StorageKey, + block: Option, key: StorageKey, - block: Option - ) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Querying child storage at {:?} for key {}", block, HexDisplay::from(&key.0)); - Ok(self.client - .child_storage(&BlockId::Hash(block), &child_storage_key, &key) - .map_err(client_err)? - ) + ) -> FutureResult> { + Box::new(self.storage(block, key) + .map(|x| x.map(|x| x.0.len() as u64))) } + /// Returns the keys with prefix from a child storage, leave empty to get all the keys fn child_storage_keys( &self, + block: Option, child_storage_key: StorageKey, - key_prefix: StorageKey, - block: Option - ) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!(target: "rpc", "Querying child storage keys at {:?}", block); - Ok(self.client - .child_storage_keys(&BlockId::Hash(block), &child_storage_key, &key_prefix) - .map_err(client_err)? - ) - } + prefix: StorageKey, + ) -> FutureResult>; + /// Returns a child storage entry at a specific block's state. + fn child_storage( + &self, + block: Option, + child_storage_key: StorageKey, + key: StorageKey, + ) -> FutureResult>; + + /// Returns the hash of a child storage entry at a block's state. fn child_storage_hash( &self, + block: Option, child_storage_key: StorageKey, key: StorageKey, - block: Option - ) -> Result> { - let block = self.unwrap_or_best(block)?; - trace!( - target: "rpc", "Querying child storage hash at {:?} for key {}", - block, - HexDisplay::from(&key.0), - ); - Ok(self.client - .child_storage_hash(&BlockId::Hash(block), &child_storage_key, &key) - .map_err(client_err)? - ) - } + ) -> FutureResult>; + /// Returns the size of a child storage entry at a block's state. fn child_storage_size( &self, + block: Option, child_storage_key: StorageKey, key: StorageKey, - block: Option - ) -> Result> { - Ok(self.child_storage(child_storage_key, key, block)?.map(|x| x.0.len() as u64)) + ) -> FutureResult> { + Box::new(self.child_storage(block, child_storage_key, key) + .map(|x| x.map(|x| x.0.len() as u64))) } - fn metadata(&self, block: Option) -> Result { - let block = self.unwrap_or_best(block)?; - self.client - .runtime_api() - .metadata(&BlockId::Hash(block)) - .map(Into::into) - .map_err(client_err) - } + /// Returns the runtime metadata as an opaque blob. + fn metadata(&self, block: Option) -> FutureResult; + + /// Get the runtime version. + fn runtime_version(&self, block: Option) -> FutureResult; + /// Query historical storage entries (by key) starting from a block given as the second parameter. + /// + /// NOTE This first returned result contains the initial state of storage for all keys. + /// Subsequent values in the vector represent changes to the previous state (diffs). 
fn query_storage( &self, - keys: Vec, from: Block::Hash, - to: Option - ) -> Result>> { - let range = self.split_query_storage_range(from, to)?; - let mut changes = Vec::new(); - let mut last_values = HashMap::new(); - self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?; - self.query_storage_filtered(&range, &keys, &last_values, &mut changes)?; - Ok(changes) + to: Option, + keys: Vec, + ) -> FutureResult>>; + + /// New runtime version subscription + fn subscribe_runtime_version( + &self, + _meta: crate::metadata::Metadata, + subscriber: Subscriber, + ) { + let stream = match self.client().storage_changes_notification_stream( + Some(&[StorageKey(well_known_keys::CODE.to_vec())]), + None, + ) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(Error::from(client_err(err)).into()); + return; + } + }; + + self.subscriptions().add(subscriber, |sink| { + let version = self.runtime_version(None.into()) + .map_err(Into::into) + .wait(); + + let client = self.client().clone(); + let mut previous_version = version.clone(); + + let stream = stream + .filter_map(move |_| { + let info = client.info(); + let version = client + .runtime_version_at(&BlockId::hash(info.chain.best_hash)) + .map_err(client_err) + .map_err(Into::into); + if previous_version != version { + previous_version = version.clone(); + future::ready(Some(Ok::<_, ()>(version))) + } else { + future::ready(None) + } + }) + .compat(); + + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all( + stream::iter_result(vec![Ok(version)]) + .chain(stream) + ) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }); } + /// Unsubscribe from runtime version subscription + fn unsubscribe_runtime_version( + &self, + _meta: Option, + id: SubscriptionId, + ) -> RpcResult { + Ok(self.subscriptions().cancel(id)) + } + + /// New storage subscription fn subscribe_storage( &self, - _meta: Self::Metadata, + _meta: crate::metadata::Metadata, subscriber: Subscriber>, keys: Option> ) { let keys = Into::>>::into(keys); - let stream = match self.client.storage_changes_notification_stream( + let stream = match self.client().storage_changes_notification_stream( keys.as_ref().map(|x| &**x), None ) { @@ -380,18 +239,19 @@ impl StateApi for State where // initial values let initial = stream::iter_result(keys .map(|keys| { - let block = self.client.info().chain.best_hash; + let block = self.client().info().chain.best_hash; let changes = keys .into_iter() - .map(|key| self.storage(key.clone(), Some(block.clone()).into()) + .map(|key| self.storage(Some(block.clone()).into(), key.clone()) .map(|val| (key.clone(), val)) + .wait() .unwrap_or_else(|_| (key, None)) ) .collect(); vec![Ok(Ok(StorageChangeSet { block, changes }))] }).unwrap_or_default()); - self.subscriptions.add(subscriber, |sink| { + self.subscriptions().add(subscriber, |sink| { let stream = stream .map(|(block, changes)| Ok::<_, ()>(Ok(StorageChangeSet { block, @@ -410,96 +270,175 @@ impl StateApi for State where }) } - fn unsubscribe_storage(&self, _meta: Option, id: SubscriptionId) -> RpcResult { - Ok(self.subscriptions.cancel(id)) + /// Unsubscribe from storage subscription + fn unsubscribe_storage( + &self, + _meta: Option, + id: SubscriptionId, + ) -> RpcResult { + Ok(self.subscriptions().cancel(id)) } +} - fn runtime_version(&self, at: Option) -> Result { - let at = self.unwrap_or_best(at)?; - Ok(self.client.runtime_version_at(&BlockId::Hash(at)).map_err(client_err)?) 
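The NOTE above spells out the diff semantics of `query_storage`: the first `StorageChangeSet` carries the initial value of every queried key at `from`, and later sets only list keys whose value changed in that block. A minimal consumer-side sketch of those semantics (hypothetical helper, not part of this diff; it only assumes the `StorageKey`, `StorageData` and `StorageChangeSet` types from `primitives::storage` that this file already uses):

```rust
use std::collections::HashMap;
use primitives::storage::{StorageChangeSet, StorageData, StorageKey};

/// Fold `query_storage` results into the value of each queried key at the last
/// block of the range. The first change set carries initial values; later sets
/// only carry diffs, so a plain left-to-right overwrite yields the final view.
fn storage_view_at_end<Hash>(
	sets: Vec<StorageChangeSet<Hash>>,
) -> HashMap<StorageKey, Option<StorageData>> {
	let mut view = HashMap::new();
	for set in sets {
		for (key, value) in set.changes {
			view.insert(key, value);
		}
	}
	view
}
```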
+/// Create new state API that works on full node. +pub fn new_full( + client: Arc>, + subscriptions: Subscriptions, +) -> State + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: Metadata, +{ + State { + backend: Box::new(self::state_full::FullState::new(client, subscriptions)), } +} - fn subscribe_runtime_version(&self, _meta: Self::Metadata, subscriber: Subscriber) { - let stream = match self.client.storage_changes_notification_stream( - Some(&[StorageKey(storage::well_known_keys::CODE.to_vec())]), - None, - ) { - Ok(stream) => stream, - Err(err) => { - let _ = subscriber.reject(client_err(err).into()); - return; - } - }; +/// Create new state API that works on light node. +pub fn new_light>( + client: Arc>, + subscriptions: Subscriptions, + remote_blockchain: Arc>, + fetcher: Arc, +) -> State + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, + F: Send + Sync + 'static, +{ + State { + backend: Box::new(self::state_light::LightState::new( + client, + subscriptions, + remote_blockchain, + fetcher, + )), + } +} - self.subscriptions.add(subscriber, |sink| { - let version = self.runtime_version(None.into()) - .map_err(Into::into); +/// State API with subscriptions support. +pub struct State { + backend: Box>, +} - let client = self.client.clone(); - let mut previous_version = version.clone(); +impl StateApi for State + where + Block: BlockT + 'static, + B: client::backend::Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, +{ + type Metadata = crate::metadata::Metadata; - let stream = stream - .filter_map(move |_| { - let info = client.info(); - let version = client - .runtime_version_at(&BlockId::hash(info.chain.best_hash)) - .map_err(client_err) - .map_err(Into::into); - if previous_version != version { - previous_version = version.clone(); - future::ready(Some(Ok::<_, ()>(version))) - } else { - future::ready(None) - } - }) - .compat(); + fn call(&self, method: String, data: Bytes, block: Option) -> FutureResult { + self.backend.call(block, method, data) + } - sink - .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) - .send_all( - stream::iter_result(vec![Ok(version)]) - .chain(stream) - ) - // we ignore the resulting Stream (if the first stream is over we are unsubscribed) - .map(|_| ()) - }); + fn storage_keys( + &self, + key_prefix: StorageKey, + block: Option, + ) -> FutureResult> { + self.backend.storage_keys(block, key_prefix) } - fn unsubscribe_runtime_version(&self, _meta: Option, id: SubscriptionId) -> RpcResult { - Ok(self.subscriptions.cancel(id)) + fn storage(&self, key: StorageKey, block: Option) -> FutureResult> { + self.backend.storage(block, key) } -} -/// Splits passed range into two subranges where: -/// - first range has at least one element in it; -/// - second range (optionally) starts at given `middle` element. 
-pub(crate) fn split_range(size: usize, middle: Option) -> (Range, Option>) { - // check if we can filter blocks-with-changes from some (sub)range using changes tries - let range2_begin = match middle { - // some of required changes tries are pruned => use available tries - Some(middle) if middle != 0 => Some(middle), - // all required changes tries are available, but we still want values at first block - // => do 'unfiltered' read for the first block and 'filtered' for the rest - Some(_) if size > 1 => Some(1), - // range contains single element => do not use changes tries - Some(_) => None, - // changes tries are not available => do 'unfiltered' read for the whole range - None => None, - }; - let range1 = 0..range2_begin.unwrap_or(size); - let range2 = range2_begin.map(|begin| begin..size); - (range1, range2) -} + fn storage_hash(&self, key: StorageKey, block: Option) -> FutureResult> { + self.backend.storage_hash(block, key) + } + + fn storage_size(&self, key: StorageKey, block: Option) -> FutureResult> { + self.backend.storage_size(block, key) + } + + fn child_storage( + &self, + child_storage_key: StorageKey, + key: StorageKey, + block: Option + ) -> FutureResult> { + self.backend.child_storage(block, child_storage_key, key) + } + + fn child_storage_keys( + &self, + child_storage_key: StorageKey, + key_prefix: StorageKey, + block: Option + ) -> FutureResult> { + self.backend.child_storage_keys(block, child_storage_key, key_prefix) + } + + fn child_storage_hash( + &self, + child_storage_key: StorageKey, + key: StorageKey, + block: Option + ) -> FutureResult> { + self.backend.child_storage_hash(block, child_storage_key, key) + } + + fn child_storage_size( + &self, + child_storage_key: StorageKey, + key: StorageKey, + block: Option + ) -> FutureResult> { + self.backend.child_storage_size(block, child_storage_key, key) + } -fn invalid_block_range(from: Option<&H>, to: Option<&H>, reason: String) -> error::Error { - let to_string = |x: Option<&H>| match x { - None => "unknown hash".into(), - Some(h) => format!("{} ({})", h.number(), h.hash()), - }; + fn metadata(&self, block: Option) -> FutureResult { + self.backend.metadata(block) + } - error::Error::InvalidBlockRange { - from: to_string(from), - to: to_string(to), - details: reason, + fn query_storage( + &self, + keys: Vec, + from: Block::Hash, + to: Option + ) -> FutureResult>> { + self.backend.query_storage(from, to, keys) } + + fn subscribe_storage( + &self, + meta: Self::Metadata, + subscriber: Subscriber>, + keys: Option> + ) { + self.backend.subscribe_storage(meta, subscriber, keys); + } + + fn unsubscribe_storage(&self, meta: Option, id: SubscriptionId) -> RpcResult { + self.backend.unsubscribe_storage(meta, id) + } + + fn runtime_version(&self, at: Option) -> FutureResult { + self.backend.runtime_version(at) + } + + fn subscribe_runtime_version(&self, meta: Self::Metadata, subscriber: Subscriber) { + self.backend.subscribe_runtime_version(meta, subscriber); + } + + fn unsubscribe_runtime_version( + &self, + meta: Option, + id: SubscriptionId, + ) -> RpcResult { + self.backend.unsubscribe_runtime_version(meta, id) + } +} + +fn client_err(err: client::error::Error) -> Error { + Error::Client(Box::new(err)) } diff --git a/core/rpc/src/state/state_full.rs b/core/rpc/src/state/state_full.rs new file mode 100644 index 0000000000000..1f780d3e114da --- /dev/null +++ b/core/rpc/src/state/state_full.rs @@ -0,0 +1,389 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. 
+ +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! State API backend for full nodes. + +use std::collections::{BTreeMap, HashMap}; +use std::sync::Arc; +use std::ops::Range; +use rpc::futures::future::result; + +use api::Subscriptions; +use client::{ + Client, CallExecutor, runtime_api::Metadata, + backend::Backend, error::Result as ClientResult, +}; +use primitives::{ + H256, Blake2Hasher, Bytes, + storage::{StorageKey, StorageData, StorageChangeSet}, +}; +use runtime_version::RuntimeVersion; +use state_machine::{NeverOffchainExt, ExecutionStrategy}; +use sr_primitives::{ + generic::BlockId, + traits::{Block as BlockT, Header, NumberFor, ProvideRuntimeApi, SaturatedConversion}, +}; + +use super::{StateBackend, error::{FutureResult, Error, Result}, client_err}; + +/// Ranges to query in state_queryStorage. +struct QueryStorageRange { + /// Hashes of all the blocks in the range. + pub hashes: Vec, + /// Number of the first block in the range. + pub first_number: NumberFor, + /// Blocks subrange ([begin; end) indices within `hashes`) where we should read keys at + /// each state to get changes. + pub unfiltered_range: Range, + /// Blocks subrange ([begin; end) indices within `hashes`) where we could pre-filter + /// blocks-with-changes by using changes tries. + pub filtered_range: Option>, +} + +pub struct FullState { + client: Arc>, + subscriptions: Subscriptions, +} + +impl FullState + where + Block: BlockT + 'static, + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, +{ + /// + pub fn new(client: Arc>, subscriptions: Subscriptions) -> Self { + Self { client, subscriptions } + } + + /// Returns given block hash or best block hash if None is passed. + fn block_or_best(&self, hash: Option) -> ClientResult { + crate::helpers::unwrap_or_else(|| Ok(self.client.info().chain.best_hash), hash) + } + + /// Splits the `query_storage` block range into 'filtered' and 'unfiltered' subranges. + /// Blocks that contain changes within filtered subrange could be filtered using changes tries. + /// Blocks that contain changes within unfiltered subrange must be filtered manually. + fn split_query_storage_range( + &self, + from: Block::Hash, + to: Option + ) -> Result> { + let to = self.block_or_best(to).map_err(client_err)?; + let from_hdr = self.client.header(&BlockId::hash(from)).map_err(client_err)?; + let to_hdr = self.client.header(&BlockId::hash(to)).map_err(client_err)?; + match (from_hdr, to_hdr) { + (Some(ref from), Some(ref to)) if from.number() <= to.number() => { + // check if we can get from `to` to `from` by going through parent_hashes. 
+ let from_number = *from.number(); + let blocks = { + let mut blocks = vec![to.hash()]; + let mut last = to.clone(); + while *last.number() > from_number { + let hdr = self.client + .header(&BlockId::hash(*last.parent_hash())) + .map_err(client_err)?; + if let Some(hdr) = hdr { + blocks.push(hdr.hash()); + last = hdr; + } else { + return Err(invalid_block_range( + Some(from), + Some(to), + format!("Parent of {} ({}) not found", last.number(), last.hash()), + )) + } + } + if last.hash() != from.hash() { + return Err(invalid_block_range( + Some(from), + Some(to), + format!("Expected to reach `from`, got {} ({})", last.number(), last.hash()), + )) + } + blocks.reverse(); + blocks + }; + // check if we can filter blocks-with-changes from some (sub)range using changes tries + let changes_trie_range = self.client + .max_key_changes_range(from_number, BlockId::Hash(to.hash())) + .map_err(client_err)?; + let filtered_range_begin = changes_trie_range + .map(|(begin, _)| (begin - from_number).saturated_into::()); + let (unfiltered_range, filtered_range) = split_range(blocks.len(), filtered_range_begin); + Ok(QueryStorageRange { + hashes: blocks, + first_number: from_number, + unfiltered_range, + filtered_range, + }) + }, + (from, to) => Err( + invalid_block_range(from.as_ref(), to.as_ref(), "Invalid range or unknown block".into()) + ), + } + } + + /// Iterates through range.unfiltered_range and check each block for changes of keys' values. + fn query_storage_unfiltered( + &self, + range: &QueryStorageRange, + keys: &[StorageKey], + last_values: &mut HashMap>, + changes: &mut Vec>, + ) -> Result<()> { + for block in range.unfiltered_range.start..range.unfiltered_range.end { + let block_hash = range.hashes[block].clone(); + let mut block_changes = StorageChangeSet { block: block_hash.clone(), changes: Vec::new() }; + let id = BlockId::hash(block_hash); + for key in keys { + let (has_changed, data) = { + let curr_data = self.client.storage(&id, key).map_err(client_err)?; + match last_values.get(key) { + Some(prev_data) => (curr_data != *prev_data, curr_data), + None => (true, curr_data), + } + }; + if has_changed { + block_changes.changes.push((key.clone(), data.clone())); + } + last_values.insert(key.clone(), data); + } + if !block_changes.changes.is_empty() { + changes.push(block_changes); + } + } + Ok(()) + } + + /// Iterates through all blocks that are changing keys within range.filtered_range and collects these changes. 
+ fn query_storage_filtered( + &self, + range: &QueryStorageRange, + keys: &[StorageKey], + last_values: &HashMap>, + changes: &mut Vec>, + ) -> Result<()> { + let (begin, end) = match range.filtered_range { + Some(ref filtered_range) => ( + range.first_number + filtered_range.start.saturated_into(), + BlockId::Hash(range.hashes[filtered_range.end - 1].clone()) + ), + None => return Ok(()), + }; + let mut changes_map: BTreeMap, StorageChangeSet> = BTreeMap::new(); + for key in keys { + let mut last_block = None; + let mut last_value = last_values.get(key).cloned().unwrap_or_default(); + let key_changes = self.client.key_changes(begin, end, key).map_err(client_err)?; + for (block, _) in key_changes.into_iter().rev() { + if last_block == Some(block) { + continue; + } + + let block_hash = range.hashes[(block - range.first_number).saturated_into::()].clone(); + let id = BlockId::Hash(block_hash); + let value_at_block = self.client.storage(&id, key).map_err(client_err)?; + if last_value == value_at_block { + continue; + } + + changes_map.entry(block) + .or_insert_with(|| StorageChangeSet { block: block_hash, changes: Vec::new() }) + .changes.push((key.clone(), value_at_block.clone())); + last_block = Some(block); + last_value = value_at_block; + } + } + if let Some(additional_capacity) = changes_map.len().checked_sub(changes.len()) { + changes.reserve(additional_capacity); + } + changes.extend(changes_map.into_iter().map(|(_, cs)| cs)); + Ok(()) + } +} + +impl StateBackend for FullState + where + Block: BlockT + 'static, + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, + Client: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: Metadata, +{ + fn client(&self) -> &Arc> { + &self.client + } + + fn subscriptions(&self) -> &Subscriptions { + &self.subscriptions + } + + fn call( + &self, + block: Option, + method: String, + call_data: Bytes, + ) -> FutureResult { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.executor() + .call( + &BlockId::Hash(block), + &method, + &*call_data, + ExecutionStrategy::NativeElseWasm, + NeverOffchainExt::new(), + ) + .map(Into::into)) + .map_err(client_err))) + } + + fn storage_keys( + &self, + block: Option, + prefix: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.storage_keys(&BlockId::Hash(block), &prefix)) + .map_err(client_err))) + } + + fn storage( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.storage(&BlockId::Hash(block), &key)) + .map_err(client_err))) + } + + fn storage_hash( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.storage_hash(&BlockId::Hash(block), &key)) + .map_err(client_err))) + } + + fn child_storage_keys( + &self, + block: Option, + child_storage_key: StorageKey, + prefix: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.child_storage_keys(&BlockId::Hash(block), &child_storage_key, &prefix)) + .map_err(client_err))) + } + + fn child_storage( + &self, + block: Option, + child_storage_key: StorageKey, + key: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.child_storage(&BlockId::Hash(block), &child_storage_key, &key)) + .map_err(client_err))) + } + + fn 
child_storage_hash( + &self, + block: Option, + child_storage_key: StorageKey, + key: StorageKey, + ) -> FutureResult> { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.child_storage_hash(&BlockId::Hash(block), &child_storage_key, &key)) + .map_err(client_err))) + } + + fn metadata(&self, block: Option) -> FutureResult { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.runtime_api().metadata(&BlockId::Hash(block)).map(Into::into)) + .map_err(client_err))) + } + + fn runtime_version(&self, block: Option) -> FutureResult { + Box::new(result( + self.block_or_best(block) + .and_then(|block| self.client.runtime_version_at(&BlockId::Hash(block))) + .map_err(client_err))) + } + + fn query_storage( + &self, + from: Block::Hash, + to: Option, + keys: Vec, + ) -> FutureResult>> { + let call_fn = move || { + let range = self.split_query_storage_range(from, to)?; + let mut changes = Vec::new(); + let mut last_values = HashMap::new(); + self.query_storage_unfiltered(&range, &keys, &mut last_values, &mut changes)?; + self.query_storage_filtered(&range, &keys, &last_values, &mut changes)?; + Ok(changes) + }; + Box::new(result(call_fn())) + } +} + +/// Splits passed range into two subranges where: +/// - first range has at least one element in it; +/// - second range (optionally) starts at given `middle` element. +pub(crate) fn split_range(size: usize, middle: Option) -> (Range, Option>) { + // check if we can filter blocks-with-changes from some (sub)range using changes tries + let range2_begin = match middle { + // some of required changes tries are pruned => use available tries + Some(middle) if middle != 0 => Some(middle), + // all required changes tries are available, but we still want values at first block + // => do 'unfiltered' read for the first block and 'filtered' for the rest + Some(_) if size > 1 => Some(1), + // range contains single element => do not use changes tries + Some(_) => None, + // changes tries are not available => do 'unfiltered' read for the whole range + None => None, + }; + let range1 = 0..range2_begin.unwrap_or(size); + let range2 = range2_begin.map(|begin| begin..size); + (range1, range2) +} + +fn invalid_block_range(from: Option<&H>, to: Option<&H>, reason: String) -> Error { + let to_string = |x: Option<&H>| match x { + None => "unknown hash".into(), + Some(h) => format!("{} ({})", h.number(), h.hash()), + }; + + Error::InvalidBlockRange { + from: to_string(from), + to: to_string(to), + details: reason, + } +} diff --git a/core/rpc/src/state/state_light.rs b/core/rpc/src/state/state_light.rs new file mode 100644 index 0000000000000..eb14b3fe7bd0b --- /dev/null +++ b/core/rpc/src/state/state_light.rs @@ -0,0 +1,283 @@ +// Copyright 2019 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! State API backend for light nodes. 
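The `split_range` helper carried over into `state_full.rs` above decides how much of a `query_storage` block range can be pre-filtered with changes tries. Its documented behaviour can be pinned down with a few illustrative cases (a test sketch, not part of this diff):

```rust
#[cfg(test)]
mod split_range_examples {
	use super::split_range;

	#[test]
	fn splits_as_documented() {
		// no changes tries available: read every block unfiltered
		assert_eq!(split_range(100, None), (0..100, None));
		// tries cover the whole range, but the first block is still read
		// unfiltered so that initial values can be reported
		assert_eq!(split_range(100, Some(0)), (0..1, Some(1..100)));
		// tries only cover the tail of the range, starting at index 50
		assert_eq!(split_range(100, Some(50)), (0..50, Some(50..100)));
		// single-element range: changes tries are not used at all
		assert_eq!(split_range(1, Some(0)), (0..1, None));
	}
}
```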
+ +use std::sync::Arc; +use codec::Decode; +use futures03::{future::{ready, Either}, FutureExt, TryFutureExt}; +use hash_db::Hasher; +use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId}; +use rpc::{ + Result as RpcResult, + futures::future::{result, Future}, +}; + +use api::Subscriptions; +use client::{ + Client, CallExecutor, backend::Backend, + error::Error as ClientError, + light::{ + blockchain::{future_header, RemoteBlockchain}, + fetcher::{Fetcher, RemoteCallRequest, RemoteReadRequest, RemoteReadChildRequest}, + }, +}; +use primitives::{ + H256, Blake2Hasher, Bytes, OpaqueMetadata, + storage::{StorageKey, StorageData, StorageChangeSet}, +}; +use runtime_version::RuntimeVersion; +use sr_primitives::{ + generic::BlockId, + traits::{Block as BlockT, Header as HeaderT}, +}; + +use super::{StateBackend, error::{FutureResult, Error}, client_err}; + +pub struct LightState, B, E, RA> { + client: Arc>, + subscriptions: Subscriptions, + remote_blockchain: Arc>, + fetcher: Arc, +} + +impl + 'static, B, E, RA> LightState + where + Block: BlockT, + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, +{ + /// + pub fn new( + client: Arc>, + subscriptions: Subscriptions, + remote_blockchain: Arc>, + fetcher: Arc, + ) -> Self { + Self { client, subscriptions, remote_blockchain, fetcher, } + } + + /// Returns given block hash or best block hash if None is passed. + fn block_or_best(&self, hash: Option) -> Block::Hash { + hash.unwrap_or_else(|| self.client.info().chain.best_hash) + } + + /// Resolve header by hash. + fn resolve_header( + &self, + block: Option, + ) -> impl std::future::Future> { + let block = self.block_or_best(block); + let maybe_header = future_header( + &*self.remote_blockchain, + &*self.fetcher, + BlockId::Hash(block), + ); + + maybe_header.then(move |result| + ready(result.and_then(|maybe_header| + maybe_header.ok_or(ClientError::UnknownBlock(format!("{}", block))) + ).map_err(client_err)), + ) + } +} + +impl StateBackend for LightState + where + Block: BlockT, + B: Backend + Send + Sync + 'static, + E: CallExecutor + Send + Sync + 'static + Clone, + RA: Send + Sync + 'static, + F: Fetcher + 'static +{ + fn client(&self) -> &Arc> { + &self.client + } + + fn subscriptions(&self) -> &Subscriptions { + &self.subscriptions + } + + fn call( + &self, + block: Option, + method: String, + call_data: Bytes, + ) -> FutureResult { + let fetcher = self.fetcher.clone(); + let call_result = self.resolve_header(block) + .then(move |result| match result { + Ok(header) => Either::Left(fetcher.remote_call(RemoteCallRequest { + block: header.hash(), + header, + method, + call_data: call_data.0, + retry_count: Default::default(), + }).then(|result| ready(result.map(Bytes).map_err(client_err)))), + Err(error) => Either::Right(ready(Err(error))), + }); + + Box::new(call_result.boxed().compat()) + } + + fn storage_keys( + &self, + _block: Option, + _prefix: StorageKey, + ) -> FutureResult> { + Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient)))) + } + + fn storage( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult> { + let fetcher = self.fetcher.clone(); + let storage = self.resolve_header(block) + .then(move |result| match result { + Ok(header) => Either::Left(fetcher.remote_read(RemoteReadRequest { + block: header.hash(), + header, + key: key.0, + retry_count: Default::default(), + }).then(|result| ready(result.map(|data| data.map(StorageData)).map_err(client_err)))), + Err(error) => 
Either::Right(ready(Err(error))), + }); + + Box::new(storage.boxed().compat()) + } + + fn storage_hash( + &self, + block: Option, + key: StorageKey, + ) -> FutureResult> { + Box::new(self + .storage(block, key) + .and_then(|maybe_storage| + result(Ok(maybe_storage.map(|storage| Blake2Hasher::hash(&storage.0)))) + ) + ) + } + + fn child_storage_keys( + &self, + _block: Option, + _child_storage_key: StorageKey, + _prefix: StorageKey, + ) -> FutureResult> { + Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient)))) + } + + fn child_storage( + &self, + block: Option, + child_storage_key: StorageKey, + key: StorageKey, + ) -> FutureResult> { + let fetcher = self.fetcher.clone(); + let child_storage = self.resolve_header(block) + .then(move |result| match result { + Ok(header) => Either::Left(fetcher.remote_read_child(RemoteReadChildRequest { + block: header.hash(), + header, + storage_key: child_storage_key.0, + key: key.0, + retry_count: Default::default(), + }).then(|result| ready(result.map(|data| data.map(StorageData)).map_err(client_err)))), + Err(error) => Either::Right(ready(Err(error))), + }); + + Box::new(child_storage.boxed().compat()) + } + + fn child_storage_hash( + &self, + block: Option, + child_storage_key: StorageKey, + key: StorageKey, + ) -> FutureResult> { + Box::new(self + .child_storage(block, child_storage_key, key) + .and_then(|maybe_storage| + result(Ok(maybe_storage.map(|storage| Blake2Hasher::hash(&storage.0)))) + ) + ) + } + + fn metadata(&self, block: Option) -> FutureResult { + let metadata = self.call(block, "Metadata_metadata".into(), Bytes(Vec::new())) + .and_then(|metadata| OpaqueMetadata::decode(&mut &metadata.0[..]) + .map(Into::into) + .map_err(|decode_err| client_err(ClientError::CallResultDecode( + "Unable to decode metadata", + decode_err, + )))); + + Box::new(metadata) + } + + fn runtime_version(&self, block: Option) -> FutureResult { + let version = self.call(block, "Core_version".into(), Bytes(Vec::new())) + .and_then(|version| Decode::decode(&mut &version.0[..]) + .map_err(|_| client_err(ClientError::VersionInvalid)) + ); + + Box::new(version) + } + + fn query_storage( + &self, + _from: Block::Hash, + _to: Option, + _keys: Vec, + ) -> FutureResult>> { + Box::new(result(Err(client_err(ClientError::NotAvailableOnLightClient)))) + } + + fn subscribe_storage( + &self, + _meta: crate::metadata::Metadata, + _subscriber: Subscriber>, + _keys: Option> + ) { + } + + fn unsubscribe_storage( + &self, + _meta: Option, + _id: SubscriptionId, + ) -> RpcResult { + Ok(false) + } + + fn subscribe_runtime_version( + &self, + _meta: crate::metadata::Metadata, + _subscriber: Subscriber, + ) { + } + + fn unsubscribe_runtime_version( + &self, + _meta: Option, + _id: SubscriptionId, + ) -> RpcResult { + Ok(false) + } +} diff --git a/core/rpc/src/state/tests.rs b/core/rpc/src/state/tests.rs index 656abf56a965c..74738318ec3c8 100644 --- a/core/rpc/src/state/tests.rs +++ b/core/rpc/src/state/tests.rs @@ -15,9 +15,12 @@ // along with Substrate. If not, see . 
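Every method in `state_light.rs` above builds a futures 0.3 computation and hands it back to the futures 0.1 based jsonrpc layer through `.boxed().compat()`. A stripped-down sketch of that bridging pattern, assuming the same `futures03` alias and the futures 0.1 re-export path this crate already imports:

```rust
use futures03::{future::ready, FutureExt, TryFutureExt};
use rpc::futures::future::Future as Future01;

/// Build a futures 0.3 `Result`-returning future and hand it to the
/// futures 0.1 based RPC machinery, as the light state backend does.
fn forty_two() -> Box<dyn Future01<Item = u32, Error = ()> + Send> {
	let fut03 = ready(Ok::<u32, ()>(42)); // any std::future::Future<Output = Result<_, _>>
	Box::new(fut03.boxed().compat())      // pin-box it, then adapt to futures 0.1
}
```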
use super::*; +use super::state_full::split_range; use self::error::Error; +use std::sync::Arc; use assert_matches::assert_matches; +use futures::stream::Stream; use primitives::storage::well_known_keys; use sr_io::blake2_256; use test_client::{ @@ -36,20 +39,21 @@ fn should_return_storage() { .add_extra_storage(KEY.to_vec(), VALUE.to_vec()) .build(); let genesis_hash = client.genesis_hash(); - let client = State::new(Arc::new(client), Subscriptions::new(Arc::new(core.executor()))); + let client = new_full(Arc::new(client), Subscriptions::new(Arc::new(core.executor()))); let key = StorageKey(KEY.to_vec()); assert_eq!( - client.storage(key.clone(), Some(genesis_hash).into()) + client.storage(key.clone(), Some(genesis_hash).into()).wait() .map(|x| x.map(|x| x.0.len())).unwrap().unwrap() as usize, VALUE.len(), ); assert_matches!( - client.storage_hash(key.clone(), Some(genesis_hash).into()).map(|x| x.is_some()), + client.storage_hash(key.clone(), Some(genesis_hash).into()).wait() + .map(|x| x.is_some()), Ok(true) ); assert_eq!( - client.storage_size(key.clone(), None).unwrap().unwrap() as usize, + client.storage_size(key.clone(), None).wait().unwrap().unwrap() as usize, VALUE.len(), ); } @@ -61,22 +65,22 @@ fn should_return_child_storage() { .add_child_storage("test", "key", vec![42_u8]) .build()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, Subscriptions::new(Arc::new(core.executor()))); + let client = new_full(client, Subscriptions::new(Arc::new(core.executor()))); let child_key = StorageKey(well_known_keys::CHILD_STORAGE_KEY_PREFIX.iter().chain(b"test").cloned().collect()); let key = StorageKey(b"key".to_vec()); assert_matches!( - client.child_storage(child_key.clone(), key.clone(), Some(genesis_hash).into()), + client.child_storage(child_key.clone(), key.clone(), Some(genesis_hash).into()).wait(), Ok(Some(StorageData(ref d))) if d[0] == 42 && d.len() == 1 ); assert_matches!( client.child_storage_hash(child_key.clone(), key.clone(), Some(genesis_hash).into()) - .map(|x| x.is_some()), + .wait().map(|x| x.is_some()), Ok(true) ); assert_matches!( - client.child_storage_size(child_key.clone(), key.clone(), None), + client.child_storage_size(child_key.clone(), key.clone(), None).wait(), Ok(Some(1)) ); } @@ -86,10 +90,10 @@ fn should_call_contract() { let core = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); - let client = State::new(client, Subscriptions::new(Arc::new(core.executor()))); + let client = new_full(client, Subscriptions::new(Arc::new(core.executor()))); assert_matches!( - client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()), + client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(), Err(Error::Client(_)) ) } @@ -101,21 +105,22 @@ fn should_notify_about_storage_changes() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = State::new(Arc::new(test_client::new()), Subscriptions::new(Arc::new(remote))); + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); api.subscribe_storage(Default::default(), subscriber, None.into()); // assert id assigned assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); - let mut builder = api.client.new_block(Default::default()).unwrap(); + let mut builder = client.new_block(Default::default()).unwrap(); builder.push_transfer(runtime::Transfer { from: AccountKeyring::Alice.into(), to: 
AccountKeyring::Ferdie.into(), amount: 42, nonce: 0, }).unwrap(); - api.client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); } // assert notification sent to transport @@ -132,7 +137,8 @@ fn should_send_initial_storage_changes_and_notifications() { let (subscriber, id, transport) = Subscriber::new_test("test"); { - let api = State::new(Arc::new(test_client::new()), Subscriptions::new(Arc::new(remote))); + let client = Arc::new(test_client::new()); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(remote))); let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into())); @@ -143,14 +149,14 @@ fn should_send_initial_storage_changes_and_notifications() { // assert id assigned assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(1)))); - let mut builder = api.client.new_block(Default::default()).unwrap(); + let mut builder = client.new_block(Default::default()).unwrap(); builder.push_transfer(runtime::Transfer { from: AccountKeyring::Alice.into(), to: AccountKeyring::Ferdie.into(), amount: 42, nonce: 0, }).unwrap(); - api.client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + client.import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); } // assert initial values sent to transport @@ -167,7 +173,7 @@ fn should_send_initial_storage_changes_and_notifications() { fn should_query_storage() { fn run_tests(client: Arc) { let core = tokio::runtime::Runtime::new().unwrap(); - let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor()))); let add_block = |nonce| { let mut builder = client.new_block(Default::default()).unwrap(); @@ -219,7 +225,7 @@ fn should_query_storage() { Some(block1_hash).into(), ); - assert_eq!(result.unwrap(), expected); + assert_eq!(result.wait().unwrap(), expected); // Query all changes let result = api.query_storage( @@ -236,7 +242,7 @@ fn should_query_storage() { (StorageKey(vec![5]), Some(StorageData(vec![1]))), ], }); - assert_eq!(result.unwrap(), expected); + assert_eq!(result.wait().unwrap(), expected); } run_tests(Arc::new(test_client::new())); @@ -258,7 +264,7 @@ fn should_return_runtime_version() { let core = tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor()))); let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\ \"specVersion\":1,\"implVersion\":1,\"apis\":[[\"0xdf6acb689907609b\",2],\ @@ -266,7 +272,7 @@ fn should_return_runtime_version() { [\"0xc6e9a76309f39b09\",1],[\"0xdd718d5cc53262d4\",1],[\"0xcbca25e39f142387\",1],\ [\"0xf78b278be53f454c\",1],[\"0xab3c0572291feb8b\",1]]}"; - let runtime_version = api.runtime_version(None.into()).unwrap(); + let runtime_version = api.runtime_version(None.into()).wait().unwrap(); let serialized = serde_json::to_string(&runtime_version).unwrap(); assert_eq!(serialized, result); @@ -281,7 +287,7 @@ fn should_notify_on_runtime_version_initially() { { let client = Arc::new(test_client::new()); - let api = State::new(client.clone(), Subscriptions::new(Arc::new(core.executor()))); + let api = new_full(client.clone(), Subscriptions::new(Arc::new(core.executor()))); api.subscribe_runtime_version(Default::default(), subscriber); diff --git 
a/core/service/src/builder.rs b/core/service/src/builder.rs index fbcfa0f09db46..c675710e540a0 100644 --- a/core/service/src/builder.rs +++ b/core/service/src/builder.rs @@ -18,7 +18,10 @@ use crate::{NewService, NetworkStatus, NetworkState, error::{self, Error}, DEFAU use crate::{SpawnTaskHandle, start_rpc_servers, build_network_future, TransactionPoolAdapter}; use crate::TaskExecutor; use crate::config::Configuration; -use client::{BlockchainEvents, Client, runtime_api}; +use client::{ + BlockchainEvents, Client, runtime_api, + backend::RemoteBackend, light::blockchain::RemoteBlockchain, +}; use codec::{Decode, Encode, IoReader}; use consensus_common::import_queue::ImportQueue; use futures::{prelude::*, sync::mpsc}; @@ -58,7 +61,7 @@ use transaction_pool::txpool::{self, ChainApi, Pool as TransactionPool}; /// generics is done when you call `build`. /// pub struct ServiceBuilder + TNetP, TExPool, TRpc, TRpcB, Backend> { config: Configuration, client: Arc, @@ -72,10 +75,68 @@ pub struct ServiceBuilder, rpc_extensions: TRpc, + rpc_builder: TRpcB, marker: PhantomData<(TBl, TRtApi)>, } -impl ServiceBuilder<(), (), TCfg, TGen, (), (), (), (), (), (), (), (), (), ()> +/// Full client type. +type TFullClient = Client< + TFullBackend, + TFullCallExecutor, + TBl, + TRtApi, +>; + +/// Full client backend type. +type TFullBackend = client_db::Backend; + +/// Full client call executor type. +type TFullCallExecutor = client::LocalCallExecutor< + client_db::Backend, + NativeExecutor, +>; + +/// Light client type. +type TLightClient = Client< + TLightBackend, + TLightCallExecutor, + TBl, + TRtApi, +>; + +/// Light client backend type. +type TLightBackend = client::light::backend::Backend< + client_db::light::LightStorage, + network::OnDemand, + Blake2Hasher, +>; + +/// Light call executor type. +type TLightCallExecutor = client::light::call_executor::RemoteOrLocalCallExecutor< + TBl, + client::light::backend::Backend< + client_db::light::LightStorage, + network::OnDemand, + Blake2Hasher + >, + client::light::call_executor::RemoteCallExecutor< + client::light::blockchain::Blockchain< + client_db::light::LightStorage, + network::OnDemand + >, + network::OnDemand, + >, + client::LocalCallExecutor< + client::light::backend::Backend< + client_db::light::LightStorage, + network::OnDemand, + Blake2Hasher + >, + NativeExecutor + >, +>; + +impl ServiceBuilder<(), (), TCfg, TGen, (), (), (), (), (), (), (), (), (), (), ()> where TGen: Serialize + DeserializeOwned + BuildStorage { /// Start the service builder with a configuration. 
pub fn new_full, TRtApi, TExecDisp: NativeExecutionDispatch>( @@ -85,12 +146,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { TRtApi, TCfg, TGen, - Client< - client_db::Backend, - client::LocalCallExecutor, NativeExecutor>, - TBl, - TRtApi - >, + TFullClient, Arc>, (), (), @@ -99,7 +155,8 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { (), (), (), - client_db::Backend, + FullRpcBuilder, + TFullBackend, >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; @@ -124,6 +181,8 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { let client = Arc::new(client); + let rpc_builder = FullRpcBuilder { client: client.clone() }; + Ok(ServiceBuilder { config, client, @@ -137,6 +196,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), + rpc_builder, marker: PhantomData, }) } @@ -149,34 +209,7 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { TRtApi, TCfg, TGen, - Client< - client::light::backend::Backend, network::OnDemand, Blake2Hasher>, - client::light::call_executor::RemoteOrLocalCallExecutor< - TBl, - client::light::backend::Backend< - client_db::light::LightStorage, - network::OnDemand, - Blake2Hasher - >, - client::light::call_executor::RemoteCallExecutor< - client::light::blockchain::Blockchain< - client_db::light::LightStorage, - network::OnDemand - >, - network::OnDemand, - >, - client::LocalCallExecutor< - client::light::backend::Backend< - client_db::light::LightStorage, - network::OnDemand, - Blake2Hasher - >, - NativeExecutor - > - >, - TBl, - TRtApi - >, + TLightClient, Arc>, (), (), @@ -185,7 +218,8 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { (), (), (), - client::light::backend::Backend, network::OnDemand, Blake2Hasher>, + LightRpcBuilder, + TLightBackend, >, Error> { let keystore = Keystore::open(config.keystore_path.clone(), config.keystore_password.clone())?; @@ -205,11 +239,22 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { let fetch_checker = Arc::new(client::light::new_fetch_checker(light_blockchain.clone(), executor.clone())); let fetcher = Arc::new(network::OnDemand::new(fetch_checker)); let backend = client::light::new_light_backend(light_blockchain, fetcher.clone()); - let client = client::light::new_light(backend.clone(), fetcher.clone(), &config.chain_spec, executor)?; + let remote_blockchain = backend.remote_blockchain(); + let client = Arc::new(client::light::new_light( + backend.clone(), + fetcher.clone(), + &config.chain_spec, + executor, + )?); + let rpc_builder = LightRpcBuilder { + client: client.clone(), + remote_blockchain, + fetcher: fetcher.clone(), + }; Ok(ServiceBuilder { config, - client: Arc::new(client), + client, backend, keystore, fetcher: Some(fetcher), @@ -220,13 +265,15 @@ where TGen: Serialize + DeserializeOwned + BuildStorage { network_protocol: (), transaction_pool: Arc::new(()), rpc_extensions: Default::default(), + rpc_builder, marker: PhantomData, }) } } -impl - ServiceBuilder { +impl + ServiceBuilder { /// Returns a reference to the client that was stored in this builder. 
pub fn client(&self) -> &Arc { @@ -248,7 +295,7 @@ impl, &Arc) -> Result, Error> ) -> Result, Error> { + TNetP, TExPool, TRpc, TRpcB, Backend>, Error> { let select_chain = select_chain_builder(&self.config, &self.backend)?; Ok(ServiceBuilder { @@ -264,6 +311,7 @@ impl, &Arc) -> Result ) -> Result, Error> { + TNetP, TExPool, TRpc, TRpcB, Backend>, Error> { self.with_opt_select_chain(|cfg, b| builder(cfg, b).map(Option::Some)) } @@ -283,7 +331,7 @@ impl, Arc, Option, Arc) -> Result ) -> Result, Error> + TNetP, TExPool, TRpc, TRpcB, Backend>, Error> where TSc: Clone { let import_queue = builder( &self.config, @@ -305,6 +353,7 @@ impl) -> Result ) -> Result, Error> { + UNetP, TExPool, TRpc, TRpcB, Backend>, Error> { let network_protocol = network_protocol_builder(&self.config)?; Ok(ServiceBuilder { @@ -330,6 +379,7 @@ impl, Error> { let finality_proof_provider = builder(self.client.clone(), self.backend.clone())?; @@ -369,6 +420,7 @@ impl, Error> { self.with_opt_finality_proof_provider(|client, backend| build(client, backend).map(Option::Some)) @@ -402,7 +455,7 @@ impl, Arc, Arc, Option, Arc) -> Result<(UImpQu, Option), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, TRpcB, Backend>, Error> where TSc: Clone { let (import_queue, fprb) = builder( &self.config, @@ -425,6 +478,7 @@ impl, Arc, Arc, Option, Arc) -> Result<(UImpQu, UFprb), Error> ) -> Result, Error> + TNetP, TExPool, TRpc, TRpcB, Backend>, Error> where TSc: Clone { self.with_import_queue_and_opt_fprb(|cfg, cl, b, sc, tx| builder(cfg, cl, b, sc, tx).map(|(q, f)| (q, Some(f)))) } @@ -445,7 +499,7 @@ impl) -> Result ) -> Result, Error> { + TNetP, UExPool, TRpc, TRpcB, Backend>, Error> { let transaction_pool = transaction_pool_builder(self.config.transaction_pool.clone(), self.client.clone())?; Ok(ServiceBuilder { @@ -461,6 +515,7 @@ impl, Arc) -> URpc ) -> Result, Error> { + TNetP, TExPool, URpc, TRpcB, Backend>, Error> { let rpc_extensions = rpc_ext_builder(self.client.clone(), self.transaction_pool.clone()); Ok(ServiceBuilder { @@ -486,11 +541,90 @@ impl { + /// Build chain RPC handler. + fn build_chain(&self, subscriptions: rpc::Subscriptions) -> rpc::chain::Chain; + /// Build state RPC handler. + fn build_state(&self, subscriptions: rpc::Subscriptions) -> rpc::state::State; +} + +/// RPC handlers builder for full nodes. +pub struct FullRpcBuilder { + client: Arc>, +} + +impl RpcBuilder, TFullCallExecutor, TRtApi> + for + FullRpcBuilder + where + TBl: BlockT, + TRtApi: 'static + Send + Sync, + TExecDisp: 'static + NativeExecutionDispatch, + TFullClient: ProvideRuntimeApi, + as ProvideRuntimeApi>::Api: runtime_api::Metadata, +{ + fn build_chain( + &self, + subscriptions: rpc::Subscriptions, + ) -> rpc::chain::Chain, TFullCallExecutor, TBl, TRtApi> { + rpc::chain::new_full(self.client.clone(), subscriptions) + } + + fn build_state( + &self, + subscriptions: rpc::Subscriptions, + ) -> rpc::state::State, TFullCallExecutor, TBl, TRtApi> { + rpc::state::new_full(self.client.clone(), subscriptions) + } +} + +/// RPC handlers builder for light nodes. 
+pub struct LightRpcBuilder, TRtApi, TExecDisp> { + client: Arc>, + remote_blockchain: Arc>, + fetcher: Arc>, +} + +impl RpcBuilder, TLightCallExecutor, TRtApi> + for + LightRpcBuilder + where + TBl: BlockT, + TRtApi: 'static + Send + Sync, + TExecDisp: 'static + NativeExecutionDispatch, +{ + fn build_chain( + &self, + subscriptions: rpc::Subscriptions, + ) -> rpc::chain::Chain, TLightCallExecutor, TBl, TRtApi> { + rpc::chain::new_light( + self.client.clone(), + subscriptions, + self.remote_blockchain.clone(), + self.fetcher.clone(), + ) + } + + fn build_state( + &self, + subscriptions: rpc::Subscriptions, + ) -> rpc::state::State, TLightCallExecutor, TBl, TRtApi> { + rpc::state::new_light( + self.client.clone(), + subscriptions, + self.remote_blockchain.clone(), + self.fetcher.clone(), + ) + } +} + /// Implemented on `ServiceBuilder`. Allows importing blocks once you have given all the required /// components to the builder. pub trait ServiceBuilderImport { @@ -532,9 +666,9 @@ pub trait ServiceBuilderRevert { ) -> Result<(), Error>; } -impl +impl ServiceBuilderImport for ServiceBuilder, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, Backend> + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, Backend> where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -554,9 +688,9 @@ where } } -impl +impl ServiceBuilderExport for ServiceBuilder, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend> + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend> where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -577,9 +711,9 @@ where } } -impl +impl ServiceBuilderRevert for ServiceBuilder, - TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TBackend> + TFchr, TSc, TImpQu, TFprb, TFpp, TNetP, TExPool, TRpc, TRpcB, TBackend> where TBl: BlockT::Out>, TBackend: 'static + client::backend::Backend + Send, @@ -596,7 +730,7 @@ where } } -impl +impl ServiceBuilder< TBl, TRtApi, @@ -611,7 +745,8 @@ ServiceBuilder< TNetP, TransactionPool, TRpc, - TBackend + TRpcB, + TBackend, > where Client: ProvideRuntimeApi, as ProvideRuntimeApi>::Api: @@ -630,6 +765,7 @@ ServiceBuilder< TNetP: NetworkSpecialization, TExPoolApi: 'static + ChainApi::Hash>, TRpc: rpc::RpcExtension + Clone, + TRpcB: RpcBuilder, { /// Builds the service. pub fn build(self) -> Result( +pub(crate) fn start_rpc( + rpc_builder: &RpcB, client: Arc>, system_send_back: futures03::channel::mpsc::UnboundedSender>, rpc_system_info: SystemInfo, @@ -718,11 +857,13 @@ where runtime_api::Metadata + session::SessionKeys, Api: Send + Sync + 'static, Executor: client::CallExecutor + Send + Sync + Clone + 'static, - PoolApi: txpool::ChainApi + 'static { + PoolApi: txpool::ChainApi + 'static, + RpcB: RpcBuilder, +{ use rpc::{chain, state, author, system}; let subscriptions = rpc::Subscriptions::new(task_executor.clone()); - let chain = chain::Chain::new(client.clone(), subscriptions.clone()); - let state = state::State::new(client.clone(), subscriptions.clone()); + let chain = rpc_builder.build_chain(subscriptions.clone()); + let state = rpc_builder.build_state(subscriptions.clone()); let author = rpc::author::Author::new( client, transaction_pool, diff --git a/core/service/src/lib.rs b/core/service/src/lib.rs index ad359767b29ef..2056c8a2f2da3 100644 --- a/core/service/src/lib.rs +++ b/core/service/src/lib.rs @@ -345,6 +345,7 @@ macro_rules! 
new_impl {
 		};
 		$start_rpc(
 			client.clone(),
+			//light_components.clone(),
 			system_rpc_tx.clone(),
 			system_info.clone(),
 			Arc::new(SpawnTaskHandle { sender: to_spawn_tx.clone() }),
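Taken together, `start_rpc` no longer constructs the `chain` and `state` handlers from a concrete client; it asks whichever `RpcBuilder` the service was built with, so full nodes plug in `FullRpcBuilder` and light nodes plug in `LightRpcBuilder`. The sketch below is deliberately simplified and is not the PR's generic signature; it only shows the shape of that indirection:

```rust
// Toy illustration only: the real trait in core/service/src/builder.rs is
// generic over block, backend, executor and runtime API, and returns the
// concrete rpc::chain::Chain / rpc::state::State handler types.
struct Subscriptions;
struct ChainHandler;
struct StateHandler;

trait RpcBuilder {
	fn build_chain(&self, subscriptions: Subscriptions) -> ChainHandler;
	fn build_state(&self, subscriptions: Subscriptions) -> StateHandler;
}

struct FullRpcBuilder;   // holds an Arc of the full client in the real code
struct LightRpcBuilder;  // additionally holds the remote blockchain and fetcher

impl RpcBuilder for FullRpcBuilder {
	fn build_chain(&self, _s: Subscriptions) -> ChainHandler { ChainHandler }
	fn build_state(&self, _s: Subscriptions) -> StateHandler { StateHandler }
}

impl RpcBuilder for LightRpcBuilder {
	fn build_chain(&self, _s: Subscriptions) -> ChainHandler { ChainHandler }
	fn build_state(&self, _s: Subscriptions) -> StateHandler { StateHandler }
}

// start_rpc is now written against the trait only:
fn start_rpc(rpc_builder: &impl RpcBuilder) {
	let chain = rpc_builder.build_chain(Subscriptions);
	let state = rpc_builder.build_state(Subscriptions);
	let _ = (chain, state);
}
```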