From 8c4bb2fb127b72ec02b42ede36edd4eab674f3a7 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 16 Nov 2021 18:18:56 +0100 Subject: [PATCH 1/2] fix chain subscription: less boiler plate --- client/rpc/src/chain/chain_full.rs | 115 ++++++++++++++++++++++++----- client/rpc/src/chain/helpers.rs | 95 ------------------------ client/rpc/src/chain/mod.rs | 1 - client/rpc/src/chain/tests.rs | 71 ++++++++---------- 4 files changed, 130 insertions(+), 152 deletions(-) delete mode 100644 client/rpc/src/chain/helpers.rs diff --git a/client/rpc/src/chain/chain_full.rs b/client/rpc/src/chain/chain_full.rs index 4d6dd9a46446e..6c5e8c0827586 100644 --- a/client/rpc/src/chain/chain_full.rs +++ b/client/rpc/src/chain/chain_full.rs @@ -19,10 +19,14 @@ //! Blockchain API backend for full nodes. use super::{client_err, ChainBackend, Error}; -use crate::{chain::helpers, SubscriptionTaskExecutor}; +use crate::SubscriptionTaskExecutor; use std::{marker::PhantomData, sync::Arc}; -use futures::task::Spawn; +use futures::{ + stream::{self, Stream, StreamExt}, + future, + task::Spawn, +}; use jsonrpsee::ws_server::SubscriptionSink; use sc_client_api::{BlockBackend, BlockchainEvents}; use sp_blockchain::HeaderBackend; @@ -68,27 +72,104 @@ where } fn subscribe_all_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); - - let fut = helpers::subscribe_headers(client, sink, "chain_subscribeAllHeads"); - executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeAllHeads", + sink, + || self.client().info().best_hash, + || { + self.client() + .import_notification_stream() + .map(|notification| notification.header) + }, + ) } fn subscribe_new_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); - - let fut = helpers::subscribe_headers(client, sink, "chain_subscribeNewHeads"); - executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeNewHeads", + sink, + || self.client().info().best_hash, + || { + self.client() + .import_notification_stream() + .filter(|notification| future::ready(notification.is_new_best)) + .map(|notification| notification.header) + }, + ) } fn subscribe_finalized_heads(&self, sink: SubscriptionSink) -> Result<(), Error> { - let client = self.client.clone(); - let executor = self.executor.clone(); + subscribe_headers( + &self.client, + self.executor.clone(), + "chain_subscribeFinalizedHeads", + sink, + || self.client().info().finalized_hash, + || { + self.client() + .finality_notification_stream() + .map(|notification| notification.header) + }, + ) + } +} + +/// Subscribe to new headers. +fn subscribe_headers( + client: &Arc, + executor: SubscriptionTaskExecutor, + method: &'static str, + mut sink: SubscriptionSink, + best_block_hash: G, + stream: F, +) -> Result<(), Error> +where + Block: BlockT + 'static, + Block::Header: Unpin, + Client: HeaderBackend + 'static, + F: FnOnce() -> S, + G: FnOnce() -> Block::Hash, + S: Stream + Send + 'static, +{ + // send current head right at the start. + let maybe_header = client + .header(BlockId::Hash(best_block_hash())) + .map_err(client_err) + .and_then(|header| { + header.ok_or_else(|| Error::Other("Best header missing.".to_string())) + }) + .map_err(|e| { + log::warn!("Best header error {:?}", e); + e + }) + .ok(); + + // send further subscriptions + let stream = stream(); + + // NOTE: by the time we set up the stream there might be a new best block and so there is a risk + // that the stream has a hole in it. The alternative would be to look up the best block *after* + // we set up the stream and chain it to the stream. Consuming code would need to handle + // duplicates at the beginning of the stream though. + let fut = async move { + stream::iter(maybe_header) + .chain(stream) + .take_while(|storage| { + future::ready(sink.send(&storage).map_or_else( + |e| { + log::debug!("Could not send data to subscription: {} error: {:?}", method, e); + false + }, + |_| true, + )) + }) + .for_each(|_| future::ready(())) + .await; + }; - let fut = - helpers::subscribe_finalized_headers(client, sink, "chain_subscribeFinalizedHeads"); executor.spawn_obj(Box::pin(fut).into()).map_err(|e| Error::Client(Box::new(e))) - } } diff --git a/client/rpc/src/chain/helpers.rs b/client/rpc/src/chain/helpers.rs deleted file mode 100644 index 385947423552c..0000000000000 --- a/client/rpc/src/chain/helpers.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::sync::Arc; - -use futures::{future, StreamExt}; -use jsonrpsee::ws_server::SubscriptionSink; -use sc_client_api::BlockchainEvents; -use sp_blockchain::HeaderBackend; -use sp_runtime::{generic::BlockId, traits::Block as BlockT}; - -/// Helper to create subscriptions for `allHeads` and `newHeads`. -pub async fn subscribe_headers( - client: Arc, - mut sink: SubscriptionSink, - method: &str, -) where - Block: BlockT + 'static, - Client: HeaderBackend + BlockchainEvents + 'static, -{ - let hash = client.info().best_hash; - let best_head = match client.header(BlockId::Hash(hash)) { - Ok(head) => head, - Err(e) => { - log_err(method, e); - return - }, - }; - - if let Err(e) = sink.send(&best_head) { - log_err(method, e); - return - }; - - // NOTE: by the time we set up the stream there might be a new best block and so there is a risk - // that the stream has a hole in it. The alternative would be to look up the best block *after* - // we set up the stream and chain it to the stream. Consuming code would need to handle - // duplicates at the beginning of the stream though. - let stream = client.import_notification_stream(); - stream - .take_while(|import| { - future::ready(sink.send(&import.header).map_or_else( - |e| { - log_err(method, e); - false - }, - |_| true, - )) - }) - .for_each(|_| future::ready(())) - .await; -} - -/// Helper to create subscriptions for `finalizedHeads`. -pub async fn subscribe_finalized_headers( - client: Arc, - mut sink: SubscriptionSink, - method: &str, -) where - Block: BlockT + 'static, - Client: HeaderBackend + BlockchainEvents + 'static, -{ - let hash = client.info().finalized_hash; - let best_head = match client.header(BlockId::Hash(hash)) { - Ok(head) => head, - Err(err) => { - log_err(method, err); - return - }, - }; - - if let Err(err) = sink.send(&best_head) { - log_err(method, err); - return - }; - - // NOTE: by the time we set up the stream there might be a new best block and so there is a risk - // that the stream has a hole in it. The alternative would be to look up the best block *after* - // we set up the stream and chain it to the stream. Consuming code would need to handle - // duplicates at the beginning of the stream though. - let stream = client.finality_notification_stream(); - stream - .take_while(|import| { - future::ready(sink.send(&import.header).map_or_else( - |e| { - log_err(method, e); - false - }, - |_| true, - )) - }) - .for_each(|_| future::ready(())) - .await; -} - -fn log_err(method: &str, err: E) { - log::debug!("Could not send data to subscription: {} error: {:?}", method, err); -} diff --git a/client/rpc/src/chain/mod.rs b/client/rpc/src/chain/mod.rs index 11636283d46f2..bea26a83f424c 100644 --- a/client/rpc/src/chain/mod.rs +++ b/client/rpc/src/chain/mod.rs @@ -19,7 +19,6 @@ //! Substrate blockchain API. mod chain_full; -mod helpers; #[cfg(test)] mod tests; diff --git a/client/rpc/src/chain/tests.rs b/client/rpc/src/chain/tests.rs index 71e2e6b53a947..23c789c63542c 100644 --- a/client/rpc/src/chain/tests.rs +++ b/client/rpc/src/chain/tests.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use super::*; -use crate::testing::{deser_call, timeout_secs, TaskExecutor}; +use crate::testing::{deser_call, TaskExecutor}; use assert_matches::assert_matches; use sc_block_builder::BlockBuilderProvider; use sp_consensus::BlockOrigin; @@ -226,63 +226,56 @@ async fn should_return_finalized_hash() { #[tokio::test] async fn should_notify_about_latest_block() { - let mut sub = { - let mut client = Arc::new(substrate_test_runtime_client::new()); - let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); - - let sub = api.test_subscription("chain_subscribeAllHeads", Vec::<()>::new()).await; + let mut client = Arc::new(substrate_test_runtime_client::new()); + let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); + let mut sub = api.test_subscription("chain_subscribeAllHeads", Vec::<()>::new()).await; - let block = client.new_block(Default::default()).unwrap().build().unwrap().block; - client.import(BlockOrigin::Own, block).await.unwrap(); - sub - }; + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block).await.unwrap(); - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); + let _h1 = sub.next::
().await; + let _h2 = sub.next::
().await; - // TODO(niklasad1): assert that the subscription was closed. - assert_matches!(timeout_secs(1, sub.next::
()).await, Err(_)); + sub.close(); + // TODO: this will panic https://github.com/paritytech/jsonrpsee/pull/566 + let _h3 = sub.next::
().await; } #[tokio::test] async fn should_notify_about_best_block() { - let mut sub = { - let mut client = Arc::new(substrate_test_runtime_client::new()); - let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); + let mut client = Arc::new(substrate_test_runtime_client::new()); + let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); - let sub = api.test_subscription("chain_subscribeNewHeads", Vec::<()>::new()).await; + let mut sub = api.test_subscription("chain_subscribeNewHeads", Vec::<()>::new()).await; - let block = client.new_block(Default::default()).unwrap().build().unwrap().block; - client.import(BlockOrigin::Own, block).await.unwrap(); - sub - }; + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block).await.unwrap(); // Check for the correct number of notifications - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); + let _h1 = sub.next::
().await; + let _h2 = sub.next::
().await; - // TODO(niklasad1): assert that the subscription was closed. - assert_matches!(timeout_secs(1, sub.next::
()).await, Err(_)); + sub.close(); + // TODO: this will panic https://github.com/paritytech/jsonrpsee/pull/566 + let _h3 = sub.next::
().await; } #[tokio::test] async fn should_notify_about_finalized_block() { - let mut sub = { - let mut client = Arc::new(substrate_test_runtime_client::new()); - let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); + let mut client = Arc::new(substrate_test_runtime_client::new()); + let api = new_full(client.clone(), SubscriptionTaskExecutor::new(TaskExecutor)).into_rpc(); - let sub = api.test_subscription("chain_subscribeFinalizedHeads", Vec::<()>::new()).await; + let mut sub = api.test_subscription("chain_subscribeFinalizedHeads", Vec::<()>::new()).await; - let block = client.new_block(Default::default()).unwrap().build().unwrap().block; - client.import(BlockOrigin::Own, block).await.unwrap(); - client.finalize_block(BlockId::number(1), None).unwrap(); - sub - }; + let block = client.new_block(Default::default()).unwrap().build().unwrap().block; + client.import(BlockOrigin::Own, block).await.unwrap(); + client.finalize_block(BlockId::number(1), None).unwrap(); // Check for the correct number of notifications - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); - assert_matches!(timeout_secs(1, sub.next::
()).await, Ok(_)); + let _h1 = sub.next::
().await; + let _h2 = sub.next::
().await; - // TODO(niklasad1): assert that the subscription was closed. - assert_matches!(timeout_secs(1, sub.next::
()).await, Err(_)); + sub.close(); + // TODO: this will panic https://github.com/paritytech/jsonrpsee/pull/566 + let _h3 = sub.next::
().await; } From 9ef94dd366552fbbc7e2ffae1939358b9faf1e27 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Sun, 21 Nov 2021 19:42:29 +0100 Subject: [PATCH 2/2] fix bad merge --- client/rpc/src/chain/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc/src/chain/tests.rs b/client/rpc/src/chain/tests.rs index 7e336f568471e..6b09c6687a9f8 100644 --- a/client/rpc/src/chain/tests.rs +++ b/client/rpc/src/chain/tests.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use super::*; -use crate::testing::{deser_call, TaskExecutor}; +use crate::testing::{deser_call, timeout_secs, TaskExecutor}; use assert_matches::assert_matches; use sc_block_builder::BlockBuilderProvider; use sp_consensus::BlockOrigin;