diff --git a/Cargo.lock b/Cargo.lock index 346c4f2c7e26e..d4e42f6f76c72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -975,7 +975,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "jsonrpc-core" version = "8.0.2" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -987,7 +987,7 @@ dependencies = [ [[package]] name = "jsonrpc-http-server" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "hyper 0.11.27 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1000,7 +1000,7 @@ dependencies = [ [[package]] name = "jsonrpc-macros" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "jsonrpc-pubsub 8.0.1 (git+https://github.com/paritytech/jsonrpc.git)", @@ -1010,7 +1010,7 @@ dependencies = [ [[package]] name = "jsonrpc-pubsub" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", "log 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1020,7 +1020,7 @@ dependencies = [ [[package]] name = "jsonrpc-server-utils" version = "8.0.1" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "bytes 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "globset 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1033,7 +1033,7 @@ dependencies = [ [[package]] name = "jsonrpc-ws-server" version = "8.0.0" -source = "git+https://github.com/paritytech/jsonrpc.git#c3f02dfbbef0c4afe16633f503917d4a0501ee9d" +source = "git+https://github.com/paritytech/jsonrpc.git#7e5df8ca2acc01c608b2d2bd8cdcdb19d8cbaa90" dependencies = [ "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", "jsonrpc-core 8.0.2 (git+https://github.com/paritytech/jsonrpc.git)", @@ -2647,6 +2647,7 @@ version = "0.1.0" dependencies = [ "ed25519 0.1.0", "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)", + "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)", "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -2707,7 +2708,7 @@ dependencies = [ "hex-literal 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)", - "parking_lot 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)", + "parking_lot 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.70 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/demo/cli/src/lib.rs b/demo/cli/src/lib.rs index 08834541f52bf..6eb6bfeaccdc5 100644 --- a/demo/cli/src/lib.rs +++ b/demo/cli/src/lib.rs @@ -173,9 +173,10 @@ pub fn run(args: I) -> error::Result<()> where let mut runtime = Runtime::new()?; let _rpc_servers = { let handler = || { + let state = rpc::apis::state::State::new(client.clone(), runtime.executor()); let chain = rpc::apis::chain::Chain::new(client.clone(), runtime.executor()); let author = rpc::apis::author::Author::new(client.clone(), Arc::new(DummyPool), runtime.executor()); - rpc::rpc_handler::(client.clone(), chain, author, DummySystem) + rpc::rpc_handler::(state, chain, author, DummySystem) }; let http_address = "127.0.0.1:9933".parse().unwrap(); let ws_address = "127.0.0.1:9944".parse().unwrap(); diff --git a/substrate/client/Cargo.toml b/substrate/client/Cargo.toml index 30ec72435d202..5b347395763b7 100644 --- a/substrate/client/Cargo.toml +++ b/substrate/client/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Parity Technologies "] [dependencies] error-chain = "0.12" +fnv = "1.0" log = "0.3" parking_lot = "0.4" triehash = "0.1" diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index b297341fa19f4..11aaab78e4ff4 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -453,7 +453,7 @@ impl client::backend::Backend for Backend { } } -impl client::backend::LocalBackend for Backend +impl client::backend::LocalBackend for Backend {} #[cfg(test)] diff --git a/substrate/client/src/backend.rs b/substrate/client/src/backend.rs index 0686e1e47aadd..8de27a39b4645 100644 --- a/substrate/client/src/backend.rs +++ b/substrate/client/src/backend.rs @@ -16,12 +16,12 @@ //! Polkadot Client data backend -use state_machine::backend::Backend as StateBackend; use error; use primitives::AuthorityId; use runtime_primitives::bft::Justification; -use runtime_primitives::traits::{Block as BlockT, NumberFor}; use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::{Block as BlockT, NumberFor}; +use state_machine::backend::Backend as StateBackend; /// Block insertion operation. Keeps hold if the inserted block state and data. pub trait BlockImportOperation { diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs index c125a1aa4919b..bd7f3c5581a2e 100644 --- a/substrate/client/src/client.rs +++ b/substrate/client/src/client.rs @@ -31,6 +31,7 @@ use backend::{self, BlockImportOperation}; use blockchain::{self, Info as ChainInfo, Backend as ChainBackend, HeaderBackend as ChainHeaderBackend}; use call_executor::{CallExecutor, LocalCallExecutor}; use executor::{RuntimeVersion, RuntimeInfo}; +use notifications::{StorageNotifications, StorageEventStream}; use {error, in_mem, block_builder, runtime_io, bft, genesis}; /// Type that implements `futures::Stream` of block import events. @@ -40,6 +41,7 @@ pub type BlockchainEventStream = mpsc::UnboundedReceiver where Block: BlockT { backend: Arc, executor: E, + storage_notifications: Mutex>, import_notification_sinks: Mutex>>>, import_lock: Mutex<()>, importing_block: RwLock>, // holds the block hash currently being imported. TODO: replace this with block queue @@ -49,7 +51,12 @@ pub struct Client where Block: BlockT { /// A source of blockchain evenets. pub trait BlockchainEvents { /// Get block import event stream. - fn import_notification_stream(&self) -> mpsc::UnboundedReceiver>; + fn import_notification_stream(&self) -> BlockchainEventStream; + + /// Get storage changes event stream. + /// + /// Passing `None` as `filter_keys` subscribes to all storage changes. + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result>; } /// Chain head information. @@ -182,9 +189,10 @@ impl Client where Ok(Client { backend, executor, - import_notification_sinks: Mutex::new(Vec::new()), - import_lock: Mutex::new(()), - importing_block: RwLock::new(None), + storage_notifications: Default::default(), + import_notification_sinks: Default::default(), + import_lock: Default::default(), + importing_block: Default::default(), execution_strategy, }) } @@ -332,7 +340,7 @@ impl Client where } let mut transaction = self.backend.begin_operation(BlockId::Hash(parent_hash))?; - let storage_update = match transaction.state()? { + let (storage_update, storage_changes) = match transaction.state()? { Some(transaction_state) => { let mut overlay = Default::default(); let mut r = self.executor.call_at_state( @@ -359,9 +367,10 @@ impl Client where }, ); let (_, storage_update) = r?; - Some(storage_update) + overlay.commit_prospective(); + (Some(storage_update), Some(overlay.into_committed())) }, - None => None, + None => (None, None) }; let is_new_best = header.number() == &(self.backend.blockchain().info()?.best_number + One::one()); @@ -373,7 +382,15 @@ impl Client where transaction.update_storage(storage_update)?; } self.backend.commit_operation(transaction)?; + if origin == BlockOrigin::NetworkBroadcast || origin == BlockOrigin::Own || origin == BlockOrigin::ConsensusBroadcast { + + if let Some(storage_changes) = storage_changes { + // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? + self.storage_notifications.lock() + .trigger(&hash, storage_changes); + } + let notification = BlockImportNotification:: { hash: hash, origin: origin, @@ -516,16 +533,20 @@ impl bft::Authorities for Client impl BlockchainEvents for Client where - B: backend::Backend, E: CallExecutor, Block: BlockT, { /// Get block import event stream. - fn import_notification_stream(&self) -> mpsc::UnboundedReceiver> { + fn import_notification_stream(&self) -> BlockchainEventStream { let (sink, stream) = mpsc::unbounded(); self.import_notification_sinks.lock().push(sink); stream } + + /// Get storage changes event stream. + fn storage_changes_notification_stream(&self, filter_keys: Option<&[StorageKey]>) -> error::Result> { + Ok(self.storage_notifications.lock().listen(filter_keys)) + } } impl ChainHead for Client diff --git a/substrate/client/src/lib.rs b/substrate/client/src/lib.rs index 329d5a5ac02df..8a4a143914a96 100644 --- a/substrate/client/src/lib.rs +++ b/substrate/client/src/lib.rs @@ -32,6 +32,7 @@ extern crate substrate_state_machine as state_machine; #[macro_use] extern crate slog; // needed until we can reexport `slog_info` from `substrate_telemetry` extern crate ed25519; +extern crate fnv; extern crate futures; extern crate parking_lot; extern crate triehash; @@ -50,13 +51,15 @@ pub mod block_builder; pub mod light; mod call_executor; mod client; +mod notifications; +pub use blockchain::Info as ChainInfo; +pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; pub use client::{ new_in_mem, BlockStatus, BlockOrigin, BlockchainEventStream, BlockchainEvents, Client, ClientInfo, ChainHead, ImportResult, JustifiedHeader, }; -pub use blockchain::Info as ChainInfo; -pub use call_executor::{CallResult, CallExecutor, LocalCallExecutor}; +pub use notifications::{StorageEventStream, StorageChangeSet}; pub use state_machine::ExecutionStrategy; diff --git a/substrate/client/src/light/backend.rs b/substrate/client/src/light/backend.rs index d06aef80bf5ab..e3b8b42c89c5f 100644 --- a/substrate/client/src/light/backend.rs +++ b/substrate/client/src/light/backend.rs @@ -62,7 +62,11 @@ impl Backend { } } -impl ClientBackend for Backend where Block: BlockT, S: BlockchainStorage, F: Fetcher { +impl ClientBackend for Backend where + Block: BlockT, + S: BlockchainStorage, + F: Fetcher, +{ type BlockImportOperation = ImportOperation; type Blockchain = Blockchain; type State = OnDemandState; diff --git a/substrate/client/src/notifications.rs b/substrate/client/src/notifications.rs new file mode 100644 index 0000000000000..a64ee0f4dd6ac --- /dev/null +++ b/substrate/client/src/notifications.rs @@ -0,0 +1,267 @@ +// Copyright 2017 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Storage notifications + +use std::{ + collections::{HashSet, HashMap}, + sync::Arc, +}; + +use fnv::{FnvHashSet, FnvHashMap}; +use futures::sync::mpsc; +use primitives::storage::{StorageKey, StorageData}; +use runtime_primitives::traits::Block as BlockT; + +/// Storage change set +#[derive(Debug)] +pub struct StorageChangeSet { + changes: Arc)>>, + filter: Option>, +} + +impl StorageChangeSet { + /// Convert the change set into iterator over storage items. + pub fn iter<'a>(&'a self) -> impl Iterator)> + 'a { + self.changes + .iter() + .filter(move |&(key, _)| match self.filter { + Some(ref filter) => filter.contains(key), + None => true, + }) + } +} + +/// Type that implements `futures::Stream` of storage change events. +pub type StorageEventStream = mpsc::UnboundedReceiver<(H, StorageChangeSet)>; + +type SubscriberId = u64; + +/// Manages storage listeners. +#[derive(Debug)] +pub struct StorageNotifications { + next_id: SubscriberId, + wildcard_listeners: FnvHashSet, + listeners: HashMap>, + sinks: FnvHashMap, + Option>, + )>, +} + +impl Default for StorageNotifications { + fn default() -> Self { + StorageNotifications { + next_id: Default::default(), + wildcard_listeners: Default::default(), + listeners: Default::default(), + sinks: Default::default(), + } + } +} + +impl StorageNotifications { + /// Trigger notification to all listeners. + /// + /// Note the changes are going to be filtered by listener's filter key. + /// In fact no event might be sent if clients are not interested in the changes. + pub fn trigger(&mut self, hash: &Block::Hash, changeset: impl Iterator, Option>)>) { + let has_wildcard = !self.wildcard_listeners.is_empty(); + + // early exit if no listeners + if !has_wildcard && self.listeners.is_empty() { + return; + } + + let mut subscribers = self.wildcard_listeners.clone(); + let mut changes = Vec::new(); + + // Collect subscribers and changes + for (k, v) in changeset { + let k = StorageKey(k); + let listeners = self.listeners.get(&k); + + if let Some(ref listeners) = listeners { + subscribers.extend(listeners.iter()); + } + + if has_wildcard || listeners.is_some() { + changes.push((k, v.map(StorageData))); + } + } + + let changes = Arc::new(changes); + // Trigger the events + for subscriber in subscribers { + let should_remove = { + let &(ref sink, ref filter) = self.sinks.get(&subscriber) + .expect("subscribers returned from self.listeners are always in self.sinks; qed"); + sink.unbounded_send((hash.clone(), StorageChangeSet { + changes: changes.clone(), + filter: filter.clone(), + })).is_err() + }; + + if should_remove { + self.remove_subscriber(subscriber); + } + } + } + + fn remove_subscriber(&mut self, subscriber: SubscriberId) { + if let Some((_, filters)) = self.sinks.remove(&subscriber) { + match filters { + None => { + self.wildcard_listeners.remove(&subscriber); + }, + Some(filters) => { + for key in filters { + let remove_key = match self.listeners.get_mut(&key) { + Some(ref mut set) => { + set.remove(&subscriber); + set.is_empty() + }, + None => false, + }; + + if remove_key { + self.listeners.remove(&key); + } + } + }, + } + } + } + + /// Start listening for particular storage keys. + pub fn listen(&mut self, filter_keys: Option<&[StorageKey]>) -> StorageEventStream { + self.next_id += 1; + + // add subscriber for every key + let keys = match filter_keys { + None => { + self.wildcard_listeners.insert(self.next_id); + None + }, + Some(keys) => Some(keys.iter().map(|key| { + self.listeners + .entry(key.clone()) + .or_insert_with(Default::default) + .insert(self.next_id); + key.clone() + }).collect()) + }; + + // insert sink + let (tx, rx) = mpsc::unbounded(); + self.sinks.insert(self.next_id, (tx, keys)); + rx + } +} + +#[cfg(test)] +mod tests { + use runtime_primitives::testing::{H256 as Hash, Block as RawBlock}; + use super::*; + use futures::Stream; + + + #[cfg(test)] + impl From)>> for StorageChangeSet { + fn from(changes: Vec<(StorageKey, Option)>) -> Self { + StorageChangeSet { + changes: Arc::new(changes), + filter: None, + } + } + } + + #[cfg(test)] + impl PartialEq for StorageChangeSet { + fn eq(&self, other: &Self) -> bool { + self.iter().eq(other.iter()) + } + } + + type Block = RawBlock; + + #[test] + fn triggering_change_should_notify_wildcard_listeners() { + // given + let mut notifications = StorageNotifications::::default(); + let mut recv = notifications.listen(None).wait(); + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![3], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(recv.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![2]), Some(StorageData(vec![3]))), + (StorageKey(vec![3]), None), + ].into()))); + } + + #[test] + fn should_only_notify_interested_listeners() { + // given + let mut notifications = StorageNotifications::::default(); + let mut recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); + let mut recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![1], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(recv1.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![1]), None), + ].into()))); + assert_eq!(recv2.next().unwrap(), Ok((1.into(), vec![ + (StorageKey(vec![2]), Some(StorageData(vec![3]))), + ].into()))); + } + + #[test] + fn should_cleanup_subscribers_if_dropped() { + // given + let mut notifications = StorageNotifications::::default(); + { + let _recv1 = notifications.listen(Some(&[StorageKey(vec![1])])).wait(); + let _recv2 = notifications.listen(Some(&[StorageKey(vec![2])])).wait(); + let _recv3 = notifications.listen(None).wait(); + assert_eq!(notifications.listeners.len(), 2); + assert_eq!(notifications.wildcard_listeners.len(), 1); + } + + // when + let changeset = vec![ + (vec![2], Some(vec![3])), + (vec![1], None), + ]; + notifications.trigger(&1.into(), changeset.into_iter()); + + // then + assert_eq!(notifications.listeners.len(), 0); + assert_eq!(notifications.wildcard_listeners.len(), 0); + } +} diff --git a/substrate/primitives/src/storage.rs b/substrate/primitives/src/storage.rs index c8929c7646ff6..25bb11fb6e549 100644 --- a/substrate/primitives/src/storage.rs +++ b/substrate/primitives/src/storage.rs @@ -22,10 +22,23 @@ use rstd::vec::Vec; /// Contract storage key. #[derive(PartialEq, Eq)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))] pub struct StorageKey(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); /// Contract storage entry data. #[derive(PartialEq, Eq)] -#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord))] +#[cfg_attr(feature = "std", derive(Serialize, Deserialize, Debug, Hash, PartialOrd, Ord, Clone))] pub struct StorageData(#[cfg_attr(feature = "std", serde(with="bytes"))] pub Vec); + +/// Storage change set +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +pub struct StorageChangeSet { + /// Block hash + pub block: Hash, + /// A list of changes + pub changes: Vec<( + StorageKey, + Option, + )>, +} + diff --git a/substrate/rpc-servers/src/lib.rs b/substrate/rpc-servers/src/lib.rs index 006345e68e335..11fa9aa9b60af 100644 --- a/substrate/rpc-servers/src/lib.rs +++ b/substrate/rpc-servers/src/lib.rs @@ -45,7 +45,7 @@ pub fn rpc_handler( system: Y, ) -> RpcHandler where Block: 'static, - S: apis::state::StateApi, + S: apis::state::StateApi, C: apis::chain::ChainApi, A: apis::author::AuthorApi, Y: apis::system::SystemApi, diff --git a/substrate/rpc/src/chain/mod.rs b/substrate/rpc/src/chain/mod.rs index d1d055b5c0ff8..a7a8e8c0e046a 100644 --- a/substrate/rpc/src/chain/mod.rs +++ b/substrate/rpc/src/chain/mod.rs @@ -18,14 +18,13 @@ use std::sync::Arc; -use runtime_primitives::traits::Block as BlockT; -use runtime_primitives::generic::BlockId; use client::{self, Client, BlockchainEvents}; - use jsonrpc_macros::pubsub; use jsonrpc_pubsub::SubscriptionId; use rpc::Result as RpcResult; use rpc::futures::{Future, Sink, Stream}; +use runtime_primitives::generic::BlockId; +use runtime_primitives::traits::Block as BlockT; use tokio::runtime::TaskExecutor; use subscriptions::Subscriptions; @@ -51,11 +50,11 @@ build_rpc_trait! { #[pubsub(name = "chain_newHead")] { /// New head subscription - #[rpc(name = "subscribe_newHead")] + #[rpc(name = "chain_subscribeNewHead", alias = ["subscribe_newHead", ])] fn subscribe_new_head(&self, Self::Metadata, pubsub::Subscriber
); /// Unsubscribe from new head subscription. - #[rpc(name = "unsubscribe_newHead")] + #[rpc(name = "chain_unsubscribeNewHead", alias = ["unsubscribe_newHead", ])] fn unsubscribe_new_head(&self, SubscriptionId) -> RpcResult; } } @@ -72,7 +71,7 @@ pub struct Chain { impl Chain { /// Create new Chain API RPC handler. pub fn new(client: Arc>, executor: TaskExecutor) -> Self { - Chain { + Self { client, subscriptions: Subscriptions::new(executor), } diff --git a/substrate/rpc/src/state/mod.rs b/substrate/rpc/src/state/mod.rs index 40d6f70f6c318..6cf6907712f97 100644 --- a/substrate/rpc/src/state/mod.rs +++ b/substrate/rpc/src/state/mod.rs @@ -16,24 +16,33 @@ //! Polkadot state API. -mod error; - -#[cfg(test)] -mod tests; - use std::sync::Arc; -use client::{self, Client, CallExecutor}; +use client::{self, Client, CallExecutor, BlockchainEvents}; +use jsonrpc_macros::Trailing; +use jsonrpc_macros::pubsub; +use jsonrpc_pubsub::SubscriptionId; +use primitives::hexdisplay::HexDisplay; +use primitives::storage::{StorageKey, StorageData, StorageChangeSet}; +use rpc::Result as RpcResult; +use rpc::futures::{Future, Sink, Stream}; use runtime_primitives::generic::BlockId; use runtime_primitives::traits::Block as BlockT; -use primitives::storage::{StorageKey, StorageData}; -use primitives::hexdisplay::HexDisplay; +use tokio::runtime::TaskExecutor; + +use subscriptions::Subscriptions; + +mod error; +#[cfg(test)] +mod tests; use self::error::Result; build_rpc_trait! { /// Polkadot state API pub trait StateApi { + type Metadata; + /// Returns a storage entry at a specific block's state. #[rpc(name = "state_getStorageAt")] fn storage_at(&self, StorageKey, Hash) -> Result; @@ -65,22 +74,52 @@ build_rpc_trait! { /// Call a contract at the best block. #[rpc(name = "state_call")] fn call(&self, String, Vec) -> Result>; + + #[pubsub(name = "state_storage")] { + /// New storage subscription + #[rpc(name = "state_subscribeStorage")] + fn subscribe_storage(&self, Self::Metadata, pubsub::Subscriber>, Trailing>); + + /// Unsubscribe from storage subscription + #[rpc(name = "state_unsubscribeStorage")] + fn unsubscribe_storage(&self, SubscriptionId) -> RpcResult; + } + } +} + +/// State API with subscriptions support. +pub struct State { + /// Substrate client. + client: Arc>, + /// Current subscriptions. + subscriptions: Subscriptions, +} + +impl State { + /// Create new State API RPC handler. + pub fn new(client: Arc>, executor: TaskExecutor) -> Self { + Self { + client, + subscriptions: Subscriptions::new(executor), + } } } -impl StateApi for Arc> where +impl StateApi for State where Block: BlockT + 'static, B: client::backend::Backend + Send + Sync + 'static, E: CallExecutor + Send + Sync + 'static, { + type Metadata = ::metadata::Metadata; + fn storage_at(&self, key: StorageKey, block: Block::Hash) -> Result { trace!(target: "rpc", "Querying storage at {:?} for key {}", block, HexDisplay::from(&key.0)); - Ok(self.as_ref().storage(&BlockId::Hash(block), &key)?) + Ok(self.client.storage(&BlockId::Hash(block), &key)?) } fn call_at(&self, method: String, data: Vec, block: Block::Hash) -> Result> { trace!(target: "rpc", "Calling runtime at {:?} for method {} ({})", block, method, HexDisplay::from(&data)); - Ok(self.as_ref().executor().call(&BlockId::Hash(block), &method, &data)?.return_data) + Ok(self.client.executor().call(&BlockId::Hash(block), &method, &data)?.return_data) } fn storage_hash_at(&self, key: StorageKey, block: Block::Hash) -> Result { @@ -93,18 +132,52 @@ impl StateApi for Arc> where } fn storage_hash(&self, key: StorageKey) -> Result { - self.storage_hash_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_hash_at(key, self.client.info()?.chain.best_hash) } fn storage_size(&self, key: StorageKey) -> Result { - self.storage_size_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_size_at(key, self.client.info()?.chain.best_hash) } fn storage(&self, key: StorageKey) -> Result { - self.storage_at(key, self.as_ref().info()?.chain.best_hash) + self.storage_at(key, self.client.info()?.chain.best_hash) } fn call(&self, method: String, data: Vec) -> Result> { - self.call_at(method, data, self.as_ref().info()?.chain.best_hash) + self.call_at(method, data, self.client.info()?.chain.best_hash) + } + + fn subscribe_storage( + &self, + _meta: Self::Metadata, + subscriber: pubsub::Subscriber>, + keys: Trailing> + ) { + let keys = Into::>>::into(keys); + let stream = match self.client.storage_changes_notification_stream(keys.as_ref().map(|x| &**x)) { + Ok(stream) => stream, + Err(err) => { + let _ = subscriber.reject(error::Error::from(err).into()); + return; + }, + }; + + self.subscriptions.add(subscriber, |sink| { + let stream = stream + .map_err(|e| warn!("Error creating storage notification stream: {:?}", e)) + .map(|(block, changes)| Ok(StorageChangeSet { + block, + changes: changes.iter().cloned().collect(), + })); + sink + .sink_map_err(|e| warn!("Error sending notifications: {:?}", e)) + .send_all(stream) + // we ignore the resulting Stream (if the first stream is over we are unsubscribed) + .map(|_| ()) + }) + } + + fn unsubscribe_storage(&self, id: SubscriptionId) -> RpcResult { + Ok(self.subscriptions.cancel(id)) } } diff --git a/substrate/rpc/src/state/tests.rs b/substrate/rpc/src/state/tests.rs index 1b98711ea5e99..37d86cb9266b9 100644 --- a/substrate/rpc/src/state/tests.rs +++ b/substrate/rpc/src/state/tests.rs @@ -16,26 +16,60 @@ use super::*; use self::error::{Error, ErrorKind}; +use jsonrpc_macros::pubsub; +use client::BlockOrigin; use test_client::{self, TestClient}; #[test] fn should_return_storage() { + let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); + let client = State::new(client, core.executor()); assert_matches!( - StateApi::storage_at(&client, StorageKey(vec![10]), genesis_hash), + client.storage_at(StorageKey(vec![10]), genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::NoValueForKey(ref k)), _)) if *k == vec![10] ) } #[test] fn should_call_contract() { + let core = ::tokio::runtime::Runtime::new().unwrap(); let client = Arc::new(test_client::new()); let genesis_hash = client.genesis_hash(); + let client = State::new(client, core.executor()); assert_matches!( - StateApi::call_at(&client, "balanceOf".into(), vec![1,2,3], genesis_hash), + client.call_at("balanceOf".into(), vec![1,2,3], genesis_hash), Err(Error(ErrorKind::Client(client::error::ErrorKind::Execution(_)), _)) ) } + +#[test] +fn should_notify_about_storage_changes() { + let mut core = ::tokio::runtime::Runtime::new().unwrap(); + let remote = core.executor(); + let (subscriber, id, transport) = pubsub::Subscriber::new_test("test"); + + { + let api = State { + client: Arc::new(test_client::new()), + subscriptions: Subscriptions::new(remote), + }; + + api.subscribe_storage(Default::default(), subscriber, None.into()); + + // assert id assigned + assert_eq!(core.block_on(id), Ok(Ok(SubscriptionId::Number(0)))); + + let builder = api.client.new_block().unwrap(); + api.client.justify_and_import(BlockOrigin::Own, builder.bake().unwrap()).unwrap(); + } + + // assert notification send to transport + let (notification, next) = core.block_on(transport.into_future()).unwrap(); + assert!(notification.is_some()); + // no more notifications on this channel + assert_eq!(core.block_on(next.into_future()).unwrap().0, None); +} diff --git a/substrate/service/src/lib.rs b/substrate/service/src/lib.rs index fca2c4d05834e..2b9e588d01156 100644 --- a/substrate/service/src/lib.rs +++ b/substrate/service/src/lib.rs @@ -202,9 +202,10 @@ impl Service let handler = || { let client = client.clone(); let chain = rpc::apis::chain::Chain::new(client.clone(), task_executor.clone()); + let state = rpc::apis::state::State::new(client.clone(), task_executor.clone()); let author = rpc::apis::author::Author::new(client.clone(), extrinsic_pool.api(), task_executor.clone()); rpc::rpc_handler::, _, _, _, _>( - client, + state, chain, author, rpc_config.clone(), diff --git a/substrate/state-machine/src/lib.rs b/substrate/state-machine/src/lib.rs index 17fd85852b6ac..20498b107bf0f 100644 --- a/substrate/state-machine/src/lib.rs +++ b/substrate/state-machine/src/lib.rs @@ -34,7 +34,6 @@ extern crate byteorder; extern crate parking_lot; use std::collections::HashMap; -use std::collections::hash_map::Drain; use std::fmt; pub mod backend; @@ -113,10 +112,23 @@ impl OverlayedChanges { } } - /// Drain prospective changes to an iterator. - pub fn drain(&mut self) -> Drain, Option>> { + /// Drain committed changes to an iterator. + /// + /// Panics: + /// Will panic if there are any uncommitted prospective changes. + pub fn drain<'a>(&'a mut self) -> impl Iterator, Option>)> + 'a { + assert!(self.prospective.is_empty()); self.committed.drain() } + + /// Consume `OverlayedChanges` and take committed set. + /// + /// Panics: + /// Will panic if there are any uncommitted prospective changes. + pub fn into_committed(self) -> impl Iterator, Option>)> { + assert!(self.prospective.is_empty()); + self.committed.into_iter() + } } /// State Machine Error bound. @@ -367,14 +379,7 @@ pub fn execute_using_consensus_failure_handler< result.map(move |out| (out, delta)) }; - match result { - Ok(x) => { - Ok(x) - } - Err(e) => { - Err(Box::new(e)) - } - } + result.map_err(|e| Box::new(e) as _) } /// Prove execution using the given state backend, overlayed changes, and call executor.