Skip to content

Commit

Permalink
rpc: Expose the subscription ID for RpcClientT (#733)
Browse files Browse the repository at this point in the history
* rpc: Extend `RpcClientT` to return the subscription ID

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc: Return `RpcSubscriptionId` for jsonrpsee clients

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc: Expose subscription ID via subxt subscription

Signed-off-by: Alexandru Vasile <[email protected]>

* examples: Adjust example to return subscription ID

Signed-off-by: Alexandru Vasile <[email protected]>

* rpc: Add structure for subscription stream and subscription id

Signed-off-by: Alexandru Vasile <[email protected]>

Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv authored Nov 25, 2022
1 parent f0ce26d commit 463e2aa
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 15 deletions.
3 changes: 2 additions & 1 deletion examples/examples/custom_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl RpcClientT for MyLoggingClient {
let res = RawValue::from_string("[]".to_string()).unwrap();
let stream = futures::stream::once(async move { Ok(res) });
let stream: Pin<Box<dyn futures::Stream<Item = _> + Send>> = Box::pin(stream);
Box::pin(std::future::ready(Ok(stream)))
// This subscription does not provide an ID.
Box::pin(std::future::ready(Ok(RpcSubscription { stream, id: None })))
}
}

Expand Down
37 changes: 25 additions & 12 deletions subxt/src/rpc/jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@ use futures::stream::{
StreamExt,
TryStreamExt,
};
use jsonrpsee::core::{
client::{
Client,
ClientT,
SubscriptionClientT,
use jsonrpsee::{
core::{
client::{
Client,
ClientT,
SubscriptionClientT,
SubscriptionKind,
},
traits::ToRpcParams,
Error as JsonRpseeError,
},
traits::ToRpcParams,
Error as JsonRpseeError,
types::SubscriptionId,
};
use serde_json::value::RawValue;

Expand Down Expand Up @@ -52,17 +56,26 @@ impl RpcClientT for Client {
unsub: &'a str,
) -> RpcFuture<'a, RpcSubscription> {
Box::pin(async move {
let sub = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
let stream = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
self,
sub,
Params(params),
unsub,
)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?
.map_err(|e| RpcError::ClientError(Box::new(e)))
.boxed();
Ok(sub)
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

let id = match stream.kind() {
SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
Some(id.clone().into_owned())
}
_ => None,
};

let stream = stream
.map_err(|e| RpcError::ClientError(Box::new(e)))
.boxed();
Ok(RpcSubscription { stream, id })
})
}
}
2 changes: 2 additions & 0 deletions subxt/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub use rpc_client_t::{
RpcClientT,
RpcFuture,
RpcSubscription,
RpcSubscriptionId,
RpcSubscriptionStream,
};

pub use rpc_client::{
Expand Down
8 changes: 7 additions & 1 deletion subxt/src/rpc/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use super::{
RpcClientT,
RpcSubscription,
RpcSubscriptionId,
};
use crate::error::Error;
use futures::{
Expand Down Expand Up @@ -185,6 +186,11 @@ impl<Res> Subscription<Res> {
_marker: std::marker::PhantomData,
}
}

/// Obtain the ID associated with this subscription.
pub fn subscription_id(&self) -> Option<&RpcSubscriptionId> {
self.inner.id.as_ref()
}
}

impl<Res: DeserializeOwned> Subscription<Res> {
Expand All @@ -203,7 +209,7 @@ impl<Res: DeserializeOwned> Stream for Subscription<Res> {
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = futures::ready!(self.inner.poll_next_unpin(cx));
let res = futures::ready!(self.inner.stream.poll_next_unpin(cx));

// Decode the inner RawValue to the type we're expecting and map
// any errors to the right shape:
Expand Down
13 changes: 12 additions & 1 deletion subxt/src/rpc/rpc_client_t.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ pub trait RpcClientT: Send + Sync + 'static {
pub type RpcFuture<'a, T> =
Pin<Box<dyn Future<Output = Result<T, RpcError>> + Send + 'a>>;

/// The RPC subscription returned from [`RpcClientT`]'s `subscription` method.
pub struct RpcSubscription {
/// The subscription stream.
pub stream: RpcSubscriptionStream,
/// The ID associated with the subscription.
pub id: Option<RpcSubscriptionId>,
}

/// The inner subscription stream returned from our [`RpcClientT`]'s `subscription` method.
pub type RpcSubscription =
pub type RpcSubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Box<RawValue>, RpcError>> + Send + 'static>>;

/// The ID associated with the [`RpcClientT`]'s `subscription`.
pub type RpcSubscriptionId = String;

0 comments on commit 463e2aa

Please sign in to comment.