diff --git a/Cargo.toml b/Cargo.toml index 5c87c28fc8..be9b35c194 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,15 +2,16 @@ members = [ "examples", "benches", - "http-client", "http-server", "test-utils", "jsonrpsee", "tests", "types", "core", - "ws-client", "ws-server", + "client/ws-client", + "client/http-client", + "client/transport", "proc-macros", ] resolver = "2" diff --git a/benches/Cargo.toml b/benches/Cargo.toml index a858c4a583..e4fde5d791 100644 --- a/benches/Cargo.toml +++ b/benches/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-benchmarks" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] description = "Benchmarks for jsonrpsee" edition = "2018" diff --git a/benches/bench.rs b/benches/bench.rs index dc4617531d..47ac08ec04 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use criterion::*; use futures_util::future::join_all; use helpers::{SUB_METHOD_NAME, UNSUB_METHOD_NAME}; -use jsonrpsee::core::client::{Client, SubscriptionClient}; +use jsonrpsee::core::client::{ClientT, SubscriptionClientT}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::types::{Id, ParamsSer, RequestSer}; use jsonrpsee::ws_client::WsClientBuilder; @@ -132,7 +132,13 @@ impl RequestBencher for AsyncBencher { const REQUEST_TYPE: RequestType = RequestType::Async; } -fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str, request: RequestType) { +fn run_round_trip( + rt: &TokioRuntime, + crit: &mut Criterion, + client: Arc, + name: &str, + request: RequestType, +) { crit.bench_function(&request.group_name(name), |b| { b.to_async(rt).iter(|| async { black_box(client.request::(request.method_name(), None).await.unwrap()); @@ -140,7 +146,7 @@ fn run_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str) { +fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, name: &str) { let mut group = crit.benchmark_group(name); group.bench_function("subscribe", |b| { b.to_async(rt).iter_with_large_drop(|| async { @@ -188,7 +194,7 @@ fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc, + client: Arc, name: &str, request: RequestType, ) { @@ -203,7 +209,7 @@ fn run_round_trip_with_batch( group.finish(); } -fn run_concurrent_round_trip( +fn run_concurrent_round_trip( rt: &TokioRuntime, crit: &mut Criterion, client: Arc, diff --git a/http-client/Cargo.toml b/client/http-client/Cargo.toml similarity index 79% rename from http-client/Cargo.toml rename to client/http-client/Cargo.toml index a615b4c5f3..575288122d 100644 --- a/http-client/Cargo.toml +++ b/client/http-client/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-http-client" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies ", "Pierre Krieger "] description = "HTTP client for JSON-RPC" edition = "2018" @@ -14,8 +14,8 @@ async-trait = "0.1" rustc-hash = "1" hyper = { version = "0.14.10", features = ["client", "http1", "http2", "tcp"] } hyper-rustls = { version = "0.23", optional = true } -jsonrpsee-types = { path = "../types", version = "0.6.0" } -jsonrpsee-core = { path = "../core", version = "0.6.0", features = ["client", "http-helpers"] } +jsonrpsee-types = { path = "../../types", version = "0.6.1" } +jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["client", "http-helpers"] } serde = { version = "1.0", default-features = false, features = ["derive"] } serde_json = "1.0" thiserror = "1.0" @@ -23,7 +23,7 @@ tokio = { version = "1.8", features = ["time"] } tracing = "0.1" [dev-dependencies] -jsonrpsee-test-utils = { path = "../test-utils" } +jsonrpsee-test-utils = { path = "../../test-utils" } tokio = { version = "1.8", features = ["net", "rt-multi-thread", "macros"] } [features] diff --git a/http-client/src/client.rs b/client/http-client/src/client.rs similarity index 96% rename from http-client/src/client.rs rename to client/http-client/src/client.rs index 364c7fb616..45a245fab3 100644 --- a/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -28,10 +28,10 @@ use std::sync::Arc; use std::time::Duration; use crate::transport::HttpTransportClient; -use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response, TEN_MB_SIZE_BYTES}; +use crate::types::{ErrorResponse, Id, NotificationSer, ParamsSer, RequestSer, Response}; use async_trait::async_trait; -use jsonrpsee_core::client::{CertificateStore, Client, RequestIdManager, Subscription, SubscriptionClient}; -use jsonrpsee_core::Error; +use jsonrpsee_core::client::{CertificateStore, ClientT, RequestIdManager, Subscription, SubscriptionClientT}; +use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use rustc_hash::FxHashMap; use serde::de::DeserializeOwned; @@ -104,7 +104,7 @@ pub struct HttpClient { } #[async_trait] -impl Client for HttpClient { +impl ClientT for HttpClient { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { let notif = NotificationSer::new(method, params); let fut = self.transport.send(serde_json::to_string(¬if).map_err(Error::ParseError)?); @@ -196,7 +196,7 @@ impl Client for HttpClient { } #[async_trait] -impl SubscriptionClient for HttpClient { +impl SubscriptionClientT for HttpClient { /// Send a subscription request to the server. Not implemented for HTTP; will always return [`Error::HttpNotImplemented`]. async fn subscribe<'a, N>( &self, diff --git a/http-client/src/lib.rs b/client/http-client/src/lib.rs similarity index 100% rename from http-client/src/lib.rs rename to client/http-client/src/lib.rs diff --git a/http-client/src/tests.rs b/client/http-client/src/tests.rs similarity index 99% rename from http-client/src/tests.rs rename to client/http-client/src/tests.rs index c14fabea1a..2324443b75 100644 --- a/http-client/src/tests.rs +++ b/client/http-client/src/tests.rs @@ -27,7 +27,7 @@ use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse}; use crate::types::ParamsSer; use crate::HttpClientBuilder; -use jsonrpsee_core::client::Client; +use jsonrpsee_core::client::ClientT; use jsonrpsee_core::rpc_params; use jsonrpsee_core::Error; use jsonrpsee_test_utils::helpers::*; diff --git a/http-client/src/transport.rs b/client/http-client/src/transport.rs similarity index 100% rename from http-client/src/transport.rs rename to client/http-client/src/transport.rs diff --git a/client/transport/Cargo.toml b/client/transport/Cargo.toml new file mode 100644 index 0000000000..6bef203af5 --- /dev/null +++ b/client/transport/Cargo.toml @@ -0,0 +1,41 @@ +[package] +name = "jsonrpsee-client-transport" +version = "0.6.1" +authors = ["Parity Technologies ", "Pierre Krieger "] +description = "WebSocket client for JSON-RPC" +edition = "2018" +license = "MIT" +repository = "https://github.com/paritytech/jsonrpsee" +homepage = "https://github.com/paritytech/jsonrpsee" +documentation = "https://docs.rs/jsonrpsee-ws-client" + +[dependencies] +jsonrpsee-types = { path = "../../types", version = "0.6.1", optional = true } +jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["client"] } +tracing = { version = "0.1", optional = true } +thiserror = { version = "1", optional = true } +futures = { version = "0.3.14", default-features = false, features = ["std"], optional = true } +http = { version = "0.2", optional = true } +tokio-util = { version = "0.6", features = ["compat"], optional = true } +tokio = { version = "1", features = ["net", "time", "macros"], optional = true } +pin-project = { version = "1", optional = true } +rustls-native-certs = { version = "0.6", optional = true } +webpki-roots = { version = "0.22", optional = true } +tokio-rustls = { version = "0.23", optional = true } + +# ws +soketto = { version = "0.7.1", optional = true } + +[features] +tls = ["tokio-rustls", "webpki-roots", "rustls-native-certs"] +ws = [ + "futures", + "http", + "tokio", + "tokio-util", + "soketto", + "pin-project", + "jsonrpsee-types", + "thiserror", + "tracing" +] diff --git a/ws-client/src/lib.rs b/client/transport/src/lib.rs similarity index 68% rename from ws-client/src/lib.rs rename to client/transport/src/lib.rs index 2cd1189184..2ae0dab1ef 100644 --- a/ws-client/src/lib.rs +++ b/client/transport/src/lib.rs @@ -26,27 +26,9 @@ #![warn(missing_debug_implementations, missing_docs, unreachable_pub)] -//! # jsonrpsee-ws-client +//! # jsonrpsee-client-transports //! -//! `jsonrpsee-ws-client` is a [JSON RPC](https://www.jsonrpc.org/specification) WebSocket client library that's is built for `async/await`. -//! -//! ## Runtime support -//! -//! This library uses `tokio` as the runtime and does not support other kinds of runtimes. - -/// WebSocket Client. -pub mod client; -/// Helpers. -pub mod helpers; -/// Request manager. -pub mod manager; -/// Stream. -pub mod stream; -/// WebSocket transport. -pub mod transport; - -#[cfg(test)] -mod tests; -pub use client::{WsClient, WsClientBuilder}; -pub use jsonrpsee_types as types; +/// Websocket transport +#[cfg(feature = "ws")] +pub mod ws; diff --git a/ws-client/src/transport.rs b/client/transport/src/ws/mod.rs similarity index 83% rename from ws-client/src/transport.rs rename to client/transport/src/ws/mod.rs index 0390dbc5e8..af546c1fdd 100644 --- a/ws-client/src/transport.rs +++ b/client/transport/src/ws/mod.rs @@ -24,21 +24,26 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +mod stream; + use std::convert::{TryFrom, TryInto}; use std::io; use std::net::{SocketAddr, ToSocketAddrs}; use std::time::Duration; -use crate::stream::EitherStream; -use beef::Cow; use futures::io::{BufReader, BufWriter}; -use http::Uri; -use jsonrpsee_core::client::CertificateStore; +use jsonrpsee_core::client::{CertificateStore, TransportReceiverT, TransportSenderT}; +use jsonrpsee_core::TEN_MB_SIZE_BYTES; +use jsonrpsee_core::{async_trait, Cow}; use soketto::connection; -use soketto::handshake::client::{Client as WsHandshakeClient, Header, ServerResponse}; +use soketto::handshake::client::{Client as WsHandshakeClient, ServerResponse}; +use stream::EitherStream; use thiserror::Error; use tokio::net::TcpStream; +pub use http::{uri::InvalidUri, Uri}; +pub use soketto::handshake::client::Header; + /// Sending end of WebSocket transport. #[derive(Debug)] pub struct Sender { @@ -56,10 +61,8 @@ pub struct Receiver { pub struct WsTransportClientBuilder<'a> { /// What certificate store to use pub certificate_store: CertificateStore, - /// Remote WebSocket target. - pub target: Target, /// Timeout for the connection. - pub timeout: Duration, + pub connection_timeout: Duration, /// Custom headers to pass during the HTTP handshake. If `None`, no /// custom header is passed. pub headers: Vec>, @@ -69,6 +72,53 @@ pub struct WsTransportClientBuilder<'a> { pub max_redirections: usize, } +impl<'a> Default for WsTransportClientBuilder<'a> { + fn default() -> Self { + Self { + certificate_store: CertificateStore::Native, + max_request_body_size: TEN_MB_SIZE_BYTES, + connection_timeout: Duration::from_secs(10), + headers: Vec::new(), + max_redirections: 5, + } + } +} + +impl<'a> WsTransportClientBuilder<'a> { + /// Set whether to use system certificates (default is native). + pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self { + self.certificate_store = certificate_store; + self + } + + /// Set max request body size (default is 10 MB). + pub fn max_request_body_size(mut self, size: u32) -> Self { + self.max_request_body_size = size; + self + } + + /// Set connection timeout for the handshake (default is 10 seconds). + pub fn connection_timeout(mut self, timeout: Duration) -> Self { + self.connection_timeout = timeout; + self + } + + /// Set a custom header passed to the server during the handshake (default is none). + /// + /// The caller is responsible for checking that the headers do not conflict or are duplicated. + pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self { + self.headers.push(Header { name, value: value.as_bytes() }); + self + } + + /// Set the max number of redirections to perform until a connection is regarded as failed. + /// (default is 5). + pub fn max_redirections(mut self, redirect: usize) -> Self { + self.max_redirections = redirect; + self + } +} + /// Stream mode, either plain TCP or TLS. #[derive(Clone, Copy, Debug, PartialEq)] pub enum Mode { @@ -131,16 +181,15 @@ pub enum WsError { /// Error in the WebSocket connection. #[error("WebSocket connection error: {}", 0)] Connection(#[source] soketto::connection::Error), - - /// Failed to parse the message in JSON. - #[error("Failed to parse message in JSON: {}", 0)] - ParseError(#[source] serde_json::error::Error), } -impl Sender { +#[async_trait] +impl TransportSenderT for Sender { + type Error = WsError; + /// Sends out a request. Returns a `Future` that finishes when the request has been /// successfully sent. - pub async fn send(&mut self, body: String) -> Result<(), WsError> { + async fn send(&mut self, body: String) -> Result<(), WsError> { tracing::debug!("send: {}", body); self.inner.send_text(body).await?; self.inner.flush().await?; @@ -148,33 +197,37 @@ impl Sender { } /// Send a close message and close the connection. - pub async fn close(&mut self) -> Result<(), WsError> { + async fn close(&mut self) -> Result<(), WsError> { self.inner.close().await.map_err(Into::into) } } -impl Receiver { +#[async_trait] +impl TransportReceiverT for Receiver { + type Error = WsError; + /// Returns a `Future` resolving when the server sent us something back. - pub async fn next_response(&mut self) -> Result, WsError> { + async fn receive(&mut self) -> Result { let mut message = Vec::new(); self.inner.receive_data(&mut message).await?; - Ok(message) + let s = String::from_utf8(message).expect("Found invalid UTF-8"); + Ok(s) } } impl<'a> WsTransportClientBuilder<'a> { /// Try to establish the connection. - pub async fn build(self) -> Result<(Sender, Receiver), WsHandshakeError> { - self.try_connect().await + pub async fn build(self, uri: Uri) -> Result<(Sender, Receiver), WsHandshakeError> { + let target: Target = uri.try_into()?; + self.try_connect(target).await } - async fn try_connect(self) -> Result<(Sender, Receiver), WsHandshakeError> { - let mut target = self.target; + async fn try_connect(self, mut target: Target) -> Result<(Sender, Receiver), WsHandshakeError> { let mut err = None; // Only build TLS connector if `wss` in URL. #[cfg(feature = "tls")] - let mut connector = match target.mode { + let mut connector = match target._mode { Mode::Tls => Some(build_tls_config(&self.certificate_store)?), Mode::Plain => None, }; @@ -186,7 +239,7 @@ impl<'a> WsTransportClientBuilder<'a> { let sockaddrs = std::mem::take(&mut target.sockaddrs); for sockaddr in &sockaddrs { #[cfg(feature = "tls")] - let tcp_stream = match connect(*sockaddr, self.timeout, &target.host, connector.as_ref()).await { + let tcp_stream = match connect(*sockaddr, self.connection_timeout, &target.host, connector.as_ref()).await { Ok(stream) => stream, Err(e) => { tracing::debug!("Failed to connect to sockaddr: {:?}", sockaddr); @@ -196,7 +249,7 @@ impl<'a> WsTransportClientBuilder<'a> { }; #[cfg(not(feature = "tls"))] - let tcp_stream = match connect(*sockaddr, self.timeout).await { + let tcp_stream = match connect(*sockaddr, self.connection_timeout).await { Ok(stream) => stream, Err(e) => { tracing::debug!("Failed to connect to sockaddr: {:?}", sockaddr); @@ -238,7 +291,7 @@ impl<'a> WsTransportClientBuilder<'a> { // Only build TLS connector if `wss` in redirection URL. #[cfg(feature = "tls")] - match target.mode { + match target._mode { Mode::Tls if connector.is_none() => { connector = Some(build_tls_config(&self.certificate_store)?); } @@ -369,7 +422,7 @@ pub struct Target { /// The Host request header specifies the host and port number of the server to which the request is being sent. host_header: String, /// WebSocket stream mode, see [`Mode`] for further documentation. - mode: Mode, + _mode: Mode, /// The path and query parts from an URL. path_and_query: String, } @@ -378,7 +431,7 @@ impl TryFrom for Target { type Error = WsHandshakeError; fn try_from(uri: Uri) -> Result { - let mode = match uri.scheme_str() { + let _mode = match uri.scheme_str() { Some("ws") => Mode::Plain, #[cfg(feature = "tls")] Some("wss") => Mode::Tls, @@ -398,7 +451,13 @@ impl TryFrom for Target { let parts = uri.into_parts(); let path_and_query = parts.path_and_query.ok_or_else(|| WsHandshakeError::Url("No path in URL".into()))?; let sockaddrs = host_header.to_socket_addrs().map_err(WsHandshakeError::ResolutionFailed)?; - Ok(Self { sockaddrs: sockaddrs.collect(), host, host_header, mode, path_and_query: path_and_query.to_string() }) + Ok(Self { + sockaddrs: sockaddrs.collect(), + host, + host_header, + _mode, + path_and_query: path_and_query.to_string(), + }) } } @@ -451,7 +510,7 @@ mod tests { fn assert_ws_target(target: Target, host: &str, host_header: &str, mode: Mode, path_and_query: &str) { assert_eq!(&target.host, host); assert_eq!(&target.host_header, host_header); - assert_eq!(target.mode, mode); + assert_eq!(target._mode, mode); assert_eq!(&target.path_and_query, path_and_query); } diff --git a/ws-client/src/stream.rs b/client/transport/src/ws/stream.rs similarity index 99% rename from ws-client/src/stream.rs rename to client/transport/src/ws/stream.rs index 85b1e0d39b..edf11dc931 100644 --- a/ws-client/src/stream.rs +++ b/client/transport/src/ws/stream.rs @@ -40,7 +40,7 @@ use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; /// Stream to represent either a unencrypted or encrypted socket stream. #[pin_project(project = EitherStreamProj)] #[derive(Debug)] -pub enum EitherStream { +pub(crate) enum EitherStream { /// Unencrypted socket stream. Plain(#[pin] TcpStream), /// Encrypted socket stream. diff --git a/client/ws-client/Cargo.toml b/client/ws-client/Cargo.toml new file mode 100644 index 0000000000..d3e53bf88d --- /dev/null +++ b/client/ws-client/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "jsonrpsee-ws-client" +version = "0.6.1" +authors = ["Parity Technologies ", "Pierre Krieger "] +description = "WebSocket client for JSON-RPC" +edition = "2018" +license = "MIT" +repository = "https://github.com/paritytech/jsonrpsee" +homepage = "https://github.com/paritytech/jsonrpsee" +documentation = "https://docs.rs/jsonrpsee-ws-client" + +[dependencies] +jsonrpsee-types = { path = "../../types", version = "0.6.1" } +jsonrpsee-client-transport = { path = "../transport", version = "0.6.1", features = ["ws"] } +jsonrpsee-core = { path = "../../core", version = "0.6.1", features = ["async-client"] } + +[dev-dependencies] +env_logger = "0.9" +jsonrpsee-test-utils = { path = "../../test-utils" } +tokio = { version = "1", features = ["macros"] } +serde_json = "1" diff --git a/client/ws-client/src/lib.rs b/client/ws-client/src/lib.rs new file mode 100644 index 0000000000..a8ddf63ca9 --- /dev/null +++ b/client/ws-client/src/lib.rs @@ -0,0 +1,169 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] + +//! # jsonrpsee-ws-client +//! +//! `jsonrpsee-ws-client` is a [JSON RPC](https://www.jsonrpc.org/specification) WebSocket client library that's is built for `async/await`. +//! +//! ## Async runtime support +//! +//! This library uses `tokio` as the runtime and does not support other runtimes. + +#[cfg(test)] +mod tests; + +pub use jsonrpsee_types as types; + +use std::time::Duration; + +use jsonrpsee_client_transport::ws::{Header, InvalidUri, Uri, WsTransportClientBuilder}; +use jsonrpsee_core::client::{CertificateStore, Client, ClientBuilder}; +use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; + +/// Builder for [`Client`]. +/// +/// # Examples +/// +/// ```no_run +/// +/// use jsonrpsee_ws_client::WsClientBuilder; +/// +/// #[tokio::main] +/// async fn main() { +/// // build client +/// let client = WsClientBuilder::default() +/// .add_header("Any-Header-You-Like", "42") +/// .build("wss://localhost:443") +/// .await +/// .unwrap(); +/// +/// // use client.... +/// } +/// +/// ``` +#[derive(Clone, Debug)] +pub struct WsClientBuilder<'a> { + certificate_store: CertificateStore, + max_request_body_size: u32, + request_timeout: Duration, + connection_timeout: Duration, + headers: Vec>, + max_concurrent_requests: usize, + max_notifs_per_subscription: usize, + max_redirections: usize, +} + +impl<'a> Default for WsClientBuilder<'a> { + fn default() -> Self { + Self { + certificate_store: CertificateStore::Native, + max_request_body_size: TEN_MB_SIZE_BYTES, + request_timeout: Duration::from_secs(60), + connection_timeout: Duration::from_secs(10), + headers: Vec::new(), + max_concurrent_requests: 256, + max_notifs_per_subscription: 1024, + max_redirections: 5, + } + } +} + +impl<'a> WsClientBuilder<'a> { + /// See documentation [`WsTransportClientBuilder::certificate_store`] (default is native). + pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self { + self.certificate_store = certificate_store; + self + } + + /// See documentation [`WsTransportClientBuilder::max_request_body_size`] (default is 10 MB). + pub fn max_request_body_size(mut self, size: u32) -> Self { + self.max_request_body_size = size; + self + } + + /// See documentation [`ClientBuilder::request_timeout`] (default is 60 seconds). + pub fn request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// See documentation [`WsTransportClientBuilder::connection_timeout`] (default is 10 seconds). + pub fn connection_timeout(mut self, timeout: Duration) -> Self { + self.connection_timeout = timeout; + self + } + + /// See documentation [`WsTransportClientBuilder::add_header`] (default is none). + pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self { + self.headers.push(Header { name, value: value.as_bytes() }); + self + } + + /// See documentation [`ClientBuilder::max_concurrent_requests`] (default is 256). + pub fn max_concurrent_requests(mut self, max: usize) -> Self { + self.max_concurrent_requests = max; + self + } + + /// See documentation [`ClientBuilder::max_notifs_per_subscription`] (default is 1024). + pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { + self.max_notifs_per_subscription = max; + self + } + + /// See documentation [`WsTransportClientBuilder::max_redirections`] (default is 5). + pub fn max_redirections(mut self, redirect: usize) -> Self { + self.max_redirections = redirect; + self + } + + /// Build the client with specified URL to connect to. + /// You must provide the port number in the URL. + /// + /// ## Panics + /// + /// Panics if being called outside of `tokio` runtime context. + pub async fn build(self, url: impl AsRef) -> Result { + let transport_builder = WsTransportClientBuilder { + certificate_store: self.certificate_store, + connection_timeout: self.connection_timeout, + headers: self.headers, + max_request_body_size: self.max_request_body_size, + max_redirections: self.max_redirections, + }; + + let uri: Uri = url.as_ref().parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?; + let (sender, receiver) = transport_builder.build(uri).await.map_err(|e| Error::Transport(e.into()))?; + + Ok(ClientBuilder::default() + .max_notifs_per_subscription(self.max_notifs_per_subscription) + .request_timeout(self.request_timeout) + .max_concurrent_requests(self.max_concurrent_requests) + .build(sender, receiver)) + } +} diff --git a/ws-client/src/tests.rs b/client/ws-client/src/tests.rs similarity index 99% rename from ws-client/src/tests.rs rename to client/ws-client/src/tests.rs index a8f7b7bec5..606a5c18be 100644 --- a/ws-client/src/tests.rs +++ b/client/ws-client/src/tests.rs @@ -29,7 +29,7 @@ use crate::types::error::{ErrorCode, ErrorObject, ErrorResponse}; use crate::types::ParamsSer; use crate::WsClientBuilder; use jsonrpsee_core::client::Subscription; -use jsonrpsee_core::client::{Client, SubscriptionClient}; +use jsonrpsee_core::client::{ClientT, SubscriptionClientT}; use jsonrpsee_core::rpc_params; use jsonrpsee_core::Error; use jsonrpsee_test_utils::helpers::*; diff --git a/core/Cargo.toml b/core/Cargo.toml index 77fd26206f..ded31bb99a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-core" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] description = "Utilities for jsonrpsee" edition = "2018" @@ -15,7 +15,7 @@ thiserror = "1" futures-channel = { version = "0.3.14", default-features = false } futures-util = { version = "0.3.14", default-features = false, optional = true } hyper = { version = "0.14.10", default-features = false, features = ["stream"] } -jsonrpsee-types = { path = "../types", version = "0.6.0" } +jsonrpsee-types = { path = "../types", version = "0.6.1" } tracing = { version = "0.1", optional = true } rustc-hash = { version = "1", optional = true } rand = { version = "0.8", optional = true } @@ -37,6 +37,14 @@ server = [ "tokio", ] client = ["futures-util"] +async-client = [ + "client", + "rustc-hash", + "tokio/sync", + "tokio/macros", + "tokio/time", + "tracing" +] [dev-dependencies] serde_json = "1.0" diff --git a/ws-client/src/helpers.rs b/core/src/client/async_client/helpers.rs similarity index 90% rename from ws-client/src/helpers.rs rename to core/src/client/async_client/helpers.rs index 6527105130..6043a0dad6 100644 --- a/ws-client/src/helpers.rs +++ b/core/src/client/async_client/helpers.rs @@ -27,19 +27,20 @@ use std::convert::TryInto; use std::time::Duration; -use crate::manager::{RequestManager, RequestStatus}; -use crate::transport::Sender as WsSender; -use crate::types::{ +use crate::client::async_client::manager::{RequestManager, RequestStatus}; +use crate::client::{RequestMessage, TransportSenderT}; +use crate::Error; + +use futures_channel::{mpsc, oneshot}; +use jsonrpsee_types::{ ErrorResponse, Id, Notification, ParamsSer, RequestSer, Response, SubscriptionId, SubscriptionResponse, }; -use futures::channel::{mpsc, oneshot}; -use jsonrpsee_core::{client::RequestMessage, Error}; use serde_json::Value as JsonValue; /// Attempts to process a batch response. /// /// On success the result is sent to the frontend. -pub fn process_batch_response(manager: &mut RequestManager, rps: Vec>) -> Result<(), Error> { +pub(crate) fn process_batch_response(manager: &mut RequestManager, rps: Vec>) -> Result<(), Error> { let mut digest = Vec::with_capacity(rps.len()); let mut ordered_responses = vec![JsonValue::Null; rps.len()]; let mut rps_unordered: Vec<_> = Vec::with_capacity(rps.len()); @@ -73,7 +74,7 @@ pub fn process_batch_response(manager: &mut RequestManager, rps: Vec, ) -> Result<(), Option> { @@ -104,7 +105,7 @@ pub fn process_subscription_response( /// /// Returns Ok() if the response was successfully handled /// Returns Err() if there was no handler for the method -pub fn process_notification(manager: &mut RequestManager, notif: Notification) -> Result<(), Error> { +pub(crate) fn process_notification(manager: &mut RequestManager, notif: Notification) -> Result<(), Error> { match manager.as_notification_handler_mut(notif.method.to_string()) { Some(send_back_sink) => match send_back_sink.try_send(notif.params) { Ok(()) => Ok(()), @@ -126,7 +127,7 @@ pub fn process_notification(manager: &mut RequestManager, notif: Notification, max_capacity_per_subscription: usize, @@ -177,7 +178,11 @@ pub fn process_single_response( /// that the client is not interested in the subscription anymore. // // NOTE: we don't count this a concurrent request as it's part of a subscription. -pub async fn stop_subscription(sender: &mut WsSender, manager: &mut RequestManager, unsub: RequestMessage) { +pub(crate) async fn stop_subscription( + sender: &mut impl TransportSenderT, + manager: &mut RequestManager, + unsub: RequestMessage, +) { if let Err(e) = sender.send(unsub.raw).await { tracing::error!("Send unsubscribe request failed: {:?}", e); let _ = manager.complete_pending_call(unsub.id); @@ -185,7 +190,7 @@ pub async fn stop_subscription(sender: &mut WsSender, manager: &mut RequestManag } /// Builds an unsubscription message. -pub fn build_unsubscribe_message( +pub(crate) fn build_unsubscribe_message( manager: &mut RequestManager, sub_req_id: u64, sub_id: SubscriptionId<'static>, @@ -202,7 +207,7 @@ pub fn build_unsubscribe_message( /// /// Returns `Ok` if the response was successfully sent. /// Returns `Err(_)` if the response ID was not found. -pub fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> { +pub(crate) fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) -> Result<(), Error> { let id = err.id.as_number().copied().ok_or(Error::InvalidRequestId)?; match manager.request_status(&id) { RequestStatus::PendingMethodCall => { @@ -220,7 +225,7 @@ pub fn process_error_response(manager: &mut RequestManager, err: ErrorResponse) } /// Wait for a stream to complete within the given timeout. -pub async fn call_with_timeout( +pub(crate) async fn call_with_timeout( timeout: Duration, rx: oneshot::Receiver>, ) -> Result, oneshot::Canceled> { diff --git a/ws-client/src/manager.rs b/core/src/client/async_client/manager.rs similarity index 91% rename from ws-client/src/manager.rs rename to core/src/client/async_client/manager.rs index 337786e4d6..16ac204340 100644 --- a/ws-client/src/manager.rs +++ b/core/src/client/async_client/manager.rs @@ -32,11 +32,11 @@ //! > **Note**: The spec allow number, string or null but this crate only supports numbers. //! - SubscriptionId: unique ID generated by server -use std::collections::hash_map::{Entry, HashMap}; +use std::collections::{hash_map::Entry, HashMap}; -use crate::types::SubscriptionId; -use futures::channel::{mpsc, oneshot}; -use jsonrpsee_core::Error; +use crate::Error; +use futures_channel::{mpsc, oneshot}; +use jsonrpsee_types::SubscriptionId; use rustc_hash::FxHashMap; use serde_json::value::Value as JsonValue; @@ -49,7 +49,7 @@ enum Kind { #[derive(Debug)] /// Indicates the status of a given request/response. -pub enum RequestStatus { +pub(crate) enum RequestStatus { /// The method call is waiting for a response, PendingMethodCall, /// The subscription is waiting for a response to become an active subscription. @@ -69,16 +69,16 @@ type RequestId = u64; #[derive(Debug)] /// Batch state. -pub struct BatchState { +pub(crate) struct BatchState { /// Order that the request was performed in. - pub order: FxHashMap, + pub(crate) order: FxHashMap, /// Oneshot send back. - pub send_back: PendingBatchOneshot, + pub(crate) send_back: PendingBatchOneshot, } #[derive(Debug, Default)] /// Manages and monitors JSONRPC v2 method calls and subscriptions. -pub struct RequestManager { +pub(crate) struct RequestManager { /// List of requests that are waiting for a response from the server. // NOTE: FnvHashMap is used here because RequestId is not under the caller's control and is known to be a short // key. @@ -94,14 +94,14 @@ pub struct RequestManager { impl RequestManager { /// Create a new `RequestManager`. - pub fn new() -> Self { + pub(crate) fn new() -> Self { Self::default() } /// Tries to insert a new pending call. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. - pub fn insert_pending_call( + pub(crate) fn insert_pending_call( &mut self, id: RequestId, send_back: PendingCallOneshot, @@ -117,7 +117,7 @@ impl RequestManager { /// Tries to insert a new batch request /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. - pub fn insert_pending_batch( + pub(crate) fn insert_pending_batch( &mut self, mut batch: Vec, send_back: PendingBatchOneshot, @@ -137,7 +137,7 @@ impl RequestManager { /// Tries to insert a new pending subscription and reserves a slot for a "potential" unsubscription request. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. - pub fn insert_pending_subscription( + pub(crate) fn insert_pending_subscription( &mut self, sub_req_id: RequestId, unsub_req_id: RequestId, @@ -160,7 +160,7 @@ impl RequestManager { /// Tries to insert a new subscription. /// /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. - pub fn insert_subscription( + pub(crate) fn insert_subscription( &mut self, sub_req_id: RequestId, unsub_req_id: RequestId, @@ -180,7 +180,11 @@ impl RequestManager { } /// Inserts a handler for incoming notifications - pub fn insert_notification_handler(&mut self, method: &str, send_back: SubscriptionSink) -> Result<(), Error> { + pub(crate) fn insert_notification_handler( + &mut self, + method: &str, + send_back: SubscriptionSink, + ) -> Result<(), Error> { if let Entry::Vacant(handle) = self.notification_handlers.entry(method.to_owned()) { handle.insert(send_back); Ok(()) @@ -190,7 +194,7 @@ impl RequestManager { } /// Removes a notification handler - pub fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> { + pub(crate) fn remove_notification_handler(&mut self, method: String) -> Result<(), Error> { if self.notification_handlers.remove(&method).is_some() { Ok(()) } else { @@ -201,7 +205,7 @@ impl RequestManager { /// Tries to complete a pending subscription. /// /// Returns `Some` if the subscription was completed otherwise `None`. - pub fn complete_pending_subscription( + pub(crate) fn complete_pending_subscription( &mut self, request_id: RequestId, ) -> Option<(RequestId, PendingSubscriptionOneshot, UnsubscribeMethod)> { @@ -221,7 +225,7 @@ impl RequestManager { /// Tries to complete a pending batch request /// /// Returns `Some` if the subscription was completed otherwise `None`. - pub fn complete_pending_batch(&mut self, batch: Vec) -> Option { + pub(crate) fn complete_pending_batch(&mut self, batch: Vec) -> Option { match self.batches.entry(batch) { Entry::Occupied(request) => { let (_digest, state) = request.remove_entry(); @@ -234,7 +238,7 @@ impl RequestManager { /// Tries to complete a pending call.. /// /// Returns `Some` if the call was completed otherwise `None`. - pub fn complete_pending_call(&mut self, request_id: RequestId) -> Option { + pub(crate) fn complete_pending_call(&mut self, request_id: RequestId) -> Option { match self.requests.entry(request_id) { Entry::Occupied(request) if matches!(request.get(), Kind::PendingMethodCall(_)) => { let (_req_id, kind) = request.remove_entry(); @@ -251,7 +255,7 @@ impl RequestManager { /// Tries to remove a subscription. /// /// Returns `Some` if the subscription was removed otherwise `None`. - pub fn remove_subscription( + pub(crate) fn remove_subscription( &mut self, request_id: RequestId, subscription_id: SubscriptionId<'static>, @@ -273,7 +277,7 @@ impl RequestManager { } /// Returns the status of a request ID - pub fn request_status(&mut self, id: &RequestId) -> RequestStatus { + pub(crate) fn request_status(&mut self, id: &RequestId) -> RequestStatus { self.requests.get(id).map_or(RequestStatus::Invalid, |kind| match kind { Kind::PendingMethodCall(_) => RequestStatus::PendingMethodCall, Kind::PendingSubscription(_) => RequestStatus::PendingSubscription, @@ -284,7 +288,7 @@ impl RequestManager { /// Get a mutable reference to underlying `Sink` in order to send messages to the subscription. /// /// Returns `Some` if the `request_id` was registered as a subscription otherwise `None`. - pub fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> { + pub(crate) fn as_subscription_mut(&mut self, request_id: &RequestId) -> Option<&mut SubscriptionSink> { if let Some(Kind::Subscription((_, sink, _))) = self.requests.get_mut(request_id) { Some(sink) } else { @@ -295,14 +299,14 @@ impl RequestManager { /// Get a mutable reference to underlying `Sink` in order to send incoming notifications to the subscription. /// /// Returns `Some` if the `method` was registered as a NotificationHandler otherwise `None`. - pub fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> { + pub(crate) fn as_notification_handler_mut(&mut self, method: String) -> Option<&mut SubscriptionSink> { self.notification_handlers.get_mut(&method) } /// Reverse lookup to get the request ID for a subscription ID. /// /// Returns `Some` if the subscription ID was registered as a subscription otherwise `None`. - pub fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option { + pub(crate) fn get_request_id_by_subscription_id(&self, sub_id: &SubscriptionId) -> Option { self.subscriptions.get(sub_id).copied() } } @@ -310,7 +314,7 @@ impl RequestManager { #[cfg(test)] mod tests { use super::{Error, RequestManager}; - use futures::channel::{mpsc, oneshot}; + use futures_channel::{mpsc, oneshot}; use jsonrpsee_types::SubscriptionId; use serde_json::Value as JsonValue; diff --git a/ws-client/src/client.rs b/core/src/client/async_client/mod.rs similarity index 70% rename from ws-client/src/client.rs rename to core/src/client/async_client/mod.rs index 778f859ec2..1b89bc2999 100644 --- a/ws-client/src/client.rs +++ b/core/src/client/async_client/mod.rs @@ -1,59 +1,33 @@ -// Copyright 2019-2021 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any -// person obtaining a copy of this software and associated -// documentation files (the "Software"), to deal in the -// Software without restriction, including without -// limitation the rights to use, copy, modify, merge, -// publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software -// is furnished to do so, subject to the following -// conditions: -// -// The above copyright notice and this permission notice -// shall be included in all copies or substantial portions -// of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF -// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED -// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A -// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY -// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION -// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR -// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use std::convert::TryInto; +mod helpers; +mod manager; + use std::time::Duration; -use crate::helpers::{ +use crate::client::{ + BatchMessage, ClientT, RegisterNotificationMessage, RequestMessage, Subscription, SubscriptionClientT, + SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, +}; +use helpers::{ build_unsubscribe_message, call_with_timeout, process_batch_response, process_error_response, process_notification, process_single_response, process_subscription_response, stop_subscription, }; -use crate::manager::RequestManager; -use crate::transport::{Receiver as WsReceiver, Sender as WsSender, WsHandshakeError, WsTransportClientBuilder}; -use crate::types::{ - ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, SubscriptionResponse, - TEN_MB_SIZE_BYTES, -}; +use manager::RequestManager; + +use crate::error::Error; use async_trait::async_trait; -use futures::channel::{mpsc, oneshot}; -use futures::future::Either; -use futures::prelude::*; -use futures::sink::SinkExt; -use http::uri::{InvalidUri, Uri}; -use jsonrpsee_core::client::{ - BatchMessage, CertificateStore, Client, FrontToBack, RegisterNotificationMessage, RequestIdManager, RequestMessage, - Subscription, SubscriptionClient, SubscriptionKind, SubscriptionMessage, +use futures_channel::{mpsc, oneshot}; +use futures_util::future::Either; +use futures_util::sink::SinkExt; +use futures_util::stream::StreamExt; +use jsonrpsee_types::{ + ErrorResponse, Id, Notification, NotificationSer, ParamsSer, RequestSer, Response, SubscriptionResponse, }; -use jsonrpsee_core::Error; use serde::de::DeserializeOwned; use tokio::sync::Mutex; -pub use soketto::handshake::client::Header; +use super::{FrontToBack, RequestIdManager}; -/// Wrapper over a [`oneshot::Receiver`](futures::channel::oneshot::Receiver) that reads +/// Wrapper over a [`oneshot::Receiver`](futures_channel::oneshot::Receiver) that reads /// the underlying channel once and then stores the result in String. /// It is possible that the error is read more than once if several calls are made /// when the background thread has been terminated. @@ -83,171 +57,88 @@ impl ErrorFromBack { } } -/// WebSocket client that works by maintaining a background task running in parallel. -/// -/// It's possible that the background thread is terminated and this makes the client unusable. -/// An error [`Error::RestartNeeded`] is returned if this happens and users has to manually -/// handle dropping and restarting a new client. -#[derive(Debug)] -pub struct WsClient { - /// Channel to send requests to the background task. - to_back: mpsc::Sender, - /// If the background thread terminates the error is sent to this channel. - // NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references. - error: Mutex, - /// Request timeout. Defaults to 60sec. - request_timeout: Duration, - /// Request ID manager. - id_manager: RequestIdManager, -} - -/// Builder for [`WsClient`]. -/// -/// # Examples -/// -/// ```no_run -/// -/// use jsonrpsee_ws_client::WsClientBuilder; -/// -/// #[tokio::main] -/// async fn main() { -/// // build client -/// let client = WsClientBuilder::default() -/// .add_header("Any-Header-You-Like", "42") -/// .build("wss://localhost:443") -/// .await -/// .unwrap(); -/// -/// // use client.... -/// } -/// -/// ``` +/// Builder for [`Client`]. #[derive(Clone, Debug)] -pub struct WsClientBuilder<'a> { - certificate_store: CertificateStore, - max_request_body_size: u32, +pub struct ClientBuilder { request_timeout: Duration, - connection_timeout: Duration, - headers: Vec>, max_concurrent_requests: usize, max_notifs_per_subscription: usize, - max_redirections: usize, } -impl<'a> Default for WsClientBuilder<'a> { +impl Default for ClientBuilder { fn default() -> Self { Self { - certificate_store: CertificateStore::Native, - max_request_body_size: TEN_MB_SIZE_BYTES, request_timeout: Duration::from_secs(60), - connection_timeout: Duration::from_secs(10), - headers: Vec::new(), max_concurrent_requests: 256, max_notifs_per_subscription: 1024, - max_redirections: 5, } } } -impl<'a> WsClientBuilder<'a> { - /// Set whether to use system certificates - pub fn certificate_store(mut self, certificate_store: CertificateStore) -> Self { - self.certificate_store = certificate_store; - self - } - - /// Set max request body size. - pub fn max_request_body_size(mut self, size: u32) -> Self { - self.max_request_body_size = size; - self - } - +impl ClientBuilder { /// Set request timeout (default is 60 seconds). pub fn request_timeout(mut self, timeout: Duration) -> Self { self.request_timeout = timeout; self } - /// Set connection timeout for the handshake. - pub fn connection_timeout(mut self, timeout: Duration) -> Self { - self.connection_timeout = timeout; - self - } - - /// Set a custom header passed to the server during the handshake. - /// - /// The caller is responsible for checking that the headers do not conflict or are duplicated. - pub fn add_header(mut self, name: &'a str, value: &'a str) -> Self { - self.headers.push(Header { name, value: value.as_bytes() }); - self - } - - /// Set max concurrent requests. + /// Set max concurrent requests (default is 256). pub fn max_concurrent_requests(mut self, max: usize) -> Self { self.max_concurrent_requests = max; self } /// Set max concurrent notification capacity for each subscription; when the capacity is exceeded the subscription - /// will be dropped. + /// will be dropped (default is 1024). /// - /// You can also prevent the subscription being dropped by calling - /// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) frequently enough such that the buffer capacity doesn't - /// exceeds. + /// You may prevent the subscription from being dropped by polling often enough + /// [`Subscription::next()`](../../jsonrpsee_core/client/struct.Subscription.html#method.next) such that + /// it can keep with the rate as server produces new items on the subscription. /// /// **Note**: The actual capacity is `num_senders + max_subscription_capacity` - /// because it is passed to [`futures::channel::mpsc::channel`]. + /// because it is passed to [`futures_channel::mpsc::channel`]. pub fn max_notifs_per_subscription(mut self, max: usize) -> Self { self.max_notifs_per_subscription = max; self } - /// Set the max number of redirections to perform until a connection is regarded as failed. - pub fn max_redirections(mut self, redirect: usize) -> Self { - self.max_redirections = redirect; - self - } - - /// Build the client with specified URL to connect to. - /// You must provide the port number in the URL. + /// Build the client with given transport. /// /// ## Panics /// /// Panics if being called outside of `tokio` runtime context. - pub async fn build(self, uri: &'a str) -> Result { - let certificate_store = self.certificate_store; - let max_capacity_per_subscription = self.max_notifs_per_subscription; - let max_concurrent_requests = self.max_concurrent_requests; - let request_timeout = self.request_timeout; + pub fn build(self, sender: S, receiver: R) -> Client { let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests); let (err_tx, err_rx) = oneshot::channel(); - - let uri: Uri = uri.parse().map_err(|e: InvalidUri| Error::Transport(e.into()))?; - - let builder = WsTransportClientBuilder { - certificate_store, - target: uri.try_into().map_err(|e: WsHandshakeError| Error::Transport(e.into()))?, - timeout: self.connection_timeout, - headers: self.headers, - max_request_body_size: self.max_request_body_size, - max_redirections: self.max_redirections, - }; - - let (sender, receiver) = builder.build().await.map_err(|e| Error::Transport(e.into()))?; + let max_notifs_per_subscription = self.max_notifs_per_subscription; tokio::spawn(async move { - background_task(sender, receiver, from_front, err_tx, max_capacity_per_subscription).await; + background_task(sender, receiver, from_front, err_tx, max_notifs_per_subscription).await; }); - Ok(WsClient { + Client { to_back, - request_timeout, + request_timeout: self.request_timeout, error: Mutex::new(ErrorFromBack::Unread(err_rx)), - id_manager: RequestIdManager::new(max_concurrent_requests), - }) + id_manager: RequestIdManager::new(self.max_concurrent_requests), + } } } -impl WsClient { +/// Generic asyncronous client. +#[derive(Debug)] +pub struct Client { + /// Channel to send requests to the background task. + to_back: mpsc::Sender, + /// If the background thread terminates the error is sent to this channel. + // NOTE(niklasad1): This is a Mutex to circumvent that the async fns takes immutable references. + error: Mutex, + /// Request timeout. Defaults to 60sec. + request_timeout: Duration, + /// Request ID manager. + id_manager: RequestIdManager, +} + +impl Client { /// Checks if the client is connected to the target. pub fn is_connected(&self) -> bool { !self.to_back.is_closed() @@ -263,14 +154,20 @@ impl WsClient { } } -impl Drop for WsClient { +impl From<(S, R)> for Client { + fn from(transport: (S, R)) -> Client { + ClientBuilder::default().build(transport.0, transport.1) + } +} + +impl Drop for Client { fn drop(&mut self) { self.to_back.close_channel(); } } #[async_trait] -impl Client for WsClient { +impl ClientT for Client { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { // NOTE: we use this to guard against max number of concurrent requests. let _req_id = self.id_manager.next_request_id()?; @@ -362,7 +259,7 @@ impl Client for WsClient { } #[async_trait] -impl SubscriptionClient for WsClient { +impl SubscriptionClientT for Client { /// Send a subscription request to the server. /// /// The `subscribe_method` and `params` are used to ask for the subscription towards the @@ -447,28 +344,28 @@ impl SubscriptionClient for WsClient { } /// Function being run in the background that processes messages from the frontend. -async fn background_task( - mut sender: WsSender, - receiver: WsReceiver, +async fn background_task( + mut sender: S, + receiver: R, mut frontend: mpsc::Receiver, front_error: oneshot::Sender, max_notifs_per_subscription: usize, ) { let mut manager = RequestManager::new(); - let backend_event = futures::stream::unfold(receiver, |mut receiver| async { - let res = receiver.next_response().await; + let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async { + let res = receiver.receive().await; Some((res, receiver)) }); - futures::pin_mut!(backend_event); + futures_util::pin_mut!(backend_event); loop { let next_frontend = frontend.next(); let next_backend = backend_event.next(); - futures::pin_mut!(next_frontend, next_backend); + futures_util::pin_mut!(next_frontend, next_backend); - match future::select(next_frontend, next_backend).await { + match futures_util::future::select(next_frontend, next_backend).await { // User dropped the sender side of the channel. // There is nothing to do just terminate. Either::Left((None, _)) => { @@ -559,7 +456,7 @@ async fn background_task( } Either::Right((Some(Ok(raw)), _)) => { // Single response to a request. - if let Ok(single) = serde_json::from_slice::>(&raw) { + if let Ok(single) = serde_json::from_str::>(&raw) { tracing::debug!("[backend]: recv method_call {:?}", single); match process_single_response(&mut manager, single, max_notifs_per_subscription) { Ok(Some(unsub)) => { @@ -573,19 +470,19 @@ async fn background_task( } } // Subscription response. - else if let Ok(response) = serde_json::from_slice::>(&raw) { + else if let Ok(response) = serde_json::from_str::>(&raw) { tracing::debug!("[backend]: recv subscription {:?}", response); if let Err(Some(unsub)) = process_subscription_response(&mut manager, response) { let _ = stop_subscription(&mut sender, &mut manager, unsub).await; } } // Incoming Notification - else if let Ok(notif) = serde_json::from_slice::>(&raw) { + else if let Ok(notif) = serde_json::from_str::>(&raw) { tracing::debug!("[backend]: recv notification {:?}", notif); let _ = process_notification(&mut manager, notif); } // Batch response. - else if let Ok(batch) = serde_json::from_slice::>>(&raw) { + else if let Ok(batch) = serde_json::from_str::>>(&raw) { tracing::debug!("[backend]: recv batch {:?}", batch); if let Err(e) = process_batch_response(&mut manager, batch) { let _ = front_error.send(e); @@ -593,7 +490,7 @@ async fn background_task( } } // Error response - else if let Ok(err) = serde_json::from_slice::(&raw) { + else if let Ok(err) = serde_json::from_str::(&raw) { tracing::debug!("[backend]: recv error response {:?}", err); if let Err(e) = process_error_response(&mut manager, err) { let _ = front_error.send(e); @@ -604,7 +501,7 @@ async fn background_task( else { tracing::debug!( "[backend]: recv unparseable message: {:?}", - serde_json::from_slice::(&raw) + serde_json::from_str::(&raw) ); let _ = front_error.send(Error::Custom("Unparsable response".into())); break; @@ -622,7 +519,6 @@ async fn background_task( } } } - // Send close message to the server. let _ = sender.close().await; } diff --git a/core/src/client.rs b/core/src/client/mod.rs similarity index 92% rename from core/src/client.rs rename to core/src/client/mod.rs index 268bdac18a..8ad4c59918 100644 --- a/core/src/client.rs +++ b/core/src/client/mod.rs @@ -47,9 +47,16 @@ pub mod __reexports { pub use jsonrpsee_types::ParamsSer; } +/// Async client abstraction that brings additional deps. +#[cfg(feature = "async-client")] +mod async_client; + +#[cfg(feature = "async-client")] +pub use async_client::{Client, ClientBuilder}; + /// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests and notifications. #[async_trait] -pub trait Client { +pub trait ClientT { /// Send a [notification request](https://www.jsonrpc.org/specification#notification) async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error>; @@ -71,7 +78,7 @@ pub trait Client { /// [JSON-RPC](https://www.jsonrpc.org/specification) client interface that can make requests, notifications and subscriptions. #[async_trait] -pub trait SubscriptionClient: Client { +pub trait SubscriptionClientT: ClientT { /// Initiate a subscription by performing a JSON-RPC method call where the server responds with /// a `Subscription ID` that is used to fetch messages on that subscription, /// @@ -102,6 +109,32 @@ pub trait SubscriptionClient: Client { Notif: DeserializeOwned; } +/// Transport interface to send data asynchronous. +#[async_trait] +/// Transport interface for an asyncronous client. +pub trait TransportSenderT: Send + 'static { + /// Error. + type Error: std::error::Error + Send + Sync; + + /// Send. + async fn send(&mut self, msg: String) -> Result<(), Self::Error>; + + /// If the transport supports sending customized close messages. + async fn close(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Transport interface to receive data asynchronous. +#[async_trait] +pub trait TransportReceiverT: Send + 'static { + /// Error that occur during send or receiving a message. + type Error: std::error::Error + Send + Sync; + + /// Receive. + async fn receive(&mut self) -> Result; +} + #[macro_export] /// Convert the given values to a [`jsonrpsee_types::ParamsSer`] as expected by a jsonrpsee Client (http or websocket). macro_rules! rpc_params { @@ -356,6 +389,16 @@ impl RequestIdGuard { } } +/// What certificate store to use +#[derive(Clone, Copy, Debug, PartialEq)] +#[non_exhaustive] +pub enum CertificateStore { + /// Use the native system certificate store + Native, + /// Use WebPKI's certificate store + WebPki, +} + #[cfg(test)] mod tests { use super::RequestIdManager; @@ -374,13 +417,3 @@ mod tests { assert!(manager.next_request_id().is_ok()); } } - -/// What certificate store to use -#[derive(Clone, Copy, Debug, PartialEq)] -#[non_exhaustive] -pub enum CertificateStore { - /// Use the native system certificate store - Native, - /// Use WebPKI's certificate store - WebPki, -} diff --git a/core/src/lib.rs b/core/src/lib.rs index 396d7c1301..38c16a5159 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -70,3 +70,6 @@ pub use serde_json::{ to_value as to_json_value, value::to_raw_value as to_json_raw_value, value::RawValue as JsonRawValue, Value as JsonValue, }; + +/// Ten megabytes. +pub const TEN_MB_SIZE_BYTES: u32 = 10 * 1024 * 1024; diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 078cca1d66..3d9b6bc3fc 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-examples" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] description = "Examples for jsonrpsee" edition = "2018" @@ -8,7 +8,8 @@ publish = false [dev-dependencies] anyhow = "1" -env_logger = "0.9.0" +env_logger = "0.9" +futures = "0.3" jsonrpsee = { path = "../jsonrpsee", features = ["full"] } tracing = "0.1" tracing-subscriber = "0.2" @@ -45,3 +46,7 @@ path = "ws_sub_with_params.rs" [[example]] name = "proc_macro" path = "proc_macro.rs" + +[[example]] +name = "core_client" +path = "core_client.rs" diff --git a/examples/core_client.rs b/examples/core_client.rs new file mode 100644 index 0000000000..b49f954e29 --- /dev/null +++ b/examples/core_client.rs @@ -0,0 +1,57 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::net::SocketAddr; + +use jsonrpsee::client_transport::ws::{Uri, WsTransportClientBuilder}; +use jsonrpsee::core::client::{Client, ClientT}; +use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let addr = run_server().await?; + let uri: Uri = format!("ws://{}", addr).parse()?; + + let client: Client = WsTransportClientBuilder::default().build(uri).await?.into(); + let response: String = client.request("say_hello", None).await?; + tracing::info!("response: {:?}", response); + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + let server = WsServerBuilder::default().build("127.0.0.1:0").await?; + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _| Ok("lo"))?; + let addr = server.local_addr()?; + server.start(module)?; + Ok(addr) +} diff --git a/examples/http.rs b/examples/http.rs index c2dc66603d..5c91df6c45 100644 --- a/examples/http.rs +++ b/examples/http.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; -use jsonrpsee::core::client::Client; +use jsonrpsee::core::client::ClientT; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; use jsonrpsee::rpc_params; diff --git a/examples/middleware_http.rs b/examples/middleware_http.rs index 124c7612e0..c92afa46a4 100644 --- a/examples/middleware_http.rs +++ b/examples/middleware_http.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use std::time::Instant; -use jsonrpsee::core::{client::Client, middleware}; +use jsonrpsee::core::{client::ClientT, middleware}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle, RpcModule}; diff --git a/examples/middleware_ws.rs b/examples/middleware_ws.rs index aac3556453..6c2afe963d 100644 --- a/examples/middleware_ws.rs +++ b/examples/middleware_ws.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use std::time::Instant; -use jsonrpsee::core::{client::Client, middleware}; +use jsonrpsee::core::{client::ClientT, middleware}; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; diff --git a/examples/multi_middleware.rs b/examples/multi_middleware.rs index 6b411115e5..63f875ca97 100644 --- a/examples/multi_middleware.rs +++ b/examples/multi_middleware.rs @@ -30,7 +30,7 @@ use std::net::SocketAddr; use std::process::Command; use std::time::Instant; -use jsonrpsee::core::{client::Client, middleware}; +use jsonrpsee::core::{client::ClientT, middleware}; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; diff --git a/examples/ws.rs b/examples/ws.rs index a6a5032173..43be8fd742 100644 --- a/examples/ws.rs +++ b/examples/ws.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; -use jsonrpsee::core::client::Client; +use jsonrpsee::core::client::ClientT; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; diff --git a/examples/ws_sub_with_params.rs b/examples/ws_sub_with_params.rs index 90b65129a2..df8cd32729 100644 --- a/examples/ws_sub_with_params.rs +++ b/examples/ws_sub_with_params.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; -use jsonrpsee::core::client::SubscriptionClient; +use jsonrpsee::core::client::SubscriptionClientT; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::ws_server::{RpcModule, WsServerBuilder}; diff --git a/examples/ws_subscription.rs b/examples/ws_subscription.rs index 1ce239c2ef..16cb170d8d 100644 --- a/examples/ws_subscription.rs +++ b/examples/ws_subscription.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; -use jsonrpsee::core::client::{Subscription, SubscriptionClient}; +use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::core::Error; use jsonrpsee::rpc_params; use jsonrpsee::ws_client::WsClientBuilder; diff --git a/http-server/Cargo.toml b/http-server/Cargo.toml index 80d1d51493..bc17b8b5e8 100644 --- a/http-server/Cargo.toml +++ b/http-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-http-server" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies ", "Pierre Krieger "] description = "HTTP server for JSON-RPC" edition = "2018" @@ -13,8 +13,8 @@ documentation = "https://docs.rs/jsonrpsee-http-server" hyper = { version = "0.14.10", features = ["server", "http1", "http2", "tcp"] } futures-channel = "0.3.14" futures-util = { version = "0.3.14", default-features = false } -jsonrpsee-types = { path = "../types", version = "0.6.0" } -jsonrpsee-core = { path = "../core", version = "0.6.0", features = ["server", "http-helpers"] } +jsonrpsee-types = { path = "../types", version = "0.6.1" } +jsonrpsee-core = { path = "../core", version = "0.6.1", features = ["server", "http-helpers"] } globset = "0.4" lazy_static = "1.4" tracing = "0.1" diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 5e51f06c37..8a81aa3b43 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -42,8 +42,9 @@ use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{MethodResult, Methods}; +use jsonrpsee_core::TEN_MB_SIZE_BYTES; use jsonrpsee_types::error::ErrorCode; -use jsonrpsee_types::{Id, Notification, Request, TEN_MB_SIZE_BYTES}; +use jsonrpsee_types::{Id, Notification, Request}; use serde_json::value::RawValue; use socket2::{Domain, Socket, Type}; diff --git a/jsonrpsee/Cargo.toml b/jsonrpsee/Cargo.toml index ee6d143c02..a41f9f4b13 100644 --- a/jsonrpsee/Cargo.toml +++ b/jsonrpsee/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "jsonrpsee" description = "JSON-RPC crate" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies ", "Pierre Krieger "] license = "MIT" edition = "2018" @@ -12,16 +12,19 @@ documentation = "https://docs.rs/jsonrpsee" [dependencies] # No support for namespaced features yet so workspace dependencies are prefixed with `jsonrpsee-`. # See https://github.com/rust-lang/cargo/issues/5565 for more details. -jsonrpsee-http-client = { path = "../http-client", version = "0.6.0", package = "jsonrpsee-http-client", optional = true } -jsonrpsee-http-server = { path = "../http-server", version = "0.6.0", package = "jsonrpsee-http-server", optional = true } -jsonrpsee-ws-client = { path = "../ws-client", version = "0.6.0", package = "jsonrpsee-ws-client", optional = true } -jsonrpsee-ws-server = { path = "../ws-server", version = "0.6.0", package = "jsonrpsee-ws-server", optional = true } -jsonrpsee-proc-macros = { path = "../proc-macros", version = "0.6.0", package = "jsonrpsee-proc-macros", optional = true } -jsonrpsee-core = { path = "../core", version = "0.6.0", package = "jsonrpsee-core", optional = true } -jsonrpsee-types = { path = "../types", version = "0.6.0", package = "jsonrpsee-types", optional = true } +jsonrpsee-http-client = { path = "../client/http-client", version = "0.6.1", package = "jsonrpsee-http-client", optional = true } +jsonrpsee-ws-client = { path = "../client/ws-client", version = "0.6.1", package = "jsonrpsee-ws-client", optional = true } +jsonrpsee-client-transport = { path = "../client/transport", version = "0.6.1", package = "jsonrpsee-client-transport", optional = true } +jsonrpsee-http-server = { path = "../http-server", version = "0.6.1", package = "jsonrpsee-http-server", optional = true } +jsonrpsee-ws-server = { path = "../ws-server", version = "0.6.1", package = "jsonrpsee-ws-server", optional = true } +jsonrpsee-proc-macros = { path = "../proc-macros", version = "0.6.1", package = "jsonrpsee-proc-macros", optional = true } +jsonrpsee-core = { path = "../core", version = "0.6.1", package = "jsonrpsee-core", optional = true } +jsonrpsee-types = { path = "../types", version = "0.6.1", package = "jsonrpsee-types", optional = true } [features] -http-client = ["jsonrpsee-http-client", "jsonrpsee-types", "jsonrpsee-core/client"] +client-ws-transport = ["jsonrpsee-client-transport/ws"] +core-client = ["jsonrpsee-core/client"] +http-client = ["jsonrpsee-http-client", "jsonrpsee-types", "jsonrpsee-core"] http-server = ["jsonrpsee-http-server", "jsonrpsee-types", "jsonrpsee-core"] ws-client = ["jsonrpsee-ws-client", "jsonrpsee-types", "jsonrpsee-core/client"] ws-server = ["jsonrpsee-ws-server", "jsonrpsee-types", "jsonrpsee-core"] @@ -29,4 +32,4 @@ macros = ["jsonrpsee-proc-macros", "jsonrpsee-types"] client = ["http-client", "ws-client"] server = ["http-server", "ws-server"] -full = ["client", "server", "macros"] +full = ["client", "server", "macros", "core-client", "client-ws-transport"] diff --git a/jsonrpsee/src/lib.rs b/jsonrpsee/src/lib.rs index 098ecff7bd..b912325320 100644 --- a/jsonrpsee/src/lib.rs +++ b/jsonrpsee/src/lib.rs @@ -52,6 +52,9 @@ pub use jsonrpsee_http_client as http_client; #[cfg(feature = "jsonrpsee-ws-client")] pub use jsonrpsee_ws_client as ws_client; +#[cfg(feature = "jsonrpsee-client-transport")] +pub use jsonrpsee_client_transport as client_transport; + /// JSON-RPC client convenience macro to build params. #[cfg(any(feature = "http-client", feature = "ws-client"))] pub use jsonrpsee_core::rpc_params; @@ -76,7 +79,7 @@ pub use jsonrpsee_types as types; #[cfg(any(feature = "http-server", feature = "ws-server"))] pub use jsonrpsee_core::server::rpc_module::{RpcModule, SubscriptionSink}; -#[cfg(any(feature = "http-server", feature = "ws-server"))] +#[cfg(any(feature = "http-server", feature = "ws-server", feature = "core-client"))] pub use jsonrpsee_core as core; #[cfg(feature = "http-server")] diff --git a/proc-macros/Cargo.toml b/proc-macros/Cargo.toml index 4e350017ab..384548b570 100644 --- a/proc-macros/Cargo.toml +++ b/proc-macros/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "jsonrpsee-proc-macros" description = "Procedueral macros for jsonrpsee" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies ", "Pierre Krieger "] license = "MIT" edition = "2018" diff --git a/proc-macros/src/render_client.rs b/proc-macros/src/render_client.rs index 6ba94cedd9..39e077687a 100644 --- a/proc-macros/src/render_client.rs +++ b/proc-macros/src/render_client.rs @@ -41,9 +41,9 @@ impl RpcDescription { let (impl_generics, type_generics, _) = self.trait_def.generics.split_for_impl(); let super_trait = if self.subscriptions.is_empty() { - quote! { #jsonrpsee::core::client::Client } + quote! { #jsonrpsee::core::client::ClientT } } else { - quote! { #jsonrpsee::core::client::SubscriptionClient } + quote! { #jsonrpsee::core::client::SubscriptionClientT } }; let method_impls = diff --git a/proc-macros/tests/ui/correct/basic.rs b/proc-macros/tests/ui/correct/basic.rs index f97095ae3f..7b4b11c95e 100644 --- a/proc-macros/tests/ui/correct/basic.rs +++ b/proc-macros/tests/ui/correct/basic.rs @@ -4,7 +4,7 @@ use std::net::SocketAddr; use jsonrpsee::proc_macros::rpc; use jsonrpsee::rpc_params; -use jsonrpsee::core::{async_trait, client::Client, RpcResult}; +use jsonrpsee::core::{async_trait, client::ClientT, RpcResult}; use jsonrpsee::ws_client::*; use jsonrpsee::ws_server::{SubscriptionSink, WsServerBuilder}; diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index b4e76853e6..44269b1afe 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-test-utils" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] license = "MIT" edition = "2018" diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 9daa5653e2..0d208778b0 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-integration-tests" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] description = "Integration tests for jsonrpsee" edition = "2018" @@ -8,7 +8,7 @@ license = "MIT" publish = false [dev-dependencies] -env_logger = "0.9.0" +env_logger = "0.9" beef = { version = "0.5.1", features = ["impl_serde"] } futures = { version = "0.3.14", default-features = false, features = ["std"] } jsonrpsee = { path = "../jsonrpsee", features = ["full"] } diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 50b964214d..874e504c12 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -31,7 +31,7 @@ use std::sync::Arc; use std::time::Duration; use helpers::{http_server, websocket_server, websocket_server_with_subscription}; -use jsonrpsee::core::client::{Client, Subscription, SubscriptionClient}; +use jsonrpsee::core::client::{ClientT, Subscription, SubscriptionClientT}; use jsonrpsee::core::error::SubscriptionClosedReason; use jsonrpsee::core::{Error, JsonValue}; use jsonrpsee::http_client::HttpClientBuilder; diff --git a/tests/tests/middleware.rs b/tests/tests/middleware.rs index 28fee2fbde..7564c98768 100644 --- a/tests/tests/middleware.rs +++ b/tests/tests/middleware.rs @@ -29,7 +29,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use std::time::Duration; -use jsonrpsee::core::{client::Client, middleware::Middleware, Error}; +use jsonrpsee::core::{client::ClientT, middleware::Middleware, Error}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; use jsonrpsee::proc_macros::rpc; diff --git a/tests/tests/resource_limiting.rs b/tests/tests/resource_limiting.rs index f0d0837a7c..b8f555d78a 100644 --- a/tests/tests/resource_limiting.rs +++ b/tests/tests/resource_limiting.rs @@ -27,7 +27,7 @@ use std::net::SocketAddr; use std::time::Duration; -use jsonrpsee::core::client::Client; +use jsonrpsee::core::client::ClientT; use jsonrpsee::core::Error; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_server::{HttpServerBuilder, HttpServerHandle}; diff --git a/types/Cargo.toml b/types/Cargo.toml index 423f57d0c3..0091983023 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-types" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies "] description = "Shared types for jsonrpsee" edition = "2018" diff --git a/types/src/lib.rs b/types/src/lib.rs index 9b3d5909c6..be18495b4b 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -31,9 +31,6 @@ extern crate alloc; -/// Ten megabytes. -pub const TEN_MB_SIZE_BYTES: u32 = 10 * 1024 * 1024; - /// JSON-RPC params related types. pub mod params; diff --git a/ws-client/Cargo.toml b/ws-client/Cargo.toml deleted file mode 100644 index 2282667053..0000000000 --- a/ws-client/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -name = "jsonrpsee-ws-client" -version = "0.6.0" -authors = ["Parity Technologies ", "Pierre Krieger "] -description = "WebSocket client for JSON-RPC" -edition = "2018" -license = "MIT" -repository = "https://github.com/paritytech/jsonrpsee" -homepage = "https://github.com/paritytech/jsonrpsee" -documentation = "https://docs.rs/jsonrpsee-ws-client" - -[dependencies] -async-trait = "0.1" -beef = "0.5.1" -rustc-hash = "1" -futures = { version = "0.3.14", default-features = false, features = ["std"] } -http = "0.2" -jsonrpsee-types = { path = "../types", version = "0.6.0" } -jsonrpsee-core = { path = "../core", features = ["client"] } -pin-project = "1" -rustls-native-certs = "0.6.0" -serde = "1" -serde_json = "1" -soketto = "0.7.1" -thiserror = "1" -tokio = { version = "1.8", features = ["net", "time", "rt-multi-thread", "macros"] } -tokio-rustls = { version = "0.23", optional = true } -tokio-util = { version = "0.6", features = ["compat"] } -tracing = "0.1" -webpki-roots = "0.22.0" - -[dev-dependencies] -env_logger = "0.9.0" -jsonrpsee-test-utils = { path = "../test-utils" } -tokio = { version = "1.8", features = ["macros"] } - -[features] -default = ["tls"] -tls = ["tokio-rustls"] diff --git a/ws-server/Cargo.toml b/ws-server/Cargo.toml index d3b4e04e3e..d288602652 100644 --- a/ws-server/Cargo.toml +++ b/ws-server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "jsonrpsee-ws-server" -version = "0.6.0" +version = "0.6.1" authors = ["Parity Technologies ", "Pierre Krieger "] description = "WebSocket server for JSON-RPC" edition = "2018" @@ -13,7 +13,7 @@ documentation = "https://docs.rs/jsonrpsee-ws-server" futures-channel = "0.3.14" futures-util = { version = "0.3.14", default-features = false, features = ["io", "async-await-macro"] } jsonrpsee-types = { path = "../types", version = "0.6.0" } -jsonrpsee-core = { path = "../core", version = "0.6.0", features = ["server"] } +jsonrpsee-core = { path = "../core", version = "0.6.1", features = ["server"] } tracing = "0.1" serde_json = { version = "1", features = ["raw_value"] } soketto = "0.7.1" diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 8adaef98fc..3aa470d8c2 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -31,7 +31,7 @@ use std::task::{Context, Poll}; use crate::future::{FutureDriver, ServerHandle, StopMonitor}; use crate::types::error::ErrorCode; -use crate::types::{Id, Request, TEN_MB_SIZE_BYTES}; +use crate::types::{Id, Request}; use futures_channel::mpsc; use futures_util::future::{join_all, FutureExt}; use futures_util::io::{BufReader, BufWriter}; @@ -40,7 +40,7 @@ use jsonrpsee_core::middleware::Middleware; use jsonrpsee_core::server::helpers::{collect_batch_response, prepare_error, MethodSink}; use jsonrpsee_core::server::resource_limiting::Resources; use jsonrpsee_core::server::rpc_module::{ConnectionId, MethodResult, Methods}; -use jsonrpsee_core::Error; +use jsonrpsee_core::{Error, TEN_MB_SIZE_BYTES}; use soketto::connection::Error as SokettoError; use soketto::handshake::{server::Response, Server as SokettoServer}; use soketto::Sender;