- + Send + 'static,
- {
- let id = self.next_id.next_id();
- let subscription_id: SubscriptionId = id.into();
- if let Ok(sink) = subscriber.assign_id(subscription_id.clone()) {
- let (tx, rx) = oneshot::channel();
- let future = into_future(sink)
- .into_future()
- .select(rx.map_err(|e| warn!("Error timeing out: {:?}", e)))
- .then(|_| Ok(()));
-
- self.active_subscriptions.lock().insert(id, tx);
- if self.executor.execute(Box::new(future)).is_err() {
- error!("Failed to spawn RPC subscription task");
- }
- }
-
- subscription_id
- }
-
- /// Cancel subscription.
- ///
- /// Returns true if subscription existed or false otherwise.
- pub fn cancel(&self, id: SubscriptionId) -> bool {
- if let SubscriptionId::Number(id) = id {
- if let Some(tx) = self.active_subscriptions.lock().remove(&id) {
- let _ = tx.send(());
- return true;
- }
- }
- false
- }
-}
diff --git a/client/rpc-servers/Cargo.toml b/client/rpc-servers/Cargo.toml
index 401f5f4882530..83ef5d71335c7 100644
--- a/client/rpc-servers/Cargo.toml
+++ b/client/rpc-servers/Cargo.toml
@@ -12,13 +12,13 @@ description = "Substrate RPC servers."
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
-jsonrpc-core = "14.0.3"
-pubsub = { package = "jsonrpc-pubsub", version = "14.0.3" }
+jsonrpc-core = "14.2.0"
+pubsub = { package = "jsonrpc-pubsub", version = "14.2.0" }
log = "0.4.8"
serde = "1.0.101"
serde_json = "1.0.41"
sp-runtime = { version = "2.0.0-rc2", path = "../../primitives/runtime" }
[target.'cfg(not(target_os = "unknown"))'.dependencies]
-http = { package = "jsonrpc-http-server", version = "14.0.3" }
-ws = { package = "jsonrpc-ws-server", version = "14.0.3" }
+http = { package = "jsonrpc-http-server", version = "14.2.0" }
+ws = { package = "jsonrpc-ws-server", version = "14.2.0" }
diff --git a/client/rpc/Cargo.toml b/client/rpc/Cargo.toml
index 62f93195758b9..4d564e8601518 100644
--- a/client/rpc/Cargo.toml
+++ b/client/rpc/Cargo.toml
@@ -17,10 +17,10 @@ sc-client-api = { version = "2.0.0-rc2", path = "../api" }
sp-api = { version = "2.0.0-rc2", path = "../../primitives/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.1", features = ["compat"] }
-jsonrpc-pubsub = "14.0.3"
+jsonrpc-pubsub = "14.2.0"
log = "0.4.8"
sp-core = { version = "2.0.0-rc2", path = "../../primitives/core" }
-rpc = { package = "jsonrpc-core", version = "14.0.3" }
+rpc = { package = "jsonrpc-core", version = "14.2.0" }
sp-version = { version = "2.0.0-rc2", path = "../../primitives/version" }
serde_json = "1.0.41"
sp-session = { version = "2.0.0-rc2", path = "../../primitives/session" }
diff --git a/client/rpc/src/author/mod.rs b/client/rpc/src/author/mod.rs
index d59fad354ef3c..974c1b85e1b39 100644
--- a/client/rpc/src/author/mod.rs
+++ b/client/rpc/src/author/mod.rs
@@ -32,8 +32,8 @@ use rpc::futures::{
};
use futures::{StreamExt as _, compat::Compat};
use futures::future::{ready, FutureExt, TryFutureExt};
-use sc_rpc_api::{DenyUnsafe, Subscriptions};
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
+use sc_rpc_api::DenyUnsafe;
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use codec::{Encode, Decode};
use sp_core::{Bytes, traits::BareCryptoStorePtr};
use sp_api::ProvideRuntimeApi;
@@ -55,7 +55,7 @@ pub struct Author
{
/// Transactions pool
pool: Arc
,
/// Subscriptions manager
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
/// The key store.
keystore: BareCryptoStorePtr,
/// Whether to deny unsafe calls
@@ -67,7 +67,7 @@ impl
Author
{
pub fn new(
client: Arc,
pool: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
keystore: BareCryptoStorePtr,
deny_unsafe: DenyUnsafe,
) -> Self {
@@ -81,7 +81,6 @@ impl
Author
{
}
}
-
/// Currently we treat all RPC transactions as externals.
///
/// Possibly in the future we could allow opt-in for special treatment
diff --git a/client/rpc/src/author/tests.rs b/client/rpc/src/author/tests.rs
index d70a2ce2aff99..f2f4ddebb2f1d 100644
--- a/client/rpc/src/author/tests.rs
+++ b/client/rpc/src/author/tests.rs
@@ -79,7 +79,7 @@ impl TestSetup {
Author {
client: self.client.clone(),
pool: self.pool.clone(),
- subscriptions: Subscriptions::new(Arc::new(crate::testing::TaskExecutor)),
+ subscriptions: SubscriptionManager::new(Arc::new(crate::testing::TaskExecutor)),
keystore: self.keystore.clone(),
deny_unsafe: DenyUnsafe::No,
}
@@ -131,8 +131,14 @@ fn should_watch_extrinsic() {
uxt(AccountKeyring::Alice, 0).encode().into(),
);
- // then
- assert_eq!(executor::block_on(id_rx.compat()), Ok(Ok(1.into())));
+ let id = executor::block_on(id_rx.compat()).unwrap().unwrap();
+ assert_matches!(id, SubscriptionId::String(_));
+
+ let id = match id {
+ SubscriptionId::String(id) => id,
+ _ => unreachable!(),
+ };
+
// check notifications
let replacement = {
let tx = Transfer {
@@ -145,15 +151,22 @@ fn should_watch_extrinsic() {
};
AuthorApi::submit_extrinsic(&p, replacement.encode().into()).wait().unwrap();
let (res, data) = executor::block_on(data.into_future().compat()).unwrap();
- assert_eq!(
- res,
- Some(r#"{"jsonrpc":"2.0","method":"test","params":{"result":"ready","subscription":1}}"#.into())
- );
+
+ let expected = Some(format!(
+ r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":"ready","subscription":"{}"}}}}"#,
+ id,
+ ));
+ assert_eq!(res, expected);
+
let h = blake2_256(&replacement.encode());
- assert_eq!(
- executor::block_on(data.into_future().compat()).unwrap().0,
- Some(format!(r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":1}}}}"#, HexDisplay::from(&h)))
- );
+ let expected = Some(format!(
+ r#"{{"jsonrpc":"2.0","method":"test","params":{{"result":{{"usurped":"0x{}"}},"subscription":"{}"}}}}"#,
+ HexDisplay::from(&h),
+ id,
+ ));
+
+ let res = executor::block_on(data.into_future().compat()).unwrap().0;
+ assert_eq!(res, expected);
}
#[test]
diff --git a/client/rpc/src/chain/chain_full.rs b/client/rpc/src/chain/chain_full.rs
index c1b062754bdac..816dbba866417 100644
--- a/client/rpc/src/chain/chain_full.rs
+++ b/client/rpc/src/chain/chain_full.rs
@@ -18,8 +18,8 @@
use std::sync::Arc;
use rpc::futures::future::result;
+use jsonrpc_pubsub::manager::SubscriptionManager;
-use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, BlockBackend};
use sp_runtime::{generic::{BlockId, SignedBlock}, traits::{Block as BlockT}};
@@ -32,14 +32,14 @@ pub struct FullChain {
/// Substrate client.
client: Arc,
/// Current subscriptions.
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
/// phantom member to pin the block type
_phantom: PhantomData,
}
impl FullChain {
/// Create new Chain API RPC handler.
- pub fn new(client: Arc, subscriptions: Subscriptions) -> Self {
+ pub fn new(client: Arc, subscriptions: SubscriptionManager) -> Self {
Self {
client,
subscriptions,
@@ -56,7 +56,7 @@ impl ChainBackend for FullChain whe
&self.client
}
- fn subscriptions(&self) -> &Subscriptions {
+ fn subscriptions(&self) -> &SubscriptionManager {
&self.subscriptions
}
diff --git a/client/rpc/src/chain/chain_light.rs b/client/rpc/src/chain/chain_light.rs
index 059233089d05d..8a4afbed71c16 100644
--- a/client/rpc/src/chain/chain_light.rs
+++ b/client/rpc/src/chain/chain_light.rs
@@ -19,8 +19,8 @@
use std::sync::Arc;
use futures::{future::ready, FutureExt, TryFutureExt};
use rpc::futures::future::{result, Future, Either};
+use jsonrpc_pubsub::manager::SubscriptionManager;
-use sc_rpc_api::Subscriptions;
use sc_client_api::light::{Fetcher, RemoteBodyRequest, RemoteBlockchain};
use sp_runtime::{
generic::{BlockId, SignedBlock},
@@ -37,7 +37,7 @@ pub struct LightChain {
/// Substrate client.
client: Arc,
/// Current subscriptions.
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
/// Remote blockchain reference
remote_blockchain: Arc>,
/// Remote fetcher reference.
@@ -48,7 +48,7 @@ impl> LightChain {
/// Create new Chain API RPC handler.
pub fn new(
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Self {
@@ -70,7 +70,7 @@ impl ChainBackend for LightChain &Subscriptions {
+ fn subscriptions(&self) -> &SubscriptionManager {
&self.subscriptions
}
diff --git a/client/rpc/src/chain/mod.rs b/client/rpc/src/chain/mod.rs
index 6d53fbbb06f6e..7b13e7a6005ff 100644
--- a/client/rpc/src/chain/mod.rs
+++ b/client/rpc/src/chain/mod.rs
@@ -32,9 +32,8 @@ use rpc::{
futures::{stream, Future, Sink, Stream},
};
-use sc_rpc_api::Subscriptions;
use sc_client_api::{BlockchainEvents, light::{Fetcher, RemoteBlockchain}};
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use sp_rpc::{number::NumberOrHex, list::ListOrValue};
use sp_runtime::{
generic::{BlockId, SignedBlock},
@@ -57,7 +56,7 @@ trait ChainBackend: Send + Sync + 'static
fn client(&self) -> &Arc;
/// Get subscriptions reference.
- fn subscriptions(&self) -> &Subscriptions;
+ fn subscriptions(&self) -> &SubscriptionManager;
/// Tries to unwrap passed block hash, or uses best block hash otherwise.
fn unwrap_or_best(&self, hash: Option) -> Block::Hash {
@@ -177,7 +176,7 @@ trait ChainBackend: Send + Sync + 'static
/// Create new state API that works on full node.
pub fn new_full(
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
) -> Chain
where
Block: BlockT + 'static,
@@ -191,7 +190,7 @@ pub fn new_full(
/// Create new state API that works on light node.
pub fn new_light>(
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Chain
@@ -279,7 +278,7 @@ impl ChainApi, Block::Hash, Block::Header, Signe
/// Subscribe to new headers.
fn subscribe_headers(
client: &Arc,
- subscriptions: &Subscriptions,
+ subscriptions: &SubscriptionManager,
subscriber: Subscriber,
best_block_hash: G,
stream: F,
diff --git a/client/rpc/src/chain/tests.rs b/client/rpc/src/chain/tests.rs
index e86d1d547fbde..68d46135e36b1 100644
--- a/client/rpc/src/chain/tests.rs
+++ b/client/rpc/src/chain/tests.rs
@@ -31,7 +31,7 @@ use crate::testing::TaskExecutor;
#[test]
fn should_return_header() {
let client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
assert_matches!(
api.header(Some(client.genesis_hash()).into()).wait(),
@@ -63,7 +63,7 @@ fn should_return_header() {
#[test]
fn should_return_a_block() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
let block_hash = block.hash();
@@ -114,7 +114,7 @@ fn should_return_a_block() {
#[test]
fn should_return_block_hash() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
assert_matches!(
api.block_hash(None.into()),
@@ -158,7 +158,7 @@ fn should_return_block_hash() {
#[test]
fn should_return_finalized_hash() {
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
assert_matches!(
api.finalized_head(),
@@ -188,12 +188,15 @@ fn should_notify_about_latest_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
api.subscribe_all_heads(Default::default(), subscriber);
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
@@ -215,12 +218,15 @@ fn should_notify_about_best_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
api.subscribe_new_heads(Default::default(), subscriber);
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
@@ -242,12 +248,15 @@ fn should_notify_about_finalized_block() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let api = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let api = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
api.subscribe_finalized_heads(Default::default(), subscriber);
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
let block = client.new_block(Default::default()).unwrap().build().unwrap().block;
client.import(BlockOrigin::Own, block).unwrap();
diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs
index f979b0ab6957e..53a63b449c87e 100644
--- a/client/rpc/src/lib.rs
+++ b/client/rpc/src/lib.rs
@@ -24,7 +24,7 @@
mod metadata;
-pub use sc_rpc_api::{DenyUnsafe, Subscriptions};
+pub use sc_rpc_api::DenyUnsafe;
pub use self::metadata::Metadata;
pub use rpc::IoHandlerExtension as RpcExtension;
diff --git a/client/rpc/src/state/mod.rs b/client/rpc/src/state/mod.rs
index 168dc3e0105a4..921cc7efc699d 100644
--- a/client/rpc/src/state/mod.rs
+++ b/client/rpc/src/state/mod.rs
@@ -25,10 +25,10 @@ mod state_light;
mod tests;
use std::sync::Arc;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use rpc::{Result as RpcResult, futures::{Future, future::result}};
-use sc_rpc_api::{Subscriptions, state::ReadProof};
+use sc_rpc_api::state::ReadProof;
use sc_client_api::light::{RemoteBlockchain, Fetcher};
use sp_core::{Bytes, storage::{StorageKey, PrefixedStorageKey, StorageData, StorageChangeSet}};
use sp_version::RuntimeVersion;
@@ -170,7 +170,7 @@ pub trait StateBackend: Send + Sync + 'static
/// Create new state API that works on full node.
pub fn new_full(
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
) -> (State, ChildState)
where
Block: BlockT + 'static,
@@ -191,7 +191,7 @@ pub fn new_full(
/// Create new state API that works on light node.
pub fn new_light>(
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
remote_blockchain: Arc>,
fetcher: Arc,
) -> (State, ChildState)
diff --git a/client/rpc/src/state/state_full.rs b/client/rpc/src/state/state_full.rs
index 82f87e9acf223..f0ae79a033b58 100644
--- a/client/rpc/src/state/state_full.rs
+++ b/client/rpc/src/state/state_full.rs
@@ -21,10 +21,10 @@ use std::sync::Arc;
use std::ops::Range;
use futures::{future, StreamExt as _, TryStreamExt as _};
use log::warn;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use rpc::{Result as RpcResult, futures::{stream, Future, Sink, Stream, future::result}};
-use sc_rpc_api::{Subscriptions, state::ReadProof};
+use sc_rpc_api::state::ReadProof;
use sc_client_api::backend::Backend;
use sp_blockchain::{Result as ClientResult, Error as ClientError, HeaderMetadata, CachedHeaderMetadata, HeaderBackend};
use sc_client_api::BlockchainEvents;
@@ -60,7 +60,7 @@ struct QueryStorageRange {
/// State API backend for full nodes.
pub struct FullState {
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
_phantom: PhantomData<(BE, Block)>
}
@@ -72,7 +72,7 @@ impl FullState
Block: BlockT + 'static,
{
/// Create new state API backend for full nodes.
- pub fn new(client: Arc, subscriptions: Subscriptions) -> Self {
+ pub fn new(client: Arc, subscriptions: SubscriptionManager) -> Self {
Self { client, subscriptions, _phantom: PhantomData }
}
diff --git a/client/rpc/src/state/state_light.rs b/client/rpc/src/state/state_light.rs
index af5d4248e3a42..ec275a2d78b79 100644
--- a/client/rpc/src/state/state_light.rs
+++ b/client/rpc/src/state/state_light.rs
@@ -28,7 +28,7 @@ use futures::{
StreamExt as _, TryStreamExt as _,
};
use hash_db::Hasher;
-use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
+use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId, manager::SubscriptionManager};
use log::warn;
use parking_lot::Mutex;
use rpc::{
@@ -38,7 +38,7 @@ use rpc::{
futures::stream::Stream,
};
-use sc_rpc_api::{Subscriptions, state::ReadProof};
+use sc_rpc_api::state::ReadProof;
use sp_blockchain::{Error as ClientError, HeaderBackend};
use sc_client_api::{
BlockchainEvents,
@@ -63,7 +63,7 @@ type StorageMap = HashMap>;
#[derive(Clone)]
pub struct LightState, Client> {
client: Arc,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
version_subscriptions: SimpleSubscriptions,
storage_subscriptions: Arc>>,
remote_blockchain: Arc>,
@@ -143,7 +143,7 @@ impl + 'static, Client> LightState,
- subscriptions: Subscriptions,
+ subscriptions: SubscriptionManager,
remote_blockchain: Arc>,
fetcher: Arc,
) -> Self {
diff --git a/client/rpc/src/state/tests.rs b/client/rpc/src/state/tests.rs
index a610cbbfc8285..0cc16ce8d5e92 100644
--- a/client/rpc/src/state/tests.rs
+++ b/client/rpc/src/state/tests.rs
@@ -55,7 +55,7 @@ fn should_return_storage() {
.add_extra_child_storage(&child_info, KEY.to_vec(), CHILD_VALUE.to_vec())
.build();
let genesis_hash = client.genesis_hash();
- let (client, child) = new_full(Arc::new(client), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (client, child) = new_full(Arc::new(client), SubscriptionManager::new(Arc::new(TaskExecutor)));
let key = StorageKey(KEY.to_vec());
assert_eq!(
@@ -90,7 +90,7 @@ fn should_return_child_storage() {
.add_child_storage(&child_info, "key", vec![42_u8])
.build());
let genesis_hash = client.genesis_hash();
- let (_client, child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
+ let (_client, child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
let child_key = prefixed_storage_key();
let key = StorageKey(b"key".to_vec());
@@ -125,7 +125,7 @@ fn should_return_child_storage() {
fn should_call_contract() {
let client = Arc::new(substrate_test_runtime_client::new());
let genesis_hash = client.genesis_hash();
- let (client, _child) = new_full(client, Subscriptions::new(Arc::new(TaskExecutor)));
+ let (client, _child) = new_full(client, SubscriptionManager::new(Arc::new(TaskExecutor)));
assert_matches!(
client.call("balanceOf".into(), Bytes(vec![1,2,3]), Some(genesis_hash).into()).wait(),
@@ -139,12 +139,15 @@ fn should_notify_about_storage_changes() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
api.subscribe_storage(Default::default(), subscriber, None.into());
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -170,7 +173,7 @@ fn should_send_initial_storage_changes_and_notifications() {
{
let mut client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let alice_balance_key = blake2_256(&runtime::system::balance_of_key(AccountKeyring::Alice.into()));
@@ -179,7 +182,10 @@ fn should_send_initial_storage_changes_and_notifications() {
]).into());
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
let mut builder = client.new_block(Default::default()).unwrap();
builder.push_transfer(runtime::Transfer {
@@ -205,7 +211,7 @@ fn should_send_initial_storage_changes_and_notifications() {
#[test]
fn should_query_storage() {
fn run_tests(mut client: Arc, has_changes_trie_config: bool) {
- let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let mut add_block = |nonce| {
let mut builder = client.new_block(Default::default()).unwrap();
@@ -422,7 +428,7 @@ fn should_split_ranges() {
#[test]
fn should_return_runtime_version() {
let client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
let result = "{\"specName\":\"test\",\"implName\":\"parity-test\",\"authoringVersion\":1,\
\"specVersion\":2,\"implVersion\":2,\"apis\":[[\"0xdf6acb689907609b\",3],\
@@ -445,12 +451,16 @@ fn should_notify_on_runtime_version_initially() {
{
let client = Arc::new(substrate_test_runtime_client::new());
- let (api, _child) = new_full(client.clone(), Subscriptions::new(Arc::new(TaskExecutor)));
+ let (api, _child) = new_full(client.clone(), SubscriptionManager::new(Arc::new(TaskExecutor)));
api.subscribe_runtime_version(Default::default(), subscriber);
// assert id assigned
- assert_eq!(executor::block_on(id.compat()), Ok(Ok(SubscriptionId::Number(1))));
+ assert!(matches!(
+ executor::block_on(id.compat()),
+ Ok(Ok(SubscriptionId::String(_)))
+ ));
+
}
// assert initial version sent.
diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml
index acc75d6890f68..f40999bf1cb8d 100644
--- a/client/service/Cargo.toml
+++ b/client/service/Cargo.toml
@@ -26,6 +26,7 @@ test-helpers = []
derive_more = "0.99.2"
futures01 = { package = "futures", version = "0.1.29" }
futures = { version = "0.3.4", features = ["compat"] }
+jsonrpc-pubsub = "14.2.0"
rand = "0.7.3"
parking_lot = "0.10.0"
lazy_static = "1.4.0"
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index e9fa1ff3e280a..16500baae1e7b 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -37,6 +37,7 @@ use futures::{
Future, FutureExt, StreamExt,
future::ready,
};
+use jsonrpc_pubsub::manager::SubscriptionManager;
use sc_keystore::Store as Keystore;
use log::{info, warn, error};
use sc_network::config::{Role, FinalityProofProvider, OnDemand, BoxFinalityProofRequestBuilder};
@@ -1204,7 +1205,7 @@ ServiceBuilder<
chain_type: chain_spec.chain_type().clone(),
};
- let subscriptions = sc_rpc::Subscriptions::new(Arc::new(task_manager.spawn_handle()));
+ let subscriptions = SubscriptionManager::new(Arc::new(task_manager.spawn_handle()));
let (chain, state, child_state) = if let (Some(remote_backend), Some(on_demand)) =
(remote_backend.as_ref(), on_demand.as_ref()) {
diff --git a/frame/contracts/rpc/Cargo.toml b/frame/contracts/rpc/Cargo.toml
index 8ed233ed79f7d..db6a1a6b3e32e 100644
--- a/frame/contracts/rpc/Cargo.toml
+++ b/frame/contracts/rpc/Cargo.toml
@@ -13,9 +13,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "1.3.0" }
-jsonrpc-core = "14.0.3"
-jsonrpc-core-client = "14.0.5"
-jsonrpc-derive = "14.0.3"
+jsonrpc-core = "14.2.0"
+jsonrpc-core-client = "14.2.0"
+jsonrpc-derive = "14.2.1"
sp-blockchain = { version = "2.0.0-rc2", path = "../../../primitives/blockchain" }
sp-core = { version = "2.0.0-rc2", path = "../../../primitives/core" }
sp-rpc = { version = "2.0.0-rc2", path = "../../../primitives/rpc" }
diff --git a/frame/transaction-payment/rpc/Cargo.toml b/frame/transaction-payment/rpc/Cargo.toml
index 3ca2f4be8e308..dffb8f0cca60e 100644
--- a/frame/transaction-payment/rpc/Cargo.toml
+++ b/frame/transaction-payment/rpc/Cargo.toml
@@ -13,9 +13,9 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
codec = { package = "parity-scale-codec", version = "1.3.0" }
-jsonrpc-core = "14.0.3"
-jsonrpc-core-client = "14.0.5"
-jsonrpc-derive = "14.0.3"
+jsonrpc-core = "14.2.0"
+jsonrpc-core-client = "14.2.0"
+jsonrpc-derive = "14.2.1"
sp-core = { version = "2.0.0-rc2", path = "../../../primitives/core" }
sp-rpc = { version = "2.0.0-rc2", path = "../../../primitives/rpc" }
serde = { version = "1.0.101", features = ["derive"] }
diff --git a/utils/frame/rpc/support/Cargo.toml b/utils/frame/rpc/support/Cargo.toml
index 006372eb364e7..e8322aaf1da43 100644
--- a/utils/frame/rpc/support/Cargo.toml
+++ b/utils/frame/rpc/support/Cargo.toml
@@ -13,8 +13,8 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
futures = { version = "0.3.0", features = ["compat"] }
-jsonrpc-client-transports = { version = "14.0.5", default-features = false, features = ["http"] }
-jsonrpc-core = "14"
+jsonrpc-client-transports = { version = "14.2.0", default-features = false, features = ["http"] }
+jsonrpc-core = "14.2.0"
codec = { package = "parity-scale-codec", version = "1" }
serde = "1"
frame-support = { version = "2.0.0-rc2", path = "../../../../frame/support" }
diff --git a/utils/frame/rpc/system/Cargo.toml b/utils/frame/rpc/system/Cargo.toml
index f757e811fb96d..5a350d911af2e 100644
--- a/utils/frame/rpc/system/Cargo.toml
+++ b/utils/frame/rpc/system/Cargo.toml
@@ -15,9 +15,9 @@ targets = ["x86_64-unknown-linux-gnu"]
sc-client-api = { version = "2.0.0-rc2", path = "../../../../client/api" }
codec = { package = "parity-scale-codec", version = "1.3.0" }
futures = { version = "0.3.4", features = ["compat"] }
-jsonrpc-core = "14.0.3"
-jsonrpc-core-client = "14.0.5"
-jsonrpc-derive = "14.0.3"
+jsonrpc-core = "14.2.0"
+jsonrpc-core-client = "14.2.0"
+jsonrpc-derive = "14.2.1"
log = "0.4.8"
serde = { version = "1.0.101", features = ["derive"] }
sp-runtime = { version = "2.0.0-rc2", path = "../../../../primitives/runtime" }