- + Send + 'static,
+ {
+ let id = self.next_id.next_id();
+ let subscription_id: SubscriptionId = id.into();
+ if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
+ let (tx, rx) = oneshot::channel();
+ let future = into_future(sink)
+ .into_future()
+ .select(rx.map_err(|e| warn!("Error timeing out: {:?}", e)))
+ .then(|_| Ok(()));
+
+ self.active_subscriptions.lock().insert(id, tx);
+ if self.executor.execute(Box::new(future)).is_err() {
+ error!("Failed to spawn RPC subscription task");
+ }
+ }
+
+ subscription_id
+ }
+
+ /// Cancel subscription.
+ ///
+ /// Returns true if subscription existed or false otherwise.
+ pub fn cancel(&self, id: SubscriptionId) -> bool {
+ if let SubscriptionId::Number(id) = id {
+ if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
+ let _ = tx.send(());
+ return true;
+ }
+ }
+ false
+ }
+}
diff --git a/client/rpc-servers/Cargo.toml b/client/rpc-servers/Cargo.toml
index 9e52ccae588c6..401f5f4882530 100644
--- a/client/rpc-servers/Cargo.toml
+++ b/client/rpc-servers/Cargo.toml
@@ -12,13 +12,13 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
-jsonrpc-core = "14.2"
-pubsub = { package = "jsonrpc-pubsub", version = "14.2" }
+jsonrpc-core = "14.0.3"
+pubsub = { package = "jsonrpc-pubsub", version = "14.0.3" }
log = "0.4.8"
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc2", path = "../../primitives/runtime" }
[target.'cfg(not(target_os = "unknown"))'.dependencies]
-http = { package = "jsonrpc-http-server", version = "14.2" }
-ws = { package = "jsonrpc-ws-server", version = "14.2" }
+http = { package = "jsonrpc-http-server", version = "14.0.3" }
+ws = { package = "jsonrpc-ws-server", version = "14.0.3" }
diff --git a/client/rpc/Cargo.toml b/client/rpc/Cargo.toml
index 60c4a24cd0e6a..62f93195758b9 100644
--- a/client/rpc/Cargo.toml
+++ b/client/rpc/Cargo.toml
@@ -17,10 +17,10 @@ sc-client-api = { version = "2.0.0-rc2", path = "../api" }
sp-api = { version = "2.0.0-rc2", path = "../../primitives/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.1", features = ["compat"] }
-jsonrpc-pubsub = "14.2"
+jsonrpc-pubsub = "14.0.3"
log = "0.4.8"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
-rpc = { package = "jsonrpc-core", version = "14.2" }
+rpc = { package = "jsonrpc-core", version = "14.0.3" }
sp-version = { version = "2.0.0-rc2", path = "../../primitives/version" }
serde_json = "1.0.41"
sp-session = { version = "2.0.0-rc2", path = "../../primitives/session" }
diff --git a/client/rpc/src/author/mod.rs b/client/rpc/src/author/mod.rs
index 974c1b85e1b39..d59fad354ef3c 100644
--- a/client/rpc/src/author/mod.rs
+++ b/client/rpc/src/author/mod.rs
@@ -32,8 +32,8 @@ use rpc::futures::{
};
use futures::{StreamExt as _, compat::Compat};
use futures::future::{ready, FutureExt, TryFutureExt};
-use sc_rpc_api::DenyUnsafe;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
+use sc_rpc_api::{DenyUnsafe, Subscriptions};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use codec::{Encode, Decode};
use sp_core::{Bytes, traits::BareCryptoStorePtr};
use sp_api::ProvideRuntimeApi;
@@ -55,7 +55,7 @@ pub struct Author
{
/// Transactions pool
pool: Arc
,
/// Subscriptions manager
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
/// The key store.
keystore: BareCryptoStorePtr,
/// Whether to deny unsafe calls
@@ -67,7 +67,7 @@ impl
Author
{
pub fn new(
client: Arc,
pool: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
keystore: BareCryptoStorePtr,
deny_unsafe: DenyUnsafe,
) -> Self {
@@ -81,6 +81,7 @@ impl
Author
{
}
}
+
/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
diff --git a/client/rpc/src/author/tests.rs b/client/rpc/src/author/tests.rs
index f0f92a8e7eae7..8c1b82028bd79 100644
--- a/client/rpc/src/author/tests.rs
+++ b/client/rpc/src/author/tests.rs
@@ -81,7 +81,7 @@ impl TestSetup {
Author {
client: self.client.clone(),
pool: self.pool.clone(),
- subscriptions: SubscriptionManager::new(Arc::new(crate::testing::TaskExecutor)),
+ subscriptions: Subscriptions::new(Arc::new(crate::testing::TaskExecutor)),
keystore: self.keystore.clone(),
deny_unsafe: DenyUnsafe::No,
}
@@ -133,14 +133,8 @@ fn should_watch_extrinsic() {
uxt(AccountKeyring::Alice, 0).encode().into(),
);
- let id = executor::block_on(id_rx.compat()).unwrap().unwrap();
- assert_matches!(id, SubscriptionId::String(_));
-
- let id = match id {
- SubscriptionId::String(id) => id,
- _ => unreachable!(),
- };
-
+ // then
+ assert_eq!(executor::block_on(id_rx.compat()), Ok(Ok(1.into())));
// check notifications
let replacement = {
let tx = Transfer {
@@ -153,22 +147,15 @@ fn should_watch_extrinsic() {
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();
-
- let expected = Some(format!(
- r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":"ready","subscription":"{}"}}}}"#,
- id,
- ));
- assert_eq!(res, expected);
-
+ assert_eq!(
+ res,
+ Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
+ );
let h = blake2_256(&replacement.encode());
- let expected = Some(format!(
- r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":"{}"}}}}"#,
- HexDisplay::from(&h),
- id,
- ));
-
- let res = executor::block_on(data.into_future().compat()).unwrap().0;
- assert_eq!(res, expected);
+ assert_eq!(
+ executor::block_on(data.into_future().compat()).unwrap().0,
+ Some(format!(r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#, HexDisplay::from(&h)))
+ );
}
#[test]
diff --git a/client/rpc/src/chain/chain_full.rs b/client/rpc/src/chain/chain_full.rs
index 816dbba866417..c1b062754bdac 100644
--- a/client/rpc/src/chain/chain_full.rs
+++ b/client/rpc/src/chain/chain_full.rs
@@ -18,8 +18,8 @@
use std::sync::Arc;
use rpc::futures::future::result;
-use jsonrpc_pubsub::manager::SubscriptionManager;
+use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_runtime::{generic::{BlockId, SignedBlock}, traits::{Block as BlockT}};
@@ -32,14 +32,14 @@ pub struct FullChain {
/// Substrate client.
client: Arc,
/// Current subscriptions.
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
/// phantom member to pin the block type
_phantom: PhantomData,
}
impl FullChain {
/// Create new Chain API RPC handler.
- pub fn new(client: Arc, subscriptions: SubscriptionManager) -> Self {
+ pub fn new(client: Arc, subscriptions: Subscriptions) -> Self {
Self {
client,
subscriptions,
@@ -56,7 +56,7 @@ impl ChainBackend for FullChain whe
&self.client
}
- fn subscriptions(&self) -> &SubscriptionManager {
+ fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}
diff --git a/client/rpc/src/chain/chain_light.rs b/client/rpc/src/chain/chain_light.rs
index 8a4afbed71c16..059233089d05d 100644
--- a/client/rpc/src/chain/chain_light.rs
+++ b/client/rpc/src/chain/chain_light.rs
@@ -19,8 +19,8 @@
use std::sync::Arc;
use futures::{future::ready, FutureExt, TryFutureExt};
use rpc::futures::future::{result, Future, Either};
-use jsonrpc_pubsub::manager::SubscriptionManager;
+use sc_rpc_api::Subscriptions;
use sc_client_api::light::{Fetcher, RemoteBodyRequest, RemoteBlockchain};
use sp_runtime::{
generic::{BlockId, SignedBlock},
@@ -37,7 +37,7 @@ pub struct LightChain {
/// Substrate client.
client: Arc,
/// Current subscriptions.
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
/// Remote blockchain reference
remote_blockchain: Arc>,
/// Remote fetcher reference.
@@ -48,7 +48,7 @@ impl> LightChain {
/// Create new Chain API RPC handler.
pub fn new(
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Self {
@@ -70,7 +70,7 @@ impl ChainBackend for LightChain &SubscriptionManager {
+ fn subscriptions(&self) -> &Subscriptions {
&self.subscriptions
}
diff --git a/client/rpc/src/chain/mod.rs b/client/rpc/src/chain/mod.rs
index 7b13e7a6005ff..6d53fbbb06f6e 100644
--- a/client/rpc/src/chain/mod.rs
+++ b/client/rpc/src/chain/mod.rs
@@ -32,8 +32,9 @@ use rpc::{
futures::{stream, Future, Sink, Stream},
};
+use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, light::{Fetcher, RemoteBlockchain}};
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use sp_rpc::{number::NumberOrHex, list::ListOrValue};
use sp_runtime::{
generic::{BlockId, SignedBlock},
@@ -56,7 +57,7 @@ trait ChainBackend: Send + Sync + 'static
fn client(&self) -> &Arc;
/// Get subscriptions reference.
- fn subscriptions(&self) -> &SubscriptionManager;
+ fn subscriptions(&self) -> &Subscriptions;
/// Tries to unwrap passed block hash, or uses best block hash otherwise.
fn unwrap_or_best(&self, hash: Option) -> Block::Hash {
@@ -176,7 +177,7 @@ trait ChainBackend: Send + Sync + 'static
/// Create new state API that works on full node.
pub fn new_full(
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
) -> Chain
where
Block: BlockT + 'static,
@@ -190,7 +191,7 @@ pub fn new_full(
/// Create new state API that works on light node.
pub fn new_light>(
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Chain
@@ -278,7 +279,7 @@ impl ChainApi, Block::Hash, Block::Header, Signe
/// Subscribe to new headers.
fn subscribe_headers(
client: &Arc,
- subscriptions: &SubscriptionManager,
+ subscriptions: &Subscriptions,
subscriber: Subscriber,
best_block_hash: G,
stream: F,
diff --git a/client/rpc/src/chain/tests.rs b/client/rpc/src/chain/tests.rs
index 68d46135e36b1..e86d1d547fbde 100644
--- a/client/rpc/src/chain/tests.rs
+++ b/client/rpc/src/chain/tests.rs
@@ -31,7 +31,7 @@ use crate::testing::TaskExecutor;
#[test]
fn should_return_header() {
let client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.header(Some(client.genesis_hash()).into()).wait(),
@@ -63,7 +63,7 @@ fn should_return_header() {
#[test]
fn should_return_a_block() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_hash = block.hash();
@@ -114,7 +114,7 @@ fn should_return_a_block() {
#[test]
fn should_return_block_hash() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.block_hash(None.into()),
@@ -158,7 +158,7 @@ fn should_return_block_hash() {
#[test]
fn should_return_finalized_hash() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
api.finalized_head(),
@@ -188,15 +188,12 @@ fn should_notify_about_latest_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_all_heads(Default::default(), subscriber);
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
@@ -218,15 +215,12 @@ fn should_notify_about_best_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_new_heads(Default::default(), subscriber);
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
@@ -248,15 +242,12 @@ fn should_notify_about_finalized_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_finalized_heads(Default::default(), subscriber);
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs
index 53a63b449c87e..f979b0ab6957e 100644
--- a/client/rpc/src/lib.rs
+++ b/client/rpc/src/lib.rs
@@ -24,7 +24,7 @@
mod metadata;
-pub use sc_rpc_api::DenyUnsafe;
+pub use sc_rpc_api::{DenyUnsafe, Subscriptions};
pub use self::metadata::Metadata;
pub use rpc::IoHandlerExtension as RpcExtension;
diff --git a/client/rpc/src/state/mod.rs b/client/rpc/src/state/mod.rs
index 921cc7efc699d..168dc3e0105a4 100644
--- a/client/rpc/src/state/mod.rs
+++ b/client/rpc/src/state/mod.rs
@@ -25,10 +25,10 @@ mod state_light;
mod tests;
use std::sync::Arc;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{Result as RpcResult, futures::{Future, future::result}};
-use sc_rpc_api::state::ReadProof;
+use sc_rpc_api::{Subscriptions, state::ReadProof};
use sc_client_api::light::{RemoteBlockchain, Fetcher};
use sp_core::{Bytes, storage::{StorageKey, PrefixedStorageKey, StorageData, StorageChangeSet}};
use sp_version::RuntimeVersion;
@@ -170,7 +170,7 @@ pub trait StateBackend: Send + Sync + 'static
/// Create new state API that works on full node.
pub fn new_full(
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
) -> (State, ChildState)
where
Block: BlockT + 'static,
@@ -191,7 +191,7 @@ pub fn new_full(
/// Create new state API that works on light node.
pub fn new_light>(
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
remote_blockchain: Arc>,
fetcher: Arc,
) -> (State, ChildState)
diff --git a/client/rpc/src/state/state_full.rs b/client/rpc/src/state/state_full.rs
index f0ae79a033b58..82f87e9acf223 100644
--- a/client/rpc/src/state/state_full.rs
+++ b/client/rpc/src/state/state_full.rs
@@ -21,10 +21,10 @@ use std::sync::Arc;
use std::ops::Range;
use futures::{future, StreamExt as _, TryStreamExt as _};
use log::warn;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use rpc::{Result as RpcResult, futures::{stream, Future, Sink, Stream, future::result}};
-use sc_rpc_api::state::ReadProof;
+use sc_rpc_api::{Subscriptions, state::ReadProof};
use sc_client_api::backend::Backend;
use sp_blockchain::{Result as ClientResult, Error as ClientError, HeaderMetadata, CachedHeaderMetadata, HeaderBackend};
use sc_client_api::BlockchainEvents;
@@ -60,7 +60,7 @@ struct QueryStorageRange {
/// State API backend for full nodes.
pub struct FullState {
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
_phantom: PhantomData<(BE, Block)>
}
@@ -72,7 +72,7 @@ impl FullState
Block: BlockT + 'static,
{
/// Create new state API backend for full nodes.
- pub fn new(client: Arc, subscriptions: SubscriptionManager) -> Self {
+ pub fn new(client: Arc, subscriptions: Subscriptions) -> Self {
Self { client, subscriptions, _phantom: PhantomData }
}
diff --git a/client/rpc/src/state/state_light.rs b/client/rpc/src/state/state_light.rs
index ec275a2d78b79..af5d4248e3a42 100644
--- a/client/rpc/src/state/state_light.rs
+++ b/client/rpc/src/state/state_light.rs
@@ -28,7 +28,7 @@ use futures::{
StreamExt as _, TryStreamExt as _,
};
use hash_db::Hasher;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use log::warn;
use parking_lot::Mutex;
use rpc::{
@@ -38,7 +38,7 @@ use rpc::{
futures::stream::Stream,
};
-use sc_rpc_api::state::ReadProof;
+use sc_rpc_api::{Subscriptions, state::ReadProof};
use sp_blockchain::{Error as ClientError, HeaderBackend};
use sc_client_api::{
BlockchainEvents,
@@ -63,7 +63,7 @@ type StorageMap = HashMap>;
#[derive(Clone)]
pub struct LightState, Client> {
client: Arc,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
version_subscriptions: SimpleSubscriptions,
storage_subscriptions: Arc>>,
remote_blockchain: Arc>,
@@ -143,7 +143,7 @@ impl + 'static, Client> LightState,
- subscriptions: SubscriptionManager,
+ subscriptions: Subscriptions,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Self {
diff --git a/client/rpc/src/state/tests.rs b/client/rpc/src/state/tests.rs
index 0cc16ce8d5e92..a610cbbfc8285 100644
--- a/client/rpc/src/state/tests.rs
+++ b/client/rpc/src/state/tests.rs
@@ -55,7 +55,7 @@ fn should_return_storage() {
.add_extra_child_storage(&child_info, KEY.to_vec(), CHILD_VALUE.to_vec())
.build();
let genesis_hash = client.genesis_hash();
- let (client, child) = new_full(Arc::new(client), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (client, child) = new_full(Arc::new(client), Subscriptions::new(Arc::new(TaskExecutor)));
let key = StorageKey(KEY.to_vec());
assert_eq!(
@@ -90,7 +90,7 @@ fn should_return_child_storage() {
.add_child_storage(&child_info, "key", vec![42_u8])
.build());
let genesis_hash = client.genesis_hash();
- let (_client, child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (_client, child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
let child_key = prefixed_storage_key();
let key = StorageKey(b"key".to_vec());
@@ -125,7 +125,7 @@ fn should_return_child_storage() {
fn should_call_contract() {
let client = Arc::new(substrate_test_runtime_client::new());
let genesis_hash = client.genesis_hash();
- let (client, _child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (client, _child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
assert_matches!(
client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(),
@@ -139,15 +139,12 @@ fn should_notify_about_storage_changes() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -173,7 +170,7 @@ fn should_send_initial_storage_changes_and_notifications() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into()));
@@ -182,10 +179,7 @@ fn should_send_initial_storage_changes_and_notifications() {
]).into());
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -211,7 +205,7 @@ fn should_send_initial_storage_changes_and_notifications() {
#[test]
fn should_query_storage() {
fn run_tests(mut client: Arc, has_changes_trie_config: bool) {
- let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let mut add_block = |nonce| {
let mut builder = client.new_block(Default::default()).unwrap();
@@ -428,7 +422,7 @@ fn should_split_ranges() {
#[test]
fn should_return_runtime_version() {
let client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\
\"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\
@@ -451,16 +445,12 @@ fn should_notify_on_runtime_version_initially() {
{
let client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
api.subscribe_runtime_version(Default::default(), subscriber);
// assert id assigned
- assert!(matches!(
- executor::block_on(id.compat()),
- Ok(Ok(SubscriptionId::String(_)))
- ));
-
+ assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
}
// assert initial version sent.
diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml
index c72f8226feb80..fc5991bc3f131 100644
--- a/client/service/Cargo.toml
+++ b/client/service/Cargo.toml
@@ -26,7 +26,6 @@ test-helpers = []
derive_more = "0.99.2"
futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.4", features = ["compat"] }
-jsonrpc-pubsub = "14.2"
rand = "0.7.3"
parking_lot = "0.10.0"
lazy_static = "1.4.0"
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index d0cdaac5b7b68..baf1c2e0cceff 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -36,7 +36,6 @@ use futures::{
Future, FutureExt, StreamExt,
future::ready,
};
-use jsonrpc_pubsub::manager::SubscriptionManager;
use sc_keystore::Store as Keystore;
use log::{info, warn, error};
use sc_network::config::{Role, FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
@@ -1197,7 +1196,7 @@ ServiceBuilder<
chain_type: chain_spec.chain_type().clone(),
};
- let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle()));
+ let subscriptions = sc_rpc::Subscriptions::new(Arc::new(task_manager.spawn_handle()));
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
diff --git a/frame/contracts/rpc/Cargo.toml b/frame/contracts/rpc/Cargo.toml
index d521be6d2a226..8ed233ed79f7d 100644
--- a/frame/contracts/rpc/Cargo.toml
+++ b/frame/contracts/rpc/Cargo.toml
@@ -13,9 +13,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "1.3.0" }
-jsonrpc-core = "14.2"
-jsonrpc-core-client = "14.2"
-jsonrpc-derive = "14.2.1"
+jsonrpc-core = "14.0.3"
+jsonrpc-core-client = "14.0.5"
+jsonrpc-derive = "14.0.3"
sp-blockchain = { version = "2.0.0-rc2", path = "../../../primitives/blockchain" }
sp-core = { version = "2.0.0-rc2", path = "../../../primitives/core" }
sp-rpc = { version = "2.0.0-rc2", path = "../../../primitives/rpc" }
diff --git a/frame/transaction-payment/rpc/Cargo.toml b/frame/transaction-payment/rpc/Cargo.toml
index adecfd0aabb84..3ca2f4be8e308 100644
--- a/frame/transaction-payment/rpc/Cargo.toml
+++ b/frame/transaction-payment/rpc/Cargo.toml
@@ -13,9 +13,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "1.3.0" }
-jsonrpc-core = "14.2"
-jsonrpc-core-client = "14.2"
-jsonrpc-derive = "14.2.1"
+jsonrpc-core = "14.0.3"
+jsonrpc-core-client = "14.0.5"
+jsonrpc-derive = "14.0.3"
sp-core = { version = "2.0.0-rc2", path = "../../../primitives/core" }
sp-rpc = { version = "2.0.0-rc2", path = "../../../primitives/rpc" }
serde = { version = "1.0.101", features = ["derive"] }
diff --git a/utils/frame/rpc/support/Cargo.toml b/utils/frame/rpc/support/Cargo.toml
index a64b23b6a987c..006372eb364e7 100644
--- a/utils/frame/rpc/support/Cargo.toml
+++ b/utils/frame/rpc/support/Cargo.toml
@@ -13,8 +13,8 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = { version = "0.3.0", features = ["compat"] }
-jsonrpc-client-transports = { version = "14.2", default-features = false, features = ["http"] }
-jsonrpc-core = "14.2"
+jsonrpc-client-transports = { version = "14.0.5", default-features = false, features = ["http"] }
+jsonrpc-core = "14"
codec = { package = "parity-scale-codec", version = "1" }
serde = "1"
frame-support = { version = "2.0.0-rc2", path = "../../../../frame/support" }
diff --git a/utils/frame/rpc/system/Cargo.toml b/utils/frame/rpc/system/Cargo.toml
index d4878a4f289c1..f757e811fb96d 100644
--- a/utils/frame/rpc/system/Cargo.toml
+++ b/utils/frame/rpc/system/Cargo.toml
@@ -15,9 +15,9 @@ targets = ["x86_64-unknown-linux-gnu"]
sc-client-api = { version = "2.0.0-rc2", path = "../../../../client/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.4", features = ["compat"] }
-jsonrpc-core = "14.2"
-jsonrpc-core-client = "14.2"
-jsonrpc-derive = "14.2.1"
+jsonrpc-core = "14.0.3"
+jsonrpc-core-client = "14.0.5"
+jsonrpc-derive = "14.0.3"
log = "0.4.8"
serde = { version = "1.0.101", features = ["derive"] }
sp-runtime = { version = "2.0.0-rc2", path = "../../../../primitives/runtime" }