From 463e2aa93f2940b324672f5608bc857d290083ed Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 25 Nov 2022 17:07:26 +0200 Subject: [PATCH] rpc: Expose the `subscription ID` for `RpcClientT` (#733) * rpc: Extend `RpcClientT` to return the subscription ID Signed-off-by: Alexandru Vasile * rpc: Return `RpcSubscriptionId` for jsonrpsee clients Signed-off-by: Alexandru Vasile * rpc: Expose subscription ID via subxt subscription Signed-off-by: Alexandru Vasile * examples: Adjust example to return subscription ID Signed-off-by: Alexandru Vasile * rpc: Add structure for subscription stream and subscription id Signed-off-by: Alexandru Vasile Signed-off-by: Alexandru Vasile --- examples/examples/custom_rpc_client.rs | 3 ++- subxt/src/rpc/jsonrpsee_impl.rs | 37 +++++++++++++++++--------- subxt/src/rpc/mod.rs | 2 ++ subxt/src/rpc/rpc_client.rs | 8 +++++- subxt/src/rpc/rpc_client_t.rs | 13 ++++++++- 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/examples/examples/custom_rpc_client.rs b/examples/examples/custom_rpc_client.rs index 52e0cd8227..ca56a53e9c 100644 --- a/examples/examples/custom_rpc_client.rs +++ b/examples/examples/custom_rpc_client.rs @@ -68,7 +68,8 @@ impl RpcClientT for MyLoggingClient { let res = RawValue::from_string("[]".to_string()).unwrap(); let stream = futures::stream::once(async move { Ok(res) }); let stream: Pin + Send>> = Box::pin(stream); - Box::pin(std::future::ready(Ok(stream))) + // This subscription does not provide an ID. + Box::pin(std::future::ready(Ok(RpcSubscription { stream, id: None }))) } } diff --git a/subxt/src/rpc/jsonrpsee_impl.rs b/subxt/src/rpc/jsonrpsee_impl.rs index 09dbfe663b..e04a7828b8 100644 --- a/subxt/src/rpc/jsonrpsee_impl.rs +++ b/subxt/src/rpc/jsonrpsee_impl.rs @@ -12,14 +12,18 @@ use futures::stream::{ StreamExt, TryStreamExt, }; -use jsonrpsee::core::{ - client::{ - Client, - ClientT, - SubscriptionClientT, +use jsonrpsee::{ + core::{ + client::{ + Client, + ClientT, + SubscriptionClientT, + SubscriptionKind, + }, + traits::ToRpcParams, + Error as JsonRpseeError, }, - traits::ToRpcParams, - Error as JsonRpseeError, + types::SubscriptionId, }; use serde_json::value::RawValue; @@ -52,17 +56,26 @@ impl RpcClientT for Client { unsub: &'a str, ) -> RpcFuture<'a, RpcSubscription> { Box::pin(async move { - let sub = SubscriptionClientT::subscribe::, _>( + let stream = SubscriptionClientT::subscribe::, _>( self, sub, Params(params), unsub, ) .await - .map_err(|e| RpcError::ClientError(Box::new(e)))? - .map_err(|e| RpcError::ClientError(Box::new(e))) - .boxed(); - Ok(sub) + .map_err(|e| RpcError::ClientError(Box::new(e)))?; + + let id = match stream.kind() { + SubscriptionKind::Subscription(SubscriptionId::Str(id)) => { + Some(id.clone().into_owned()) + } + _ => None, + }; + + let stream = stream + .map_err(|e| RpcError::ClientError(Box::new(e))) + .boxed(); + Ok(RpcSubscription { stream, id }) }) } } diff --git a/subxt/src/rpc/mod.rs b/subxt/src/rpc/mod.rs index ad08b55d59..db26f67702 100644 --- a/subxt/src/rpc/mod.rs +++ b/subxt/src/rpc/mod.rs @@ -67,6 +67,8 @@ pub use rpc_client_t::{ RpcClientT, RpcFuture, RpcSubscription, + RpcSubscriptionId, + RpcSubscriptionStream, }; pub use rpc_client::{ diff --git a/subxt/src/rpc/rpc_client.rs b/subxt/src/rpc/rpc_client.rs index 56592c0363..2d49dc1926 100644 --- a/subxt/src/rpc/rpc_client.rs +++ b/subxt/src/rpc/rpc_client.rs @@ -5,6 +5,7 @@ use super::{ RpcClientT, RpcSubscription, + RpcSubscriptionId, }; use crate::error::Error; use futures::{ @@ -185,6 +186,11 @@ impl Subscription { _marker: std::marker::PhantomData, } } + + /// Obtain the ID associated with this subscription. + pub fn subscription_id(&self) -> Option<&RpcSubscriptionId> { + self.inner.id.as_ref() + } } impl Subscription { @@ -203,7 +209,7 @@ impl Stream for Subscription { mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { - let res = futures::ready!(self.inner.poll_next_unpin(cx)); + let res = futures::ready!(self.inner.stream.poll_next_unpin(cx)); // Decode the inner RawValue to the type we're expecting and map // any errors to the right shape: diff --git a/subxt/src/rpc/rpc_client_t.rs b/subxt/src/rpc/rpc_client_t.rs index d32924ffbe..7e1ed0f665 100644 --- a/subxt/src/rpc/rpc_client_t.rs +++ b/subxt/src/rpc/rpc_client_t.rs @@ -59,6 +59,17 @@ pub trait RpcClientT: Send + Sync + 'static { pub type RpcFuture<'a, T> = Pin> + Send + 'a>>; +/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method. +pub struct RpcSubscription { + /// The subscription stream. + pub stream: RpcSubscriptionStream, + /// The ID associated with the subscription. + pub id: Option, +} + /// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method. -pub type RpcSubscription = +pub type RpcSubscriptionStream = Pin, RpcError>> + Send + 'static>>; + +/// The ID associated with the [`RpcClientT`]'s `subscription`. +pub type RpcSubscriptionId = String;