diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e06060e0..08fdfa4f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -44,7 +44,7 @@ jobs: - name: Cargo doc run: | - cargo doc --workspace --no-deps + cargo doc --workspace --no-deps --all-features env: RUSTDOCFLAGS: "--deny warnings" @@ -94,7 +94,7 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: Cargo test - run: cargo test --workspace -- --test-threads=2 --nocapture + run: cargo test --workspace --all-features -- --test-threads=2 --nocapture e2e: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 9bb479a5..7d2363fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,11 +309,14 @@ version = "0.1.0" dependencies = [ "assert_matches", "candid", + "futures-util", "http", "ic-cdk", "maplit", "num-traits", "pin-project", + "serde", + "serde_json", "thiserror 2.0.11", "tokio", "tower", @@ -773,6 +776,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", + "thiserror 2.0.11", "thousands", "tower", "tower-http", diff --git a/Cargo.toml b/Cargo.toml index 8fb76f0d..612498d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ inherits = "release" [dependencies] candid = { workspace = true } -canhttp = { path = "canhttp" } +canhttp = { path = "canhttp", features = ["json"] } ethnum = { workspace = true } evm_rpc_types = { path = "evm_rpc_types" } futures = { workspace = true } @@ -36,9 +36,10 @@ minicbor = { workspace = true } serde = { workspace = true } serde_bytes = { workspace = true } serde_json = { workspace = true } +thiserror = { workspace = true } thousands = "0.2" -tower = {workspace = true} -tower-http = {workspace = true, features = ["set-header", "util"]} +tower = { workspace = true } +tower-http = { workspace = true, features = ["set-header", "util"] } url = { workspace = true } hex = "0.4" ethers-core = "2.0" @@ -59,9 +60,10 @@ rand = "0.8" [workspace.dependencies] assert_matches = "1.5.0" candid = { version = "0.10.13" } -candid_parser = {version = "0.1.4"} +candid_parser = { version = "0.1.4" } ethnum = { version = "1.5.0", features = ["serde"] } futures = "0.3.31" +futures-util = "0.3.31" getrandom = { version = "0.2", features = ["custom"] } hex = "0.4.3" http = "1.2.0" @@ -90,4 +92,4 @@ thiserror = "2.0.11" url = "2.5" [workspace] -members = [ "canhttp", "e2e/rust", "evm_rpc_types"] +members = ["canhttp", "e2e/rust", "evm_rpc_types"] diff --git a/canhttp/Cargo.toml b/canhttp/Cargo.toml index 16b80d69..3d13f210 100644 --- a/canhttp/Cargo.toml +++ b/canhttp/Cargo.toml @@ -11,10 +11,13 @@ documentation = "https://docs.rs/canhttp" [dependencies] ic-cdk = { workspace = true } +futures-util = { workspace = true } http = { workspace = true, optional = true } num-traits = { workspace = true, optional = true } pin-project = { workspace = true } -tower = { workspace = true, features = ["filter", "retry"] } +serde = { workspace = true, optional = true } +serde_json = { workspace = true, optional = true } +tower = { workspace = true, features = ["retry"] } tower-layer = { workspace = true, optional = true } thiserror = { workspace = true } @@ -26,4 +29,5 @@ tokio = { workspace = true, features = ["full"] } [features] default = ["http"] -http = ["dep:http", "dep:num-traits", "dep:tower-layer"] \ No newline at end of file +http = ["dep:http", "dep:num-traits", "dep:tower-layer"] +json = ["http", "dep:serde", "dep:serde_json"] \ No newline at end of file diff --git a/canhttp/src/client/mod.rs b/canhttp/src/client/mod.rs index 0868291b..9f1e3f53 100644 --- a/canhttp/src/client/mod.rs +++ b/canhttp/src/client/mod.rs @@ -6,7 +6,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use thiserror::Error; -use tower::{BoxError, Service}; +use tower::{BoxError, Service, ServiceBuilder}; /// Thin wrapper around [`ic_cdk::api::management_canister::http_request::http_request`] /// that implements the [`tower::Service`] trait. Its functionality can be extended by composing so-called @@ -19,6 +19,22 @@ use tower::{BoxError, Service}; #[derive(Clone, Debug)] pub struct Client; +impl Client { + /// Create a new client returning custom errors. + pub fn new_with_error>( + ) -> impl Service { + ServiceBuilder::new() + .map_err(CustomError::from) + .service(Client) + } + + /// Creates a new client where error type is erased. + pub fn new_with_box_error( + ) -> impl Service { + Self::new_with_error::() + } +} + /// Error returned by the Internet Computer when making an HTTPs outcall. #[derive(Error, Clone, Debug, PartialEq, Eq)] #[error("Error from ICP: (code {code:?}, message {message})")] @@ -42,7 +58,7 @@ impl IcError { impl Service for Client { type Response = IcHttpResponse; - type Error = BoxError; + type Error = IcError; type Future = Pin>>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { @@ -58,14 +74,17 @@ impl Service for Client { .await { Ok((response,)) => Ok(response), - Err((code, message)) => Err(BoxError::from(IcError { code, message })), + Err((code, message)) => Err(IcError { code, message }), } }) } } -#[derive(Clone, Debug, PartialEq, Eq)] +/// [`IcHttpRequest`] specifying how many cycles should be attached for the HTTPs outcall. +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct IcHttpRequestWithCycles { + /// Request to be made. pub request: IcHttpRequest, + /// Number of cycles to attach. pub cycles: u128, } diff --git a/canhttp/src/convert/mod.rs b/canhttp/src/convert/mod.rs new file mode 100644 index 00000000..076607ef --- /dev/null +++ b/canhttp/src/convert/mod.rs @@ -0,0 +1,148 @@ +//! Fallible conversion from one type to another that can be used as a tower middleware. +//! +//! # Examples +//! +//! ## To convert requests +//! +//! A converter can be used to convert request types: +//! * If the result of the conversion is [`Ok`], the converted type will be forwarded to the inner service. +//! * If the result of the conversion is [`Err`], the error will be returned and the inner service will *not* be called. +//! +//! When used to convert requests (with [`ConvertRequestLayer`], the functionality offered by [`Convert`] is similar to that of +//! [`Predicate`](https://docs.rs/tower/0.5.2/tower/filter/trait.Predicate.html) in that it can act as a *filter*. The main difference is that the error does not need to be boxed. +//! +//! ```rust +//! use std::convert::Infallible; +//! use canhttp::convert::{Convert, ConvertServiceBuilder}; +//! use tower::{ServiceBuilder, Service, ServiceExt}; +//! +//! async fn bare_bone_service(request: Vec) -> Result, Infallible> { +//! Ok(request) +//! } +//! +//! struct UsefulRequest(Vec); +//! +//! #[derive(Clone)] +//! struct UsefulRequestConverter; +//! +//! impl Convert for UsefulRequestConverter { +//! type Output = Vec; +//! type Error = Infallible; +//! +//! fn try_convert(&mut self, input: UsefulRequest) -> Result { +//! Ok(input.0) +//! } +//! } +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! let mut service = ServiceBuilder::new() +//! .convert_request(UsefulRequestConverter) +//! .service_fn(bare_bone_service); +//! +//! let request = UsefulRequest(vec![42]); +//! +//! let response = service +//! .ready() +//! .await? +//! .call(request) +//! .await?; +//! +//! assert_eq!(response, vec![42_u8]); +//! # Ok(()) +//! # } +//! ``` +//! +//! ## To convert responses +//! +//! A converter can be used to convert response types: +//! ```rust +//! use std::convert::Infallible; +//! use canhttp::convert::{Convert, ConvertServiceBuilder}; +//! use tower::{ServiceBuilder, Service, ServiceExt}; +//! +//! async fn bare_bone_service(request: Vec) -> Result, Infallible> { +//! Ok(request) +//! } +//! +//! #[derive(Debug, PartialEq)] +//! struct UsefulResponse(Vec); +//! +//! #[derive(Clone)] +//! struct UsefulResponseConverter; +//! +//! impl Convert> for UsefulResponseConverter { +//! type Output = UsefulResponse; +//! type Error = Infallible; +//! +//! fn try_convert(&mut self, input: Vec) -> Result { +//! Ok(UsefulResponse(input)) +//! } +//! } +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! let mut service = ServiceBuilder::new() +//! .convert_response(UsefulResponseConverter) +//! .service_fn(bare_bone_service); +//! +//! let request = vec![42]; +//! +//! let response = service +//! .ready() +//! .await? +//! .call(request) +//! .await?; +//! +//! assert_eq!(response, UsefulResponse(vec![42_u8])); +//! # Ok(()) +//! # } +//! ``` + +pub use request::{ConvertRequest, ConvertRequestLayer}; +pub use response::{ConvertResponse, ConvertResponseLayer}; + +mod request; +mod response; + +use tower::ServiceBuilder; +use tower_layer::Stack; + +/// Fallible conversion from one type to another. +pub trait Convert { + /// Converted type if the conversion succeeds. + type Output; + /// Error type if the conversion fails + type Error; + + /// Try to convert an instance of the input type to the output type. + /// The conversion may fail, in which case an error is returned. + fn try_convert(&mut self, response: Input) -> Result; +} + +/// Extension trait that adds methods to [`tower::ServiceBuilder`] for adding middleware +/// based on fallible conversion between types. +pub trait ConvertServiceBuilder { + /// Convert the request type. + /// + /// See the [module docs](crate::convert) for examples. + fn convert_request(self, f: C) -> ServiceBuilder, L>>; + + /// Convert the response type. + /// + /// See the [module docs](crate::convert) for examples. + fn convert_response(self, f: C) -> ServiceBuilder, L>>; +} + +impl ConvertServiceBuilder for ServiceBuilder { + fn convert_request(self, converter: C) -> ServiceBuilder, L>> { + self.layer(ConvertRequestLayer::new(converter)) + } + + fn convert_response( + self, + converter: C, + ) -> ServiceBuilder, L>> { + self.layer(ConvertResponseLayer::new(converter)) + } +} diff --git a/canhttp/src/convert/request.rs b/canhttp/src/convert/request.rs new file mode 100644 index 00000000..b96525f4 --- /dev/null +++ b/canhttp/src/convert/request.rs @@ -0,0 +1,63 @@ +use crate::convert::Convert; +use futures_util::future; +use std::task::{Context, Poll}; +use tower::Service; +use tower_layer::Layer; + +/// Convert request of a service into another type, where the conversion may fail. +/// +/// This [`Layer`] produces instances of the [`ConvertRequest`] service. +/// +/// [`Layer`]: tower::Layer +#[derive(Debug, Clone)] +pub struct ConvertRequestLayer { + converter: C, +} + +impl ConvertRequestLayer { + /// Returns a new [`ConvertRequestLayer`] + pub fn new(converter: C) -> Self { + Self { converter } + } +} + +/// Convert requests into another type and forward the converted type to the inner service +/// *only if* the conversion was successful. +#[derive(Debug, Clone)] +pub struct ConvertRequest { + inner: S, + converter: C, +} + +impl Layer for ConvertRequestLayer { + type Service = ConvertRequest; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { + inner, + converter: self.converter.clone(), + } + } +} + +impl Service for ConvertRequest +where + Converter: Convert, + S: Service, + Converter::Error: Into, +{ + type Response = S::Response; + type Error = S::Error; + type Future = future::Either>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, new_req: NewRequest) -> Self::Future { + match self.converter.try_convert(new_req) { + Ok(request) => future::Either::Left(self.inner.call(request)), + Err(err) => future::Either::Right(future::ready(Err(err.into()))), + } + } +} diff --git a/canhttp/src/convert/response.rs b/canhttp/src/convert/response.rs new file mode 100644 index 00000000..ec23523f --- /dev/null +++ b/canhttp/src/convert/response.rs @@ -0,0 +1,94 @@ +use crate::convert::Convert; +use pin_project::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; +use tower_layer::Layer; + +/// Convert responses of a service into another type, where the conversion may fail. +/// +/// This [`Layer`] produces instances of the [`ConvertResponse`] service. +/// +/// [`Layer`]: tower::Layer +#[derive(Debug, Clone)] +pub struct ConvertResponseLayer { + converter: C, +} + +impl ConvertResponseLayer { + /// Creates a new [`ConvertResponseLayer`] + pub fn new(converter: C) -> Self { + Self { converter } + } +} + +/// Convert the inner service response to another type, where the conversion may fail. +#[derive(Debug, Clone)] +pub struct ConvertResponse { + inner: S, + converter: C, +} + +impl Layer for ConvertResponseLayer { + type Service = ConvertResponse; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { + inner, + converter: self.converter.clone(), + } + } +} + +impl Service + for ConvertResponse +where + S: Service, + Converter: Convert + Clone, + Converter::Error: Into, +{ + type Response = NewResponse; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + ResponseFuture { + response_future: self.inner.call(req), + converter: self.converter.clone(), + } + } +} + +#[pin_project] +pub struct ResponseFuture { + #[pin] + response_future: F, + converter: Converter, +} +impl Future for ResponseFuture +where + F: Future>, + Filter: Convert, + Filter::Error: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let result_fut = this.response_future.poll(cx); + match result_fut { + Poll::Ready(result) => match result { + Ok(response) => { + Poll::Ready(this.converter.try_convert(response).map_err(Into::into)) + } + Err(err) => Poll::Ready(Err(err)), + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/canhttp/src/cycles/mod.rs b/canhttp/src/cycles/mod.rs index a47563f7..95f5f92d 100644 --- a/canhttp/src/cycles/mod.rs +++ b/canhttp/src/cycles/mod.rs @@ -2,10 +2,9 @@ mod tests; use crate::client::IcHttpRequestWithCycles; +use crate::convert::Convert; use ic_cdk::api::management_canister::http_request::CanisterHttpRequestArgument; use thiserror::Error; -use tower::filter::Predicate; -use tower::BoxError; /// Estimate the amount of cycles to charge for a single HTTPs outcall. pub trait CyclesChargingPolicy { @@ -97,7 +96,7 @@ impl CyclesCostEstimator { } /// Error return by the [`CyclesAccounting`] middleware. -#[derive(Error, Debug)] +#[derive(Error, Clone, Debug, PartialEq, Eq)] pub enum CyclesAccountingError { /// Error returned when the caller should be charged but did not attach sufficiently many cycles. #[error("insufficient cycles (expected {expected:?}, received {received:?})")] @@ -127,13 +126,17 @@ impl CyclesAccounting { } } -impl Predicate for CyclesAccounting +impl Convert for CyclesAccounting where CyclesEstimator: CyclesChargingPolicy, { - type Request = IcHttpRequestWithCycles; + type Output = IcHttpRequestWithCycles; + type Error = CyclesAccountingError; - fn check(&mut self, request: CanisterHttpRequestArgument) -> Result { + fn try_convert( + &mut self, + request: CanisterHttpRequestArgument, + ) -> Result { let cycles_to_attach = self.cycles_cost_estimator.cost_of_http_request(&request); let cycles_to_charge = self .charging_policy @@ -141,10 +144,10 @@ where if cycles_to_charge > 0 { let cycles_available = ic_cdk::api::call::msg_cycles_available128(); if cycles_available < cycles_to_charge { - return Err(Box::new(CyclesAccountingError::InsufficientCyclesError { + return Err(CyclesAccountingError::InsufficientCyclesError { expected: cycles_to_charge, received: cycles_available, - })); + }); } let cycles_received = ic_cdk::api::call::msg_cycles_accept128(cycles_to_charge); assert_eq!( diff --git a/canhttp/src/http/json/mod.rs b/canhttp/src/http/json/mod.rs new file mode 100644 index 00000000..e1b62756 --- /dev/null +++ b/canhttp/src/http/json/mod.rs @@ -0,0 +1,119 @@ +//! Middleware to add a JSON translation layer (over HTTP). +//! +//! Transforms a low-level service that transmits bytes into one that transmits JSON payloads: +//! +//! ```text +//! │ ▲ +//! http::Request│ │http::Response +//! ┌─┴─────────────────────┴───┐ +//! │ JsonResponseConverter │ +//! └─┬─────────────────────▲───┘ +//! │ │ +//! ┌─▼─────────────────────┴───┐ +//! │ JsonRequestConverter │ +//! └─┬─────────────────────┬───┘ +//! HttpRequest│ │HttpResponse +//! ▼ │ +//! ┌─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┐ +//! │ SERVICE │ +//! └─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─┘ +//! ``` +//! This can be used to transmit any kind of JSON payloads, such as JSON RPC over HTTP. +//! +//! # Examples +//! +//! ```rust +//! use canhttp::http::{HttpRequest, HttpResponse, json::JsonConversionLayer}; +//! use ic_cdk::api::management_canister::http_request::{CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse}; +//! use tower::{Service, ServiceBuilder, ServiceExt, BoxError}; +//! use serde_json::json; +//! +//! async fn echo_bytes(request: HttpRequest) -> Result { +//! Ok(http::Response::new(request.into_body())) +//! } +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! let mut service = ServiceBuilder::new() +//! .layer(JsonConversionLayer::::new()) +//! .service_fn(echo_bytes); +//! +//! let request = http::Request::post("https://internetcomputer.org") +//! .header("Content-Type", "application/json") +//! .body(json!({"key": "value"})) +//! .unwrap(); +//! +//! let response = service.ready().await.unwrap().call(request).await.unwrap(); +//! +//! assert_eq!(response.into_body()["key"], "value"); +//! # Ok(()) +//! # } + +use crate::convert::{ConvertRequest, ConvertRequestLayer, ConvertResponse, ConvertResponseLayer}; +pub use request::{ + HttpJsonRpcRequest, JsonRequestConversionError, JsonRequestConverter, JsonRpcRequestBody, +}; +pub use response::{ + HttpJsonRpcResponse, JsonResponseConversionError, JsonResponseConverter, JsonRpcResult, +}; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::marker::PhantomData; +use tower_layer::Layer; + +#[cfg(test)] +mod tests; + +mod request; +mod response; + +/// Middleware that combines [`JsonRequestConverter`] to convert requests +/// and [`JsonResponseConverter`] to convert responses to a [`Service`]. +/// +/// See the [module docs](crate::http::json) for an example. +/// +/// [`Service`]: tower::Service +#[derive(Debug)] +pub struct JsonConversionLayer { + _marker: PhantomData<(I, O)>, +} + +impl JsonConversionLayer { + /// Returns a new [`JsonConversionLayer`]. + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +impl Clone for JsonConversionLayer { + fn clone(&self) -> Self { + Self { + _marker: self._marker, + } + } +} + +impl Default for JsonConversionLayer { + fn default() -> Self { + Self::new() + } +} + +impl Layer for JsonConversionLayer +where + I: Serialize, + O: DeserializeOwned, +{ + type Service = + ConvertResponse>, JsonResponseConverter>; + + fn layer(&self, inner: S) -> Self::Service { + let stack = tower_layer::Stack::new( + ConvertRequestLayer::new(JsonRequestConverter::::new()), + ConvertResponseLayer::new(JsonResponseConverter::::new()), + ); + stack.layer(inner) + } +} diff --git a/canhttp/src/http/json/request.rs b/canhttp/src/http/json/request.rs new file mode 100644 index 00000000..adfab09f --- /dev/null +++ b/canhttp/src/http/json/request.rs @@ -0,0 +1,120 @@ +use crate::convert::Convert; +use crate::http::HttpRequest; +use http::header::CONTENT_TYPE; +use http::HeaderValue; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; +use thiserror::Error; + +/// Convert requests of type [`http::Request`], +/// where `T` is `Serializable`, into [`HttpRequest`]. +#[derive(Debug)] +pub struct JsonRequestConverter { + _marker: PhantomData, +} + +impl JsonRequestConverter { + /// Create a new instance of [`JsonRequestConverter`]. + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +// #[derive(Clone)] would otherwise introduce a bound T: Clone, which is not needed. +impl Clone for JsonRequestConverter { + fn clone(&self) -> Self { + Self { + _marker: self._marker, + } + } +} + +impl Default for JsonRequestConverter { + fn default() -> Self { + Self::new() + } +} + +/// Error return when converting requests with [`JsonRequestConverter`]. +#[derive(Error, Clone, Debug, Eq, PartialEq)] +pub enum JsonRequestConversionError { + /// Request body failed to be serialized. + #[error("Invalid JSON body: {0}")] + InvalidJson(String), +} + +impl Convert> for JsonRequestConverter +where + T: Serialize, +{ + type Output = HttpRequest; + type Error = JsonRequestConversionError; + + fn try_convert(&mut self, request: http::Request) -> Result { + try_serialize_request(request) + .map(add_content_type_header_if_missing) + .map_err(Into::into) + } +} + +fn try_serialize_request( + request: http::Request, +) -> Result +where + T: Serialize, +{ + let (parts, body) = request.into_parts(); + let json_body = serde_json::to_vec(&body) + .map_err(|e| JsonRequestConversionError::InvalidJson(e.to_string()))?; + Ok(HttpRequest::from_parts(parts, json_body)) +} + +fn add_content_type_header_if_missing(mut request: HttpRequest) -> HttpRequest { + if !request.headers().contains_key(CONTENT_TYPE) { + request + .headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); + } + request +} + +/// JSON-RPC request. +pub type HttpJsonRpcRequest = http::Request>; + +/// Body for all JSON-RPC requests, see the [specification](https://www.jsonrpc.org/specification). +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct JsonRpcRequestBody { + jsonrpc: String, + method: String, + id: Option, + params: Option, +} + +impl JsonRpcRequestBody { + /// Create a new body of a JSON-RPC request. + pub fn new(method: impl Into, params: T) -> Self { + Self { + jsonrpc: "2.0".to_string(), + method: method.into(), + id: Some(serde_json::Value::Number(0.into())), + params: Some(params), + } + } + + /// Change the request ID. + pub fn set_id(&mut self, id: u64) { + self.id = Some(serde_json::Value::Number(id.into())); + } + + /// Returns the request ID, if any. + pub fn id(&self) -> Option<&serde_json::Value> { + self.id.as_ref() + } + + /// Returns the JSON-RPC method. + pub fn method(&self) -> &str { + &self.method + } +} diff --git a/canhttp/src/http/json/response.rs b/canhttp/src/http/json/response.rs new file mode 100644 index 00000000..5b44843d --- /dev/null +++ b/canhttp/src/http/json/response.rs @@ -0,0 +1,101 @@ +use crate::convert::Convert; +use crate::http::HttpResponse; +use serde::de::DeserializeOwned; +use serde::{Deserialize, Serialize}; +use std::marker::PhantomData; +use thiserror::Error; + +/// Convert responses of type [HttpResponse] into [`http::Response`], +/// where `T` can be deserialized. +#[derive(Debug)] +pub struct JsonResponseConverter { + _marker: PhantomData, +} + +impl JsonResponseConverter { + /// Create a new instance of [`JsonResponseConverter`]. + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +// #[derive(Clone)] would otherwise introduce a bound T: Clone, which is not needed. +impl Clone for JsonResponseConverter { + fn clone(&self) -> Self { + Self { + _marker: self._marker, + } + } +} + +impl Default for JsonResponseConverter { + fn default() -> Self { + Self::new() + } +} + +/// Error returned when converting responses with [`JsonResponseConverter`]. +#[derive(Error, Clone, Debug, Eq, PartialEq)] +pub enum JsonResponseConversionError { + /// Response body could not be deserialized into a JSON-RPC response. + #[error("Invalid HTTP JSON-RPC response: status {status}, body: {body}, parsing error: {parsing_error:?}" + )] + InvalidJsonResponse { + /// Response status code + status: u16, + /// Response body + body: String, + /// Deserialization error + parsing_error: String, + }, +} + +impl Convert for JsonResponseConverter +where + T: DeserializeOwned, +{ + type Output = http::Response; + type Error = JsonResponseConversionError; + + fn try_convert(&mut self, response: HttpResponse) -> Result { + let (parts, body) = response.into_parts(); + let json_body: T = serde_json::from_slice(&body).map_err(|e| { + JsonResponseConversionError::InvalidJsonResponse { + status: parts.status.as_u16(), + body: String::from_utf8_lossy(&body).to_string(), + parsing_error: e.to_string(), + } + })?; + Ok(http::Response::from_parts(parts, json_body)) + } +} + +/// JSON-RPC response. +pub type HttpJsonRpcResponse = http::Response>; + +/// Body of a JSON-RPC response +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct JsonRpcResponseBody { + pub jsonrpc: String, + pub id: serde_json::Value, + #[serde(flatten)] + pub result: JsonRpcResult, +} + +/// An envelope for all JSON-RPC replies. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum JsonRpcResult { + /// Successful JSON-RPC response + #[serde(rename = "result")] + Result(T), + /// Failed JSON-RPC response + #[serde(rename = "error")] + Error { + /// Indicate error type that occurred. + code: i64, + /// Short description of the error. + message: String, + }, +} diff --git a/canhttp/src/http/json/tests.rs b/canhttp/src/http/json/tests.rs new file mode 100644 index 00000000..a2026bcd --- /dev/null +++ b/canhttp/src/http/json/tests.rs @@ -0,0 +1,122 @@ +use crate::http::json::{JsonConversionLayer, JsonRequestConverter, JsonResponseConverter}; +use crate::http::{HttpRequest, HttpResponse}; +use crate::ConvertServiceBuilder; +use http::HeaderValue; +use serde_json::json; +use tower::{BoxError, Service, ServiceBuilder, ServiceExt}; + +#[tokio::test] +async fn should_convert_json_request() { + let url = "https://internetcomputer.org/"; + let mut service = ServiceBuilder::new() + .convert_request(JsonRequestConverter::::new()) + .service_fn(echo_request); + + let body = json!({"foo": "bar"}); + let request = http::Request::post(url).body(body.clone()).unwrap(); + + let converted_request = service.ready().await.unwrap().call(request).await.unwrap(); + + assert_eq!( + serde_json::to_vec(&body).unwrap(), + converted_request.into_body() + ); +} + +#[tokio::test] +async fn should_add_content_type_header_if_missing() { + let url = "https://internetcomputer.org/"; + let mut service = ServiceBuilder::new() + .convert_request(JsonRequestConverter::::new()) + .service_fn(echo_request); + + for (request_content_type, expected_content_type) in [ + (None, "application/json"), + (Some("wrong-value"), "wrong-value"), //do not overwrite explicitly set header + (Some("application/json"), "application/json"), + ] { + let mut builder = http::Request::post(url); + if let Some(request_content_type) = request_content_type { + builder = builder.header(http::header::CONTENT_TYPE, request_content_type); + } + let request = builder + .header("other-header", "should-remain") + .body(json!({"foo": "bar"})) + .unwrap(); + + let converted_request = service + .ready() + .await + .unwrap() + .call(request.clone()) + .await + .unwrap(); + + let (mut request_parts, _body) = request.into_parts(); + let (mut converted_request_parts, _body) = converted_request.into_parts(); + + assert_eq!(request_parts.method, converted_request_parts.method); + assert_eq!(request_parts.uri, converted_request_parts.uri); + assert_eq!(request_parts.version, converted_request_parts.version); + + // Headers should be identical, excepted for content-type + request_parts.headers.remove(http::header::CONTENT_TYPE); + let converted_request_content_type = converted_request_parts + .headers + .remove(http::header::CONTENT_TYPE) + .unwrap(); + assert_eq!( + converted_request_content_type, + HeaderValue::from_static(expected_content_type) + ); + + assert_eq!(request_parts.headers, converted_request_parts.headers); + } +} + +#[tokio::test] +async fn should_convert_json_response() { + let mut service = ServiceBuilder::new() + .convert_response(JsonResponseConverter::::new()) + .service_fn(echo_response); + + let expected_response = json!({"foo": "bar"}); + let response = http::Response::new(serde_json::to_vec(&expected_response).unwrap()); + + let converted_response = service.ready().await.unwrap().call(response).await.unwrap(); + + assert_eq!(converted_response.into_body(), expected_response); +} + +#[tokio::test] +async fn should_convert_both_request_and_response() { + let mut service = ServiceBuilder::new() + .layer(JsonConversionLayer::::new()) + .service_fn(forward_body); + + let response = service + .ready() + .await + .unwrap() + .call( + http::Request::post("https://internetcomputer.org/") + .body(json!({"foo": "bar"})) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.into_body(), json!({"foo": "bar"})); +} + +async fn echo_request(request: HttpRequest) -> Result { + Ok(request) +} + +async fn echo_response(response: HttpResponse) -> Result { + Ok(response) +} + +async fn forward_body(request: HttpRequest) -> Result { + Ok(http::Response::new(request.into_body())) +} diff --git a/canhttp/src/http/mod.rs b/canhttp/src/http/mod.rs index 5c48f9c9..b7db6322 100644 --- a/canhttp/src/http/mod.rs +++ b/canhttp/src/http/mod.rs @@ -7,11 +7,11 @@ //! │ ▲ //! http::Request│ │http::Response //! ┌─┴─────────────────────┴───┐ -//! │HttpResponseConversionLayer│ +//! │ HttpResponseConverter │ //! └─┬─────────────────────▲───┘ //! │ │ //! ┌─▼─────────────────────┴───┐ -//! │HttpRequestConversionLayer │ +//! │ HttpRequestConverter │ //! └─┬─────────────────────┬───┘ //! IcHttpRequest│ │IcHttpResponse //! ▼ │ @@ -31,9 +31,9 @@ //! ```rust //! use canhttp::{http::{HttpConversionLayer, MaxResponseBytesRequestExtension}, IcError}; //! use ic_cdk::api::management_canister::http_request::{CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse}; -//! use tower::{Service, ServiceBuilder, ServiceExt}; +//! use tower::{Service, ServiceBuilder, ServiceExt, BoxError}; //! -//! async fn always_200_ok(request: IcHttpRequest) -> Result { +//! async fn always_200_ok(request: IcHttpRequest) -> Result { //! Ok(IcHttpResponse { //! status: 200_u8.into(), //! ..Default::default() @@ -66,20 +66,24 @@ mod tests; pub use request::{ - HttpRequest, HttpRequestConversionLayer, MaxResponseBytesRequestExtension, - TransformContextRequestExtension, + HttpRequest, HttpRequestConversionError, HttpRequestConverter, + MaxResponseBytesRequestExtension, TransformContextRequestExtension, +}; +pub use response::{ + FilterNonSuccessfulHttpResponse, FilterNonSuccessulHttpResponseError, HttpResponse, + HttpResponseConversionError, HttpResponseConverter, }; -pub use response::{HttpResponse, HttpResponseConversionLayer}; +#[cfg(feature = "json")] +pub mod json; mod request; mod response; -use request::HttpRequestFilter; -use response::HttpResponseConversion; +use crate::convert::{ConvertRequest, ConvertRequestLayer, ConvertResponse, ConvertResponseLayer}; use tower::Layer; -/// Middleware that combines [`HttpRequestConversionLayer`] to convert requests -/// and [`HttpResponseConversionLayer`] to convert responses to a [`Service`]. +/// Middleware that combines [`HttpRequestConverter`] to convert requests +/// and [`HttpResponseConverter`] to convert responses to a [`Service`]. /// /// See the [module docs](crate::http) for an example. /// @@ -87,11 +91,13 @@ use tower::Layer; pub struct HttpConversionLayer; impl Layer for HttpConversionLayer { - type Service = HttpResponseConversion>; + type Service = ConvertResponse, HttpResponseConverter>; fn layer(&self, inner: S) -> Self::Service { - let stack = - tower_layer::Stack::new(HttpRequestConversionLayer, HttpResponseConversionLayer); + let stack = tower_layer::Stack::new( + ConvertRequestLayer::new(HttpRequestConverter), + ConvertResponseLayer::new(HttpResponseConverter), + ); stack.layer(inner) } } diff --git a/canhttp/src/http/request.rs b/canhttp/src/http/request.rs index 4a43cc72..4021ced8 100644 --- a/canhttp/src/http/request.rs +++ b/canhttp/src/http/request.rs @@ -1,11 +1,9 @@ +use crate::convert::Convert; use ic_cdk::api::management_canister::http_request::{ CanisterHttpRequestArgument as IcHttpRequest, HttpHeader as IcHttpHeader, HttpMethod as IcHttpMethod, TransformContext, }; use thiserror::Error; -use tower::filter::Predicate; -use tower::BoxError; -use tower_layer::Layer; /// HTTP request with a body made of bytes. pub type HttpRequest = http::Request>; @@ -106,75 +104,66 @@ impl TransformContextRequestExtension for http::request::Builder { } } +/// Error return when converting requests with [`HttpRequestConverter`]. #[derive(Error, Clone, Debug, Eq, PartialEq)] pub enum HttpRequestConversionError { + /// HTTP method is not supported #[error("HTTP method `{0}` is not supported")] UnsupportedHttpMethod(String), + /// Header name is invalid. #[error("HTTP header `{name}` has an invalid value: {reason}")] - InvalidHttpHeaderValue { name: String, reason: String }, + InvalidHttpHeaderValue { + /// Header name + name: String, + /// Reason for header value being invalid. + reason: String, + }, } -fn try_map_http_request(request: HttpRequest) -> Result { - let url = request.uri().to_string(); - let max_response_bytes = request.get_max_response_bytes(); - let method = match request.method().as_str() { - "GET" => IcHttpMethod::GET, - "POST" => IcHttpMethod::POST, - "HEAD" => IcHttpMethod::HEAD, - unsupported => { - return Err(HttpRequestConversionError::UnsupportedHttpMethod( - unsupported.to_string(), - )) - } - }; - let headers = request - .headers() - .iter() - .map(|(header_name, header_value)| match header_value.to_str() { - Ok(value) => Ok(IcHttpHeader { - name: header_name.to_string(), - value: value.to_string(), - }), - Err(e) => Err(HttpRequestConversionError::InvalidHttpHeaderValue { - name: header_name.to_string(), - reason: e.to_string(), - }), +/// Convert requests of type [`HttpRequest`] into [`IcHttpRequest`]. +#[derive(Clone, Debug)] +pub struct HttpRequestConverter; + +impl Convert for HttpRequestConverter { + type Output = IcHttpRequest; + type Error = HttpRequestConversionError; + + fn try_convert(&mut self, request: HttpRequest) -> Result { + let url = request.uri().to_string(); + let max_response_bytes = request.get_max_response_bytes(); + let method = match request.method().as_str() { + "GET" => IcHttpMethod::GET, + "POST" => IcHttpMethod::POST, + "HEAD" => IcHttpMethod::HEAD, + unsupported => { + return Err(HttpRequestConversionError::UnsupportedHttpMethod( + unsupported.to_string(), + )) + } + }; + let headers = request + .headers() + .iter() + .map(|(header_name, header_value)| match header_value.to_str() { + Ok(value) => Ok(IcHttpHeader { + name: header_name.to_string(), + value: value.to_string(), + }), + Err(e) => Err(HttpRequestConversionError::InvalidHttpHeaderValue { + name: header_name.to_string(), + reason: e.to_string(), + }), + }) + .collect::, _>>()?; + let transform = request.get_transform_context().cloned(); + let body = Some(request.into_body()); + Ok(IcHttpRequest { + url, + max_response_bytes, + method, + headers, + body, + transform, }) - .collect::, _>>()?; - let transform = request.get_transform_context().cloned(); - let body = Some(request.into_body()); - Ok(IcHttpRequest { - url, - max_response_bytes, - method, - headers, - body, - transform, - }) -} - -pub struct HttpRequestFilter; - -impl Predicate for HttpRequestFilter { - type Request = IcHttpRequest; - - fn check(&mut self, request: HttpRequest) -> Result { - try_map_http_request(request).map_err(Into::into) - } -} - -/// Middleware to convert a request of type [`HttpRequest`] into -/// one of type [`IcHttpRequest`] to a [`Service`]. -/// -/// See the [module docs](crate::http) for an example. -/// -/// [`Service`]: tower::Service -pub struct HttpRequestConversionLayer; - -impl Layer for HttpRequestConversionLayer { - type Service = tower::filter::Filter; - - fn layer(&self, inner: S) -> Self::Service { - tower::filter::Filter::new(inner, HttpRequestFilter) } } diff --git a/canhttp/src/http/response.rs b/canhttp/src/http/response.rs index e3a50fa6..59aa10de 100644 --- a/canhttp/src/http/response.rs +++ b/canhttp/src/http/response.rs @@ -1,130 +1,106 @@ +use crate::convert::Convert; +use http::Response; use ic_cdk::api::management_canister::http_request::HttpResponse as IcHttpResponse; -use pin_project::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; use thiserror::Error; -use tower::{BoxError, Service}; -use tower_layer::Layer; /// HTTP response with a body made of bytes. pub type HttpResponse = http::Response>; +/// Error returned when converting respones with [`HttpResponseConverter`]. #[derive(Error, Clone, Debug, Eq, PartialEq)] #[allow(clippy::enum_variant_names)] //current variants reflect invalid data and so start with the prefix Invalid. pub enum HttpResponseConversionError { + /// Status code is invalid #[error("Status code is invalid")] InvalidStatusCode, + /// Header name is invalid. #[error("HTTP header `{name}` is invalid: {reason}")] - InvalidHttpHeaderName { name: String, reason: String }, + InvalidHttpHeaderName { + /// Header name + name: String, + /// Reason for being invalid. + reason: String, + }, + /// Header value is invalid. #[error("HTTP header `{name}` has an invalid value: {reason}")] - InvalidHttpHeaderValue { name: String, reason: String }, + InvalidHttpHeaderValue { + /// Header name + name: String, + /// Reason for header value being invalid. + reason: String, + }, } -fn try_map_http_response( - response: IcHttpResponse, -) -> Result { - use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; - use ic_cdk::api::management_canister::http_request::HttpHeader as IcHttpHeader; - use num_traits::ToPrimitive; - - let status = response - .status - .0 - .to_u16() - .and_then(|s| StatusCode::try_from(s).ok()) - .ok_or(HttpResponseConversionError::InvalidStatusCode)?; - - let mut builder = http::Response::builder().status(status); - if let Some(headers) = builder.headers_mut() { - let mut response_headers = HeaderMap::with_capacity(response.headers.len()); - for IcHttpHeader { name, value } in response.headers { - response_headers.insert( - HeaderName::try_from(&name).map_err(|e| { - HttpResponseConversionError::InvalidHttpHeaderName { - name: name.clone(), - reason: e.to_string(), - } - })?, - HeaderValue::try_from(&value).map_err(|e| { - HttpResponseConversionError::InvalidHttpHeaderValue { - name, - reason: e.to_string(), - } - })?, - ); +/// Convert responses of type [`IcHttpResponse`] into [HttpResponse]. +#[derive(Debug, Clone)] +pub struct HttpResponseConverter; + +impl Convert for HttpResponseConverter { + type Output = HttpResponse; + type Error = HttpResponseConversionError; + + fn try_convert(&mut self, response: IcHttpResponse) -> Result { + use http::{HeaderMap, HeaderName, HeaderValue, StatusCode}; + use ic_cdk::api::management_canister::http_request::HttpHeader as IcHttpHeader; + use num_traits::ToPrimitive; + + let status = response + .status + .0 + .to_u16() + .and_then(|s| StatusCode::try_from(s).ok()) + .ok_or(HttpResponseConversionError::InvalidStatusCode)?; + + let mut builder = http::Response::builder().status(status); + if let Some(headers) = builder.headers_mut() { + let mut response_headers = HeaderMap::with_capacity(response.headers.len()); + for IcHttpHeader { name, value } in response.headers { + response_headers.insert( + HeaderName::try_from(&name).map_err(|e| { + HttpResponseConversionError::InvalidHttpHeaderName { + name: name.clone(), + reason: e.to_string(), + } + })?, + HeaderValue::try_from(&value).map_err(|e| { + HttpResponseConversionError::InvalidHttpHeaderValue { + name, + reason: e.to_string(), + } + })?, + ); + } + headers.extend(response_headers); } - headers.extend(response_headers); - } - - Ok(builder - .body(response.body) - .expect("BUG: builder should have been modified only with validated data")) -} - -/// Middleware to convert a response of type [`HttpResponse`] into -/// one of type [`IcHttpResponse`] to a [`Service`]. -/// -/// See the [module docs](crate::http) for an example. -/// -/// [`Service`]: tower::Service -pub struct HttpResponseConversionLayer; - -pub struct HttpResponseConversion { - inner: S, -} -impl Layer for HttpResponseConversionLayer { - type Service = HttpResponseConversion; - - fn layer(&self, inner: S) -> Self::Service { - Self::Service { inner } - } -} - -impl Service for HttpResponseConversion -where - S: Service, - Error: Into, -{ - type Response = HttpResponse; - type Error = BoxError; - type Future = ResponseFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_ready(cx).map_err(Into::into) - } - - fn call(&mut self, req: Request) -> Self::Future { - ResponseFuture { - response_future: self.inner.call(req), - } + Ok(builder + .body(response.body) + .expect("BUG: builder should have been modified only with validated data")) } } -#[pin_project] -pub struct ResponseFuture { - #[pin] - response_future: F, +/// Error returned when converting responses with [`FilterNonSuccessfulHttpResponse`]. +#[derive(Error, Clone, Debug)] +pub enum FilterNonSuccessulHttpResponseError { + /// Response has a non-successful status code. + #[error("HTTP response is not successful: {0:?}")] + UnsuccessfulResponse(http::Response), } -impl Future for ResponseFuture -where - F: Future>, - E: Into, -{ - type Output = Result; +/// Filter out non-successful responses. +#[derive(Clone, Debug)] +pub struct FilterNonSuccessfulHttpResponse; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let result_fut = this.response_future.poll(cx); +impl Convert> for FilterNonSuccessfulHttpResponse { + type Output = http::Response; + type Error = FilterNonSuccessulHttpResponseError; - match result_fut { - Poll::Ready(result) => match result { - Ok(response) => Poll::Ready(try_map_http_response(response).map_err(Into::into)), - Err(e) => Poll::Ready(Err(e.into())), - }, - Poll::Pending => Poll::Pending, + fn try_convert(&mut self, response: Response) -> Result { + if !response.status().is_success() { + return Err(FilterNonSuccessulHttpResponseError::UnsuccessfulResponse( + response, + )); } + Ok(response) } } diff --git a/canhttp/src/http/tests.rs b/canhttp/src/http/tests.rs index bddac151..844c3b35 100644 --- a/canhttp/src/http/tests.rs +++ b/canhttp/src/http/tests.rs @@ -1,11 +1,10 @@ -use crate::http::request::{HttpRequestConversionError, HttpRequestConversionLayer}; -use crate::http::response::{ - HttpResponse, HttpResponseConversionError, HttpResponseConversionLayer, -}; +use crate::http::request::HttpRequestConversionError; +use crate::http::response::{HttpResponse, HttpResponseConversionError}; use crate::http::{ - HttpConversionLayer, MaxResponseBytesRequestExtension, TransformContextRequestExtension, + HttpConversionLayer, HttpRequestConverter, HttpResponseConverter, + MaxResponseBytesRequestExtension, TransformContextRequestExtension, }; -use crate::IcError; +use crate::{ConvertServiceBuilder, IcError}; use assert_matches::assert_matches; use candid::{Decode, Encode, Principal}; use http::StatusCode; @@ -30,7 +29,7 @@ async fn should_convert_http_request() { let body = vec![42_u8; 32]; let mut service = ServiceBuilder::new() - .layer(HttpRequestConversionLayer) + .convert_request(HttpRequestConverter) .service_fn(echo_request); for (request_builder, expected_http_method) in [ @@ -67,7 +66,7 @@ async fn should_convert_http_request() { #[tokio::test] async fn should_fail_when_http_method_unsupported() { let mut service = ServiceBuilder::new() - .layer(HttpRequestConversionLayer) + .convert_request(HttpRequestConverter) .service_fn(echo_request); let url = "https://internetcomputer.org/"; @@ -97,7 +96,7 @@ async fn should_fail_when_http_method_unsupported() { #[tokio::test] async fn should_convert_http_response() { let mut service = ServiceBuilder::new() - .layer(HttpResponseConversionLayer) + .convert_response(HttpResponseConverter) .service_fn(echo_response); let response = IcHttpResponse { @@ -133,7 +132,7 @@ async fn should_fail_to_convert_http_response() { }; let mut service = ServiceBuilder::new() - .layer(HttpResponseConversionLayer) + .convert_response(HttpResponseConverter) .service_fn(echo_response); let error = expect_error::<_, HttpResponseConversionError>( service @@ -146,7 +145,7 @@ async fn should_fail_to_convert_http_response() { assert_eq!(error, HttpResponseConversionError::InvalidStatusCode); let mut service = ServiceBuilder::new() - .layer(HttpResponseConversionLayer) + .convert_response(HttpResponseConverter) .service_fn(always_error); let error = expect_error::<_, IcError>(service.ready().await.unwrap().call(invalid_response).await); @@ -163,7 +162,7 @@ async fn should_fail_to_convert_http_response() { async fn should_convert_both_request_and_responses() { async fn serialize_request_and_add_header( request: IcHttpRequest, - ) -> Result { + ) -> Result { Ok(IcHttpResponse { status: 200_u8.into(), headers: vec![IcHttpHeader { @@ -218,19 +217,19 @@ async fn should_convert_both_request_and_responses() { ) } -async fn echo_request(request: IcHttpRequest) -> Result { +async fn echo_request(request: IcHttpRequest) -> Result { Ok(request) } -async fn echo_response(response: IcHttpResponse) -> Result { +async fn echo_response(response: IcHttpResponse) -> Result { Ok(response) } -async fn always_error(_response: IcHttpResponse) -> Result { - Err(IcError { +async fn always_error(_response: IcHttpResponse) -> Result { + Err(BoxError::from(IcError { code: RejectionCode::Unknown, message: "always error".to_string(), - }) + })) } // http::Response does not implement PartialEq diff --git a/canhttp/src/lib.rs b/canhttp/src/lib.rs index a0ae246a..c910a6ea 100644 --- a/canhttp/src/lib.rs +++ b/canhttp/src/lib.rs @@ -5,12 +5,14 @@ #![forbid(unsafe_code)] #![forbid(missing_docs)] -pub use client::{Client, IcError}; +pub use client::{Client, IcError, IcHttpRequestWithCycles}; +pub use convert::ConvertServiceBuilder; pub use cycles::{ CyclesAccounting, CyclesAccountingError, CyclesChargingPolicy, CyclesCostEstimator, }; mod client; +pub mod convert; mod cycles; #[cfg(feature = "http")] pub mod http; diff --git a/e2e/rust/src/main.rs b/e2e/rust/src/main.rs index e8a15ca7..21434bc2 100644 --- a/e2e/rust/src/main.rs +++ b/e2e/rust/src/main.rs @@ -64,7 +64,7 @@ pub async fn test() { // Check response structure around gas price assert_eq!( &response[..36], - "{\"id\":1,\"jsonrpc\":\"2.0\",\"result\":\"0x" + "{\"jsonrpc\":\"2.0\",\"id\":1,\"result\":\"0x" ); assert_eq!(&response[response.len() - 2..], "\"}"); } diff --git a/src/http.rs b/src/http.rs index c877daa7..07293083 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,4 +1,5 @@ use crate::constants::COLLATERAL_CYCLES_PER_NODE; +use crate::logs::{INFO, TRACE_HTTP}; use crate::memory::{get_num_subnet_nodes, is_demo_active}; use crate::{ add_metric_entry, @@ -7,21 +8,33 @@ use crate::{ types::{MetricRpcHost, MetricRpcMethod, ResolvedRpcService}, util::canonicalize_json, }; -use canhttp::http::{ - HttpRequest, HttpRequestConversionLayer, HttpResponse, HttpResponseConversionLayer, - MaxResponseBytesRequestExtension, TransformContextRequestExtension, -}; use canhttp::{ - observability::ObservabilityLayer, CyclesAccounting, CyclesAccountingError, - CyclesChargingPolicy, + convert::ConvertRequestLayer, + http::{ + json::{ + HttpJsonRpcRequest, HttpJsonRpcResponse, JsonRequestConversionError, + JsonRequestConverter, JsonResponseConversionError, JsonResponseConverter, + JsonRpcRequestBody, + }, + FilterNonSuccessfulHttpResponse, FilterNonSuccessulHttpResponseError, + HttpRequestConversionError, HttpRequestConverter, HttpResponseConversionError, + HttpResponseConverter, MaxResponseBytesRequestExtension, TransformContextRequestExtension, + }, + observability::ObservabilityLayer, + ConvertServiceBuilder, CyclesAccounting, CyclesAccountingError, CyclesChargingPolicy, IcError, }; use evm_rpc_types::{HttpOutcallError, ProviderError, RpcError, RpcResult, ValidationError}; use http::header::CONTENT_TYPE; use http::HeaderValue; +use ic_canister_log::log; use ic_cdk::api::management_canister::http_request::{ CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse, TransformArgs, TransformContext, }; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::fmt::Debug; +use thiserror::Error; use tower::layer::util::{Identity, Stack}; use tower::{BoxError, Service, ServiceBuilder}; use tower_http::set_header::SetRequestHeaderLayer; @@ -31,7 +44,13 @@ pub fn json_rpc_request_arg( service: ResolvedRpcService, json_rpc_payload: &str, max_response_bytes: u64, -) -> RpcResult { +) -> RpcResult> { + let body: JsonRpcRequestBody = serde_json::from_str(json_rpc_payload) + .map_err(|e| { + RpcError::ValidationError(ValidationError::Custom(format!( + "Invalid JSON RPC request: {e}" + ))) + })?; service .post(&get_override_provider())? .max_response_bytes(max_response_bytes) @@ -39,7 +58,7 @@ pub fn json_rpc_request_arg( "__transform_json_rpc".to_string(), vec![], )) - .body(json_rpc_payload.as_bytes().to_vec()) + .body(body) .map_err(|e| { RpcError::ValidationError(ValidationError::Custom(format!("Invalid request: {e}"))) }) @@ -49,23 +68,29 @@ pub async fn json_rpc_request( service: ResolvedRpcService, json_rpc_payload: &str, max_response_bytes: u64, -) -> RpcResult { +) -> RpcResult> { let request = json_rpc_request_arg(service, json_rpc_payload, max_response_bytes)?; http_client(MetricRpcMethod("request".to_string())) .call(request) .await } -pub fn http_client( +pub fn http_client( rpc_method: MetricRpcMethod, -) -> impl Service { +) -> impl Service, Response = HttpJsonRpcResponse, Error = RpcError> +where + I: Serialize, + O: DeserializeOwned + Debug, +{ ServiceBuilder::new() + .map_err(|e: HttpClientError| RpcError::from(e)) .layer( ObservabilityLayer::new() - .on_request(move |req: &HttpRequest| { + .on_request(move |req: &HttpJsonRpcRequest| { let req_data = MetricData { method: rpc_method.clone(), host: MetricRpcHost(req.uri().host().unwrap().to_string()), + request_id: req.body().id().cloned(), }; add_metric_entry!( requests, @@ -74,80 +99,197 @@ pub fn http_client( ); req_data }) - .on_response(|req_data: MetricData, response: &HttpResponse| { - let status: u32 = response.status().as_u16() as u32; - add_metric_entry!( - responses, - (req_data.method, req_data.host, status.into()), - 1 + .on_response(|req_data: MetricData, response: &HttpJsonRpcResponse| { + observe_response(req_data.method, req_data.host, response.status().as_u16()); + log!( + TRACE_HTTP, + "Got response for request with id `{:?}`. Response with status {}: {:?}", + req_data.request_id, + response.status(), + response.body() ); }) - .on_error(|req_data: MetricData, error: &RpcError| { - if let RpcError::HttpOutcallError(HttpOutcallError::IcError { - code, - message: _, - }) = error - { - add_metric_entry!( - err_http_outcall, - (req_data.method, req_data.host, *code), - 1 - ); - } - }), + .on_error( + |req_data: MetricData, error: &HttpClientError| match error { + HttpClientError::IcError(IcError { code, message: _ }) => { + add_metric_entry!( + err_http_outcall, + (req_data.method, req_data.host, *code), + 1 + ); + } + HttpClientError::UnsuccessfulHttpResponse( + FilterNonSuccessulHttpResponseError::UnsuccessfulResponse(response), + ) => { + observe_response( + req_data.method, + req_data.host, + response.status().as_u16(), + ); + log!( + TRACE_HTTP, + "Unsuccessful HTTP response for request with id `{:?}`. Response with status {}: {}", + req_data.request_id, + response.status(), + String::from_utf8_lossy(response.body()) + ); + } + HttpClientError::InvalidJsonResponse( + JsonResponseConversionError::InvalidJsonResponse { + status, + body: _, + parsing_error: _, + }, + ) => { + observe_response(req_data.method, req_data.host, *status); + log!( + TRACE_HTTP, + "Invalid JSON RPC response for request with id `{:?}`: {}", + req_data.request_id, + error + ); + } + HttpClientError::NotHandledError(e) => { + log!(INFO, "BUG: Unexpected error: {}", e); + } + HttpClientError::CyclesAccountingError(_) => {} + }, + ), ) - .map_err(map_error) .layer(service_request_builder()) - .layer(HttpResponseConversionLayer) - .filter(CyclesAccounting::new( + .convert_response(JsonResponseConverter::new()) + .convert_response(FilterNonSuccessfulHttpResponse) + .convert_response(HttpResponseConverter) + .convert_request(CyclesAccounting::new( get_num_subnet_nodes(), ChargingPolicyWithCollateral::default(), )) - .service(canhttp::Client) + .service(canhttp::Client::new_with_error::()) +} + +fn observe_response(method: MetricRpcMethod, host: MetricRpcHost, status: u16) { + let status: u32 = status as u32; + add_metric_entry!(responses, (method, host, status.into()), 1); } +type JsonRpcServiceBuilder = ServiceBuilder< + Stack< + ConvertRequestLayer, + Stack< + ConvertRequestLayer>, + Stack, Identity>, + >, + >, +>; + /// Middleware that takes care of transforming the request. /// /// It's required to separate it from the other middlewares, to compute the exact request cost. -pub fn service_request_builder() -> ServiceBuilder< - Stack, Identity>>, -> { +pub fn service_request_builder() -> JsonRpcServiceBuilder { ServiceBuilder::new() .insert_request_header_if_not_present( CONTENT_TYPE, HeaderValue::from_static(CONTENT_TYPE_VALUE), ) - .layer(HttpRequestConversionLayer) + .convert_request(JsonRequestConverter::::new()) + .convert_request(HttpRequestConverter) } -struct MetricData { - method: MetricRpcMethod, - host: MetricRpcHost, +#[derive(Error, Debug)] +pub enum HttpClientError { + #[error("IC error: {0}")] + IcError(IcError), + #[error("unknown error (most likely sign of a bug): {0}")] + NotHandledError(BoxError), + #[error("cycles accounting error: {0}")] + CyclesAccountingError(CyclesAccountingError), + #[error("HTTP response was not successful: {0}")] + UnsuccessfulHttpResponse(FilterNonSuccessulHttpResponseError>), + #[error("Error converting response to JSON: {0}")] + InvalidJsonResponse(JsonResponseConversionError), } -fn map_error(e: BoxError) -> RpcError { - if let Some(charging_error) = e.downcast_ref::() { - return match charging_error { - CyclesAccountingError::InsufficientCyclesError { expected, received } => { - ProviderError::TooFewCycles { - expected: *expected, - received: *received, - } - .into() - } - }; +impl From for HttpClientError { + fn from(value: IcError) -> Self { + HttpClientError::IcError(value) } - if let Some(canhttp::IcError { code, message }) = e.downcast_ref::() { - return HttpOutcallError::IcError { - code: *code, - message: message.clone(), +} + +impl From for HttpClientError { + fn from(value: HttpResponseConversionError) -> Self { + // Replica should return valid http::Response + HttpClientError::NotHandledError(BoxError::from(value)) + } +} + +impl From>> for HttpClientError { + fn from(value: FilterNonSuccessulHttpResponseError>) -> Self { + HttpClientError::UnsuccessfulHttpResponse(value) + } +} + +impl From for HttpClientError { + fn from(value: JsonResponseConversionError) -> Self { + HttpClientError::InvalidJsonResponse(value) + } +} + +impl From for HttpClientError { + fn from(value: CyclesAccountingError) -> Self { + HttpClientError::CyclesAccountingError(value) + } +} + +impl From for HttpClientError { + fn from(value: HttpRequestConversionError) -> Self { + HttpClientError::NotHandledError(value.into()) + } +} + +impl From for HttpClientError { + fn from(value: JsonRequestConversionError) -> Self { + HttpClientError::NotHandledError(value.into()) + } +} + +impl From for RpcError { + fn from(error: HttpClientError) -> Self { + match error { + HttpClientError::IcError(IcError { code, message }) => { + RpcError::HttpOutcallError(HttpOutcallError::IcError { code, message }) + } + HttpClientError::NotHandledError(e) => { + RpcError::ValidationError(ValidationError::Custom(e.to_string())) + } + HttpClientError::CyclesAccountingError( + CyclesAccountingError::InsufficientCyclesError { expected, received }, + ) => RpcError::ProviderError(ProviderError::TooFewCycles { expected, received }), + HttpClientError::InvalidJsonResponse( + JsonResponseConversionError::InvalidJsonResponse { + status, + body, + parsing_error, + }, + ) => RpcError::HttpOutcallError(HttpOutcallError::InvalidHttpJsonRpcResponse { + status, + body, + parsing_error: Some(parsing_error), + }), + HttpClientError::UnsuccessfulHttpResponse( + FilterNonSuccessulHttpResponseError::UnsuccessfulResponse(response), + ) => RpcError::HttpOutcallError(HttpOutcallError::InvalidHttpJsonRpcResponse { + status: response.status().as_u16(), + body: String::from_utf8_lossy(response.body()).to_string(), + parsing_error: None, + }), } - .into(); } - RpcError::ProviderError(ProviderError::InvalidRpcConfig(format!( - "Unknown error: {}", - e - ))) +} + +struct MetricData { + method: MetricRpcMethod, + host: MetricRpcHost, + request_id: Option, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -198,15 +340,3 @@ pub fn transform_http_request(args: TransformArgs) -> IcHttpResponse { headers: vec![], } } - -pub fn get_http_response_body(response: HttpResponse) -> Result { - let (parts, body) = response.into_parts(); - String::from_utf8(body).map_err(|e| { - HttpOutcallError::InvalidHttpJsonRpcResponse { - status: parts.status.as_u16(), - body: "".to_string(), - parsing_error: Some(format!("{e}")), - } - .into() - }) -} diff --git a/src/main.rs b/src/main.rs index 41d0b17c..3b345f2b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,7 @@ use candid::candid_method; use canhttp::{CyclesChargingPolicy, CyclesCostEstimator}; use evm_rpc::candid_rpc::CandidRpcClient; -use evm_rpc::http::{ - get_http_response_body, service_request_builder, ChargingPolicyWithCollateral, -}; +use evm_rpc::http::{service_request_builder, ChargingPolicyWithCollateral}; use evm_rpc::logs::INFO; use evm_rpc::memory::{ get_num_subnet_nodes, insert_api_key, is_api_key_principal, is_demo_active, remove_api_key, @@ -19,7 +17,7 @@ use evm_rpc::{ memory::UNSTABLE_METRICS, types::Metrics, }; -use evm_rpc_types::{Hex32, MultiRpcResult, RpcResult}; +use evm_rpc_types::{Hex32, HttpOutcallError, MultiRpcResult, RpcResult}; use ic_canister_log::log; use ic_cdk::{ api::{ @@ -32,7 +30,6 @@ use ic_cdk::{ query, update, }; use ic_metrics_encoder::MetricsEncoder; -use std::convert::Infallible; use tower::Service; pub fn require_api_key_principal_or_controller() -> Result<(), String> { @@ -152,7 +149,14 @@ async fn request( max_response_bytes, ) .await?; - get_http_response_body(response) + serde_json::to_string(response.body()).map_err(|e| { + HttpOutcallError::InvalidHttpJsonRpcResponse { + status: response.status().as_u16(), + body: format!("{:?}", response.body()), + parsing_error: Some(format!("{e}")), + } + .into() + }) } #[query(name = "requestCost")] @@ -173,7 +177,7 @@ async fn request_cost( async fn extract_request( request: IcHttpRequest, - ) -> Result, Infallible> { + ) -> Result, tower::BoxError> { Ok(http::Response::new(request)) } diff --git a/src/rpc_client/eth_rpc/mod.rs b/src/rpc_client/eth_rpc/mod.rs index 0a3c5edf..c4a87e99 100644 --- a/src/rpc_client/eth_rpc/mod.rs +++ b/src/rpc_client/eth_rpc/mod.rs @@ -6,15 +6,15 @@ use crate::logs::{DEBUG, TRACE_HTTP}; use crate::memory::{get_override_provider, next_request_id}; use crate::providers::resolve_rpc_service; use crate::rpc_client::eth_rpc_error::{sanitize_send_raw_transaction_result, Parser}; -use crate::rpc_client::json::requests::JsonRpcRequest; use crate::rpc_client::json::responses::{ Block, FeeHistory, JsonRpcReply, JsonRpcResult, LogEntry, TransactionReceipt, }; use crate::rpc_client::numeric::{TransactionCount, Wei}; use crate::types::MetricRpcMethod; use candid::candid_method; +use canhttp::http::json::JsonRpcRequestBody; use canhttp::http::{MaxResponseBytesRequestExtension, TransformContextRequestExtension}; -use evm_rpc_types::{HttpOutcallError, RpcError, RpcService}; +use evm_rpc_types::{HttpOutcallError, JsonRpcError, RpcError, RpcService}; use ic_canister_log::log; use ic_cdk::api::call::RejectionCode; use ic_cdk::api::management_canister::http_request::{ @@ -171,20 +171,11 @@ pub async fn call( mut response_size_estimate: ResponseSizeEstimate, ) -> Result where - I: Serialize + Debug, - O: DeserializeOwned + HttpResponsePayload, + I: Serialize + Clone + Debug, + O: Debug + DeserializeOwned + HttpResponsePayload, { use tower::Service; - let eth_method = method.into(); - let rpc_method = MetricRpcMethod(eth_method.clone()); - let mut client = http_client(rpc_method); - let mut rpc_request = JsonRpcRequest { - jsonrpc: "2.0".to_string(), - params, - method: eth_method.clone(), - id: 1, - }; let transform_op = O::response_transform() .as_ref() .map(|t| { @@ -193,26 +184,31 @@ where buf }) .unwrap_or_default(); - let (parts, _body) = resolve_rpc_service(provider.clone())? + + let request = resolve_rpc_service(provider.clone())? .post(&get_override_provider())? .transform_context(TransformContext::from_name( "cleanup_response".to_owned(), transform_op.clone(), )) - .body(()) - .expect("BUG: invalid request") - .into_parts(); + .body(JsonRpcRequestBody::new(method, params)) + .expect("BUG: invalid request"); + + let eth_method = request.body().method().to_string(); + let mut client = http_client(MetricRpcMethod(eth_method.clone())); let mut retries = 0; loop { - rpc_request.id = next_request_id(); let effective_size_estimate = response_size_estimate.get(); - let request = - http::Request::from_parts(parts.clone(), serde_json::to_vec(&rpc_request).unwrap()) - .max_response_bytes(effective_size_estimate); + let request = { + let mut request = request.clone().max_response_bytes(effective_size_estimate); + let body = request.body_mut(); + body.set_id(next_request_id()); + request + }; let url = request.uri().clone(); log!( TRACE_HTTP, - "Calling url (retries={retries}): {}, with payload: {rpc_request:?}", + "Calling url (retries={retries}): {}, with payload: {request:?}", url ); @@ -232,37 +228,12 @@ where result => result?, }; - let (response_parts, response_body) = response.into_parts(); - log!( - TRACE_HTTP, - "Got response (with {} bytes): {} from url: {} with status: {}", - response_body.len(), - String::from_utf8_lossy(&response_body), - url, - response_parts.status - ); - - // JSON-RPC responses over HTTP should have a 2xx status code, - // even if the contained JsonRpcResult is an error. - // If the server is not available, it will sometimes (wrongly) return HTML that will fail parsing as JSON. - if !response_parts.status.is_success() { - return Err(HttpOutcallError::InvalidHttpJsonRpcResponse { - status: response_parts.status.as_u16(), - body: String::from_utf8_lossy(&response_body).to_string(), - parsing_error: None, - } - .into()); - } - - let reply: JsonRpcReply = serde_json::from_slice(&response_body).map_err(|e| { - HttpOutcallError::InvalidHttpJsonRpcResponse { - status: response_parts.status.as_u16(), - body: String::from_utf8_lossy(&response_body).to_string(), - parsing_error: Some(e.to_string()), + return match response.into_body().result { + canhttp::http::json::JsonRpcResult::Result(r) => Ok(r), + canhttp::http::json::JsonRpcResult::Error { code, message } => { + Err(JsonRpcError { code, message }.into()) } - })?; - - return reply.result.into(); + }; } } diff --git a/src/rpc_client/json/requests.rs b/src/rpc_client/json/requests.rs index cda58fa4..81ebf6b6 100644 --- a/src/rpc_client/json/requests.rs +++ b/src/rpc_client/json/requests.rs @@ -251,12 +251,3 @@ pub struct AccessListItem { #[serde(rename = "storageKeys")] pub storage_keys: Vec, } - -/// An envelope for all JSON-RPC requests. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct JsonRpcRequest { - pub jsonrpc: String, - pub method: String, - pub id: u64, - pub params: T, -} diff --git a/src/rpc_client/json/responses.rs b/src/rpc_client/json/responses.rs index 19d4d217..9d11febd 100644 --- a/src/rpc_client/json/responses.rs +++ b/src/rpc_client/json/responses.rs @@ -316,6 +316,7 @@ impl AsRef<[u8]> for Data { } } +//TODO XC-287: remove this type to use the corresponding ones from canhttp #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct JsonRpcReply { pub id: u64, diff --git a/src/rpc_client/mod.rs b/src/rpc_client/mod.rs index e7a530fd..01fbf9e1 100644 --- a/src/rpc_client/mod.rs +++ b/src/rpc_client/mod.rs @@ -295,7 +295,7 @@ impl EthRpcClient { ) -> MultiCallResults where I: Serialize + Clone + Debug, - O: DeserializeOwned + HttpResponsePayload, + O: Debug + DeserializeOwned + HttpResponsePayload, { let providers = self.providers(); let results = { diff --git a/tests/tests.rs b/tests/tests.rs index 6a9746f7..baae1eed 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -45,7 +45,7 @@ const MAX_TICKS: usize = 10; const MOCK_REQUEST_URL: &str = "https://cloudflare-eth.com"; const MOCK_REQUEST_PAYLOAD: &str = r#"{"id":1,"jsonrpc":"2.0","method":"eth_gasPrice"}"#; -const MOCK_REQUEST_RESPONSE: &str = r#"{"id":1,"jsonrpc":"2.0","result":"0x00112233"}"#; +const MOCK_REQUEST_RESPONSE: &str = r#"{"jsonrpc":"2.0","id":1,"result":"0x00112233"}"#; const MOCK_REQUEST_RESPONSE_BYTES: u64 = 1000; const MOCK_API_KEY: &str = "mock-api-key"; @@ -1150,7 +1150,7 @@ fn candid_rpc_should_err_when_service_unavailable() { HttpOutcallError::InvalidHttpJsonRpcResponse { status: 503, body: "Service unavailable".to_string(), - parsing_error: None, + parsing_error: None } )) ); @@ -1986,7 +1986,7 @@ fn upgrade_should_keep_demo() { setup .request_cost( RpcService::EthMainnet(EthMainnetService::PublicNode), - r#"{"jsonrpc":"2.0","id":0,"result":"0x1"}"#, + r#"{"jsonrpc":"2.0","id":0,"method":"test"}"#, 1000 ) .unwrap(), @@ -1997,7 +1997,7 @@ fn upgrade_should_keep_demo() { setup .request_cost( RpcService::EthMainnet(EthMainnetService::PublicNode), - r#"{"jsonrpc":"2.0","id":0,"result":"0x1"}"#, + r#"{"jsonrpc":"2.0","id":0,"method":"test"}"#, 1000 ) .unwrap(), @@ -2015,7 +2015,7 @@ fn upgrade_should_change_demo() { setup .request_cost( RpcService::EthMainnet(EthMainnetService::PublicNode), - r#"{"jsonrpc":"2.0","id":0,"result":"0x1"}"#, + r#"{"jsonrpc":"2.0","id":0,"method":"test"}"#, 1000 ) .unwrap(), @@ -2029,7 +2029,7 @@ fn upgrade_should_change_demo() { setup .request_cost( RpcService::EthMainnet(EthMainnetService::PublicNode), - r#"{"jsonrpc":"2.0","id":0,"result":"0x1"}"#, + r#"{"jsonrpc":"2.0","id":0,"method":"test"}"#, 1000 ) .unwrap(),