Skip to content

Commit b22eb3f

Browse files
authored
impl Stream on Subscription and tweak built-in next() method to align (#601)
* get the tests passing * cargo fmt * tweak comment wording * point to StreamExt in the next() method impl just so that people know to import it if desired * ignore clippy lint on next() Iterator-like method * Fix an example * actually, unwrap instead of transpose
1 parent 59925c0 commit b22eb3f

File tree

10 files changed

+70
-45
lines changed

10 files changed

+70
-45
lines changed

.gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
*/target
33
**/*.rs.bk
44
Cargo.lock
5-
5+
.DS_Store
66

77
#Added by cargo
88
#

benches/bench.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl
163163
})
164164
},
165165
|mut sub| async move {
166-
black_box(sub.next().await.unwrap());
166+
black_box(sub.next().await.transpose().unwrap());
167167
// Note that this benchmark will include costs for measuring `drop` for subscription,
168168
// since it's not possible to combine both `iter_with_setup` and `iter_with_large_drop`.
169169
// To estimate pure cost of method, one should subtract the result of `unsub` bench

examples/proc_macro.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> {
8585

8686
let mut sub: Subscription<Vec<ExampleHash>> =
8787
RpcClient::<ExampleHash, ExampleStorageKey>::subscribe_storage(&client, None).await.unwrap();
88-
assert_eq!(Some(vec![[0; 32]]), sub.next().await.unwrap());
88+
assert_eq!(Some(vec![[0; 32]]), sub.next().await.transpose().unwrap());
8989

9090
Ok(())
9191
}

proc-macros/src/lib.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -285,10 +285,10 @@ pub(crate) mod visitor;
285285
///
286286
/// // Subscribe and receive messages from the subscription.
287287
/// let mut sub = client.sub().await.unwrap();
288-
/// let first_recv = sub.next().await.unwrap();
289-
/// assert_eq!(first_recv, Some("Response_A".to_string()));
290-
/// let second_recv = sub.next().await.unwrap();
291-
/// assert_eq!(second_recv, Some("Response_B".to_string()));
288+
/// let first_recv = sub.next().await.unwrap().unwrap();
289+
/// assert_eq!(first_recv, "Response_A".to_string());
290+
/// let second_recv = sub.next().await.unwrap().unwrap();
291+
/// assert_eq!(second_recv, "Response_B".to_string());
292292
/// }
293293
/// ```
294294
#[proc_macro_attribute]

proc-macros/tests/ui/correct/basic.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,12 @@ async fn main() {
107107
assert_eq!(client.request::<bool>("foo_optional_param", rpc_params![1]).await.unwrap(), true);
108108

109109
let mut sub = client.sub().await.unwrap();
110-
let first_recv = sub.next().await.unwrap();
110+
let first_recv = sub.next().await.transpose().unwrap();
111111
assert_eq!(first_recv, Some("Response_A".to_string()));
112-
let second_recv = sub.next().await.unwrap();
112+
let second_recv = sub.next().await.transpose().unwrap();
113113
assert_eq!(second_recv, Some("Response_B".to_string()));
114114

115115
let mut sub = client.sub_with_override_notif_method().await.unwrap();
116-
let recv = sub.next().await.unwrap();
116+
let recv = sub.next().await.transpose().unwrap();
117117
assert_eq!(recv, Some(1));
118118
}

tests/tests/integration_tests.rs

+12-12
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ async fn ws_subscription_works() {
5353
let mut foo_sub: Subscription<u64> = client.subscribe("subscribe_foo", None, "unsubscribe_foo").await.unwrap();
5454

5555
for _ in 0..10 {
56-
let hello = hello_sub.next().await.unwrap();
57-
let foo = foo_sub.next().await.unwrap();
58-
assert_eq!(hello, Some("hello from subscription".into()));
59-
assert_eq!(foo, Some(1337));
56+
let hello = hello_sub.next().await.unwrap().unwrap();
57+
let foo = foo_sub.next().await.unwrap().unwrap();
58+
assert_eq!(hello, "hello from subscription".to_string());
59+
assert_eq!(foo, 1337);
6060
}
6161
}
6262

@@ -181,11 +181,11 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
181181

182182
// Capacity is `num_sender` + `capacity`
183183
for _ in 0..5 {
184-
assert!(hello_sub.next().await.unwrap().is_some());
184+
assert!(hello_sub.next().await.unwrap().is_ok());
185185
}
186186

187187
// NOTE: this is now unuseable and unregistered.
188-
assert!(hello_sub.next().await.unwrap().is_none());
188+
assert!(hello_sub.next().await.is_none());
189189

190190
// The client should still be useable => make sure it still works.
191191
let _hello_req: JsonValue = client.request("say_hello", None).await.unwrap();
@@ -194,7 +194,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
194194
let mut other_sub: Subscription<JsonValue> =
195195
client.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();
196196

197-
other_sub.next().await.unwrap();
197+
other_sub.next().await.unwrap().unwrap();
198198
}
199199

200200
#[tokio::test]
@@ -285,7 +285,7 @@ async fn server_should_be_able_to_close_subscriptions() {
285285

286286
let res = sub.next().await;
287287

288-
assert!(matches!(res, Err(Error::SubscriptionClosed(_))));
288+
assert!(matches!(res, Some(Err(Error::SubscriptionClosed(_)))));
289289
}
290290

291291
#[tokio::test]
@@ -297,7 +297,7 @@ async fn ws_close_pending_subscription_when_server_terminated() {
297297

298298
let mut sub: Subscription<String> = c1.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();
299299

300-
assert!(matches!(sub.next().await, Ok(Some(_))));
300+
assert!(matches!(sub.next().await, Some(Ok(_))));
301301

302302
handle.stop().unwrap().await;
303303

@@ -310,7 +310,7 @@ async fn ws_close_pending_subscription_when_server_terminated() {
310310
for _ in 0..2 {
311311
match sub.next().await {
312312
// All good, exit test
313-
Ok(None) => return,
313+
None => return,
314314
// Try again
315315
_ => continue,
316316
}
@@ -352,9 +352,9 @@ async fn ws_server_should_stop_subscription_after_client_drop() {
352352

353353
let mut sub: Subscription<usize> = client.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();
354354

355-
let res = sub.next().await.unwrap();
355+
let res = sub.next().await.unwrap().unwrap();
356356

357-
assert_eq!(res.as_ref(), Some(&1));
357+
assert_eq!(res, 1);
358358
drop(client);
359359
// assert that the server received `SubscriptionClosed` after the client was dropped.
360360
assert!(matches!(rx.next().await.unwrap(), SubscriptionClosedError { .. }));

tests/tests/proc_macros.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -214,17 +214,17 @@ async fn proc_macros_generic_ws_client_api() {
214214

215215
// Sub without params
216216
let mut sub = client.sub().await.unwrap();
217-
let first_recv = sub.next().await.unwrap();
218-
assert_eq!(first_recv, Some("Response_A".to_string()));
219-
let second_recv = sub.next().await.unwrap();
220-
assert_eq!(second_recv, Some("Response_B".to_string()));
217+
let first_recv = sub.next().await.unwrap().unwrap();
218+
assert_eq!(first_recv, "Response_A".to_string());
219+
let second_recv = sub.next().await.unwrap().unwrap();
220+
assert_eq!(second_recv, "Response_B".to_string());
221221

222222
// Sub with params
223223
let mut sub = client.sub_with_params(42).await.unwrap();
224-
let first_recv = sub.next().await.unwrap();
225-
assert_eq!(first_recv, Some(42));
226-
let second_recv = sub.next().await.unwrap();
227-
assert_eq!(second_recv, Some(42));
224+
let first_recv = sub.next().await.unwrap().unwrap();
225+
assert_eq!(first_recv, 42);
226+
let second_recv = sub.next().await.unwrap().unwrap();
227+
assert_eq!(second_recv, 42);
228228
}
229229

230230
#[tokio::test]

types/src/client.rs

+36-12
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,17 @@
2727
use crate::{error::SubscriptionClosedError, v2::SubscriptionId, Error};
2828
use core::marker::PhantomData;
2929
use futures_channel::{mpsc, oneshot};
30-
use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt};
30+
use futures_util::{
31+
future::FutureExt,
32+
sink::SinkExt,
33+
stream::{Stream, StreamExt},
34+
};
3135
use serde::{de::DeserializeOwned, Deserialize, Serialize};
3236
use serde_json::Value as JsonValue;
37+
use std::pin::Pin;
3338
use std::sync::atomic::{AtomicU64, Ordering};
3439
use std::sync::Arc;
40+
use std::task;
3541

3642
/// Subscription kind
3743
#[derive(Debug)]
@@ -68,6 +74,10 @@ pub struct Subscription<Notif> {
6874
marker: PhantomData<Notif>,
6975
}
7076

77+
// `Subscription` does not automatically implement this due to `PhantomData<Notif>`,
78+
// but type type has no need to be pinned.
79+
impl<Notif> std::marker::Unpin for Subscription<Notif> {}
80+
7181
impl<Notif> Subscription<Notif> {
7282
/// Create a new subscription.
7383
pub fn new(
@@ -157,17 +167,31 @@ where
157167
Notif: DeserializeOwned,
158168
{
159169
/// Returns the next notification from the stream.
160-
/// This may return `Ok(None)` if the subscription has been terminated,
161-
/// may happen if the channel becomes full or is dropped.
162-
pub async fn next(&mut self) -> Result<Option<Notif>, Error> {
163-
match self.notifs_rx.next().await {
164-
Some(n) => match serde_json::from_value::<NotifResponse<Notif>>(n) {
165-
Ok(NotifResponse::Ok(parsed)) => Ok(Some(parsed)),
166-
Ok(NotifResponse::Err(e)) => Err(Error::SubscriptionClosed(e)),
167-
Err(e) => Err(Error::ParseError(e)),
168-
},
169-
None => Ok(None),
170-
}
170+
/// This may return `None` if the subscription has been terminated,
171+
/// which may happen if the channel becomes full or is dropped.
172+
///
173+
/// **Note:** This has an identical signature to the [`StreamExt::next`]
174+
/// method (and delegates to that). Import [`StreamExt`] if you'd like
175+
/// access to other stream combinator methods.
176+
#[allow(clippy::should_implement_trait)]
177+
pub async fn next(&mut self) -> Option<Result<Notif, Error>> {
178+
StreamExt::next(self).await
179+
}
180+
}
181+
182+
impl<Notif> Stream for Subscription<Notif>
183+
where
184+
Notif: DeserializeOwned,
185+
{
186+
type Item = Result<Notif, Error>;
187+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
188+
let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx));
189+
let res = n.map(|n| match serde_json::from_value::<NotifResponse<Notif>>(n) {
190+
Ok(NotifResponse::Ok(parsed)) => Ok(parsed),
191+
Ok(NotifResponse::Err(e)) => Err(Error::SubscriptionClosed(e)),
192+
Err(e) => Err(Error::ParseError(e)),
193+
});
194+
task::Poll::Ready(res)
171195
}
172196
}
173197

types/src/v2/params.rs

+1
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,7 @@ impl<'a> ParamsSequence<'a> {
211211
/// assert_eq!(b, 10);
212212
/// assert_eq!(c, "foo");
213213
/// ```
214+
#[allow(clippy::should_implement_trait)]
214215
pub fn next<T>(&mut self) -> Result<T, CallError>
215216
where
216217
T: Deserialize<'a>,

ws-client/src/tests.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -173,18 +173,18 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {
173173

174174
// Capacity is `num_sender` + `capacity`
175175
for _ in 0..5 {
176-
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_some());
176+
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
177177
}
178178

179179
// NOTE: this is now unuseable and unregistered.
180-
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_none());
180+
assert!(nh.next().with_default_timeout().await.unwrap().is_none());
181181

182182
// The same subscription should be possible to register again.
183183
let mut other_nh: Subscription<String> =
184184
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();
185185

186186
// check that the new subscription works.
187-
assert!(other_nh.next().with_default_timeout().await.unwrap().unwrap().is_some());
187+
assert!(other_nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
188188
assert!(client.is_connected());
189189
}
190190

0 commit comments

Comments
 (0)