diff --git a/canhttp/src/client/mod.rs b/canhttp/src/client/mod.rs index 9f1e3f53..d9add4b1 100644 --- a/canhttp/src/client/mod.rs +++ b/canhttp/src/client/mod.rs @@ -1,6 +1,11 @@ +#[cfg(test)] +mod tests; + +use crate::convert::ConvertError; +use crate::ConvertServiceBuilder; use ic_cdk::api::call::RejectionCode; use ic_cdk::api::management_canister::http_request::{ - CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse, + CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse, TransformContext, }; use std::future::Future; use std::pin::Pin; @@ -16,21 +21,20 @@ use tower::{BoxError, Service, ServiceBuilder}; /// * [`crate::cycles::CyclesAccounting`]: handles cycles accounting. /// * [`crate::observability`]: add logging or metrics. /// * [`crate::http`]: use types from the [http](https://crates.io/crates/http) crate for requests and responses. +/// * [`crate::retry::DoubleMaxResponseBytes`]: automatically retry failed requests due to the response being too big. #[derive(Clone, Debug)] pub struct Client; impl Client { /// Create a new client returning custom errors. - pub fn new_with_error>( - ) -> impl Service { + pub fn new_with_error>() -> ConvertError { ServiceBuilder::new() - .map_err(CustomError::from) + .convert_error::() .service(Client) } /// Creates a new client where error type is erased. - pub fn new_with_box_error( - ) -> impl Service { + pub fn new_with_box_error() -> ConvertError { Self::new_with_error::() } } @@ -45,17 +49,6 @@ pub struct IcError { pub message: String, } -impl IcError { - /// Determines whether the error indicates that the response was larger than the specified - /// [`max_response_bytes`](https://internetcomputer.org/docs/current/references/ic-interface-spec#ic-http_request) specified in the request. - /// - /// If true, retrying with a larger value for `max_response_bytes` may help. - pub fn is_response_too_large(&self) -> bool { - self.code == RejectionCode::SysFatal - && (self.message.contains("size limit") || self.message.contains("length limit")) - } -} - impl Service for Client { type Response = IcHttpResponse; type Error = IcError; @@ -88,3 +81,106 @@ pub struct IcHttpRequestWithCycles { /// Number of cycles to attach. pub cycles: u128, } + +/// Add support for max response bytes. +pub trait MaxResponseBytesRequestExtension: Sized { + /// Set the max response bytes. + /// + /// If provided, the value must not exceed 2MB (2_000_000B). + /// The call will be charged based on this parameter. + /// If not provided, the maximum of 2MB will be used. + fn set_max_response_bytes(&mut self, value: u64); + + /// Retrieves the current max response bytes value, if any. + fn get_max_response_bytes(&self) -> Option; + + /// Convenience method to use the builder pattern. + fn max_response_bytes(mut self, value: u64) -> Self { + self.set_max_response_bytes(value); + self + } +} + +impl MaxResponseBytesRequestExtension for IcHttpRequest { + fn set_max_response_bytes(&mut self, value: u64) { + self.max_response_bytes = Some(value); + } + + fn get_max_response_bytes(&self) -> Option { + self.max_response_bytes + } +} + +impl MaxResponseBytesRequestExtension for IcHttpRequestWithCycles { + fn set_max_response_bytes(&mut self, value: u64) { + self.request.set_max_response_bytes(value); + } + + fn get_max_response_bytes(&self) -> Option { + self.request.get_max_response_bytes() + } +} + +/// Add support for transform context to specify how the response will be canonicalized by the replica +/// to maximize chances of consensus. +/// +/// See the [docs](https://internetcomputer.org/docs/references/https-outcalls-how-it-works#transformation-function) +/// on HTTPs outcalls for more details. +pub trait TransformContextRequestExtension: Sized { + /// Set the transform context. + fn set_transform_context(&mut self, value: TransformContext); + + /// Retrieve the current transform context, if any. + fn get_transform_context(&self) -> Option<&TransformContext>; + + /// Convenience method to use the builder pattern. + fn transform_context(mut self, value: TransformContext) -> Self { + self.set_transform_context(value); + self + } +} + +impl TransformContextRequestExtension for IcHttpRequest { + fn set_transform_context(&mut self, value: TransformContext) { + self.transform = Some(value); + } + + fn get_transform_context(&self) -> Option<&TransformContext> { + self.transform.as_ref() + } +} + +impl TransformContextRequestExtension for IcHttpRequestWithCycles { + fn set_transform_context(&mut self, value: TransformContext) { + self.request.set_transform_context(value); + } + + fn get_transform_context(&self) -> Option<&TransformContext> { + self.request.get_transform_context() + } +} + +/// Characterize errors that are specific to HTTPs outcalls. +pub trait HttpsOutcallError { + /// Determines whether the error indicates that the response was larger than the specified + /// [`max_response_bytes`](https://internetcomputer.org/docs/current/references/ic-interface-spec#ic-http_request) specified in the request. + /// + /// If true, retrying with a larger value for `max_response_bytes` may help. + fn is_response_too_large(&self) -> bool; +} + +impl HttpsOutcallError for IcError { + fn is_response_too_large(&self) -> bool { + self.code == RejectionCode::SysFatal + && (self.message.contains("size limit") || self.message.contains("length limit")) + } +} + +impl HttpsOutcallError for BoxError { + fn is_response_too_large(&self) -> bool { + if let Some(ic_error) = self.downcast_ref::() { + return ic_error.is_response_too_large(); + } + false + } +} diff --git a/canhttp/src/client/tests.rs b/canhttp/src/client/tests.rs new file mode 100644 index 00000000..792641d7 --- /dev/null +++ b/canhttp/src/client/tests.rs @@ -0,0 +1,43 @@ +use crate::retry::DoubleMaxResponseBytes; +use crate::{Client, HttpsOutcallError, IcError}; +use tower::{ServiceBuilder, ServiceExt}; + +// Some middlewares like tower::retry need the underlying service to be cloneable. +#[test] +fn should_be_clone() { + let client = Client::new_with_box_error(); + let _ = client.clone(); + + let client = Client::new_with_error::(); + let _ = client.clone(); +} + +// Note that calling `Client::call` would require a canister environment. +// We just ensure that the trait bounds are satisfied to have a service. +#[tokio::test] +async fn should_be_able_to_use_retry_layer() { + let mut service = ServiceBuilder::new() + .retry(DoubleMaxResponseBytes) + .service(Client::new_with_error::()); + let _ = service.ready().await.unwrap(); + + let mut service = ServiceBuilder::new() + .retry(DoubleMaxResponseBytes) + .service(Client::new_with_box_error()); + let _ = service.ready().await.unwrap(); +} + +#[derive(Debug)] +struct CustomError(IcError); + +impl HttpsOutcallError for CustomError { + fn is_response_too_large(&self) -> bool { + self.0.is_response_too_large() + } +} + +impl From for CustomError { + fn from(value: IcError) -> Self { + CustomError(value) + } +} diff --git a/canhttp/src/convert/error.rs b/canhttp/src/convert/error.rs new file mode 100644 index 00000000..7a931610 --- /dev/null +++ b/canhttp/src/convert/error.rs @@ -0,0 +1,115 @@ +use pin_project::pin_project; +use std::future::Future; +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tower::Service; +use tower_layer::Layer; + +/// Convert error of a service into another type, where the conversion does *not* fail. +/// +/// This [`Layer`] produces instances of the [`ConvertError`] service. +/// +/// [`Layer`]: tower::Layer +#[derive(Debug)] +pub struct ConvertErrorLayer { + _marker: PhantomData, +} + +impl ConvertErrorLayer { + /// Returns a new [`ConvertErrorLayer`] + pub fn new() -> Self { + Self { + _marker: PhantomData, + } + } +} + +impl Default for ConvertErrorLayer { + fn default() -> Self { + Self::new() + } +} + +impl Clone for ConvertErrorLayer { + fn clone(&self) -> Self { + Self { + _marker: self._marker, + } + } +} + +/// Convert the inner service error to another type, where the conversion does *not* fail. +#[derive(Debug)] +pub struct ConvertError { + inner: S, + _marker: PhantomData, +} + +impl Clone for ConvertError { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + _marker: self._marker, + } + } +} + +impl Layer for ConvertErrorLayer { + type Service = ConvertError; + + fn layer(&self, inner: S) -> Self::Service { + Self::Service { + inner, + _marker: PhantomData, + } + } +} + +impl Service for ConvertError +where + S: Service, + Error: Into, +{ + type Response = S::Response; + type Error = NewError; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) + } + + fn call(&mut self, req: Request) -> Self::Future { + ResponseFuture { + response_future: self.inner.call(req), + _marker: PhantomData, + } + } +} + +#[pin_project] +pub struct ResponseFuture { + #[pin] + response_future: F, + _marker: PhantomData, +} + +impl Future for ResponseFuture +where + F: Future>, + Error: Into, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let result_fut = this.response_future.poll(cx); + match result_fut { + Poll::Ready(result) => match result { + Ok(response) => Poll::Ready(Ok(response)), + Err(e) => Poll::Ready(Err(e.into())), + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/canhttp/src/convert/mod.rs b/canhttp/src/convert/mod.rs index 076607ef..ae89f701 100644 --- a/canhttp/src/convert/mod.rs +++ b/canhttp/src/convert/mod.rs @@ -98,10 +98,62 @@ //! # Ok(()) //! # } //! ``` +//! +//! ## To convert errors +//! +//! A service that returns an error of type `Error` can be turned into a service that returns +//! errors of type `NewError`, if `Error` can be converted `Into` the type `NewError`. +//! This is automatically the case if `NewError` implements `From`. +//! +//! ```rust +//! use canhttp::convert::{Convert, ConvertServiceBuilder}; +//! use tower::{ServiceBuilder, Service, ServiceExt}; +//! +//! enum Error { +//! OhNo +//! } +//! async fn bare_bone_service(request: Vec) -> Result, Error> { +//! Err(Error::OhNo) +//! } +//! +//! #[derive(Debug, PartialEq)] +//! enum NewError { +//! Oops +//! } +//! +//! impl From for NewError { +//! fn from(value: Error) -> Self { +//! match value { +//! Error::OhNo => NewError::Oops +//! } +//! } +//! } +//! +//! # #[tokio::main] +//! # async fn main() -> Result<(), Box> { +//! let mut service = ServiceBuilder::new() +//! .convert_error::() +//! .service_fn(bare_bone_service); +//! +//! let request = vec![42]; +//! +//! let response = service +//! .ready() +//! .await +//! .unwrap() +//! .call(request) +//! .await; +//! +//! assert_eq!(response, Err(NewError::Oops)); +//! # Ok(()) +//! # } +//! ``` +pub use error::{ConvertError, ConvertErrorLayer}; pub use request::{ConvertRequest, ConvertRequestLayer}; pub use response::{ConvertResponse, ConvertResponseLayer}; +mod error; mod request; mod response; @@ -132,6 +184,11 @@ pub trait ConvertServiceBuilder { /// /// See the [module docs](crate::convert) for examples. fn convert_response(self, f: C) -> ServiceBuilder, L>>; + + /// Convert the error type. + /// + /// See the [module docs](crate::convert) for examples. + fn convert_error(self) -> ServiceBuilder, L>>; } impl ConvertServiceBuilder for ServiceBuilder { @@ -145,4 +202,8 @@ impl ConvertServiceBuilder for ServiceBuilder { ) -> ServiceBuilder, L>> { self.layer(ConvertResponseLayer::new(converter)) } + + fn convert_error(self) -> ServiceBuilder, L>> { + self.layer(ConvertErrorLayer::new()) + } } diff --git a/canhttp/src/http/mod.rs b/canhttp/src/http/mod.rs index b7db6322..49052c24 100644 --- a/canhttp/src/http/mod.rs +++ b/canhttp/src/http/mod.rs @@ -29,7 +29,7 @@ //! # Examples //! //! ```rust -//! use canhttp::{http::{HttpConversionLayer, MaxResponseBytesRequestExtension}, IcError}; +//! use canhttp::{http::{HttpConversionLayer }, IcError, MaxResponseBytesRequestExtension}; //! use ic_cdk::api::management_canister::http_request::{CanisterHttpRequestArgument as IcHttpRequest, HttpResponse as IcHttpResponse}; //! use tower::{Service, ServiceBuilder, ServiceExt, BoxError}; //! @@ -65,10 +65,7 @@ #[cfg(test)] mod tests; -pub use request::{ - HttpRequest, HttpRequestConversionError, HttpRequestConverter, - MaxResponseBytesRequestExtension, TransformContextRequestExtension, -}; +pub use request::{HttpRequest, HttpRequestConversionError, HttpRequestConverter}; pub use response::{ FilterNonSuccessfulHttpResponse, FilterNonSuccessulHttpResponseError, HttpResponse, HttpResponseConversionError, HttpResponseConverter, diff --git a/canhttp/src/http/request.rs b/canhttp/src/http/request.rs index 4021ced8..68235468 100644 --- a/canhttp/src/http/request.rs +++ b/canhttp/src/http/request.rs @@ -1,4 +1,5 @@ use crate::convert::Convert; +use crate::{MaxResponseBytesRequestExtension, TransformContextRequestExtension}; use ic_cdk::api::management_canister::http_request::{ CanisterHttpRequestArgument as IcHttpRequest, HttpHeader as IcHttpHeader, HttpMethod as IcHttpMethod, TransformContext, @@ -8,25 +9,6 @@ use thiserror::Error; /// HTTP request with a body made of bytes. pub type HttpRequest = http::Request>; -/// Add support for max response bytes. -pub trait MaxResponseBytesRequestExtension: Sized { - /// Set the max response bytes. - /// - /// If provided, the value must not exceed 2MB (2_000_000B). - /// The call will be charged based on this parameter. - /// If not provided, the maximum of 2MB will be used. - fn set_max_response_bytes(&mut self, value: u64); - - /// Retrieves the current max response bytes value, if any. - fn get_max_response_bytes(&self) -> Option; - - /// Convenience method to use the builder pattern. - fn max_response_bytes(mut self, value: u64) -> Self { - self.set_max_response_bytes(value); - self - } -} - #[derive(Clone, Debug, PartialEq, Eq)] struct MaxResponseBytesExtension(pub u64); @@ -56,25 +38,6 @@ impl MaxResponseBytesRequestExtension for http::request::Builder { } } -/// Add support for transform context to specify how the response will be canonicalized by the replica -/// to maximize chances of consensus. -/// -/// See the [docs](https://internetcomputer.org/docs/references/https-outcalls-how-it-works#transformation-function) -/// on HTTPs outcalls for more details. -pub trait TransformContextRequestExtension: Sized { - /// Set the transform context. - fn set_transform_context(&mut self, value: TransformContext); - - /// Retrieve the current transform context, if any. - fn get_transform_context(&self) -> Option<&TransformContext>; - - /// Convenience method to use the builder pattern. - fn transform_context(mut self, value: TransformContext) -> Self { - self.set_transform_context(value); - self - } -} - #[derive(Clone, Debug, PartialEq, Eq)] struct TransformContextExtension(pub TransformContext); diff --git a/canhttp/src/http/tests.rs b/canhttp/src/http/tests.rs index 844c3b35..688b08db 100644 --- a/canhttp/src/http/tests.rs +++ b/canhttp/src/http/tests.rs @@ -1,10 +1,10 @@ use crate::http::request::HttpRequestConversionError; use crate::http::response::{HttpResponse, HttpResponseConversionError}; -use crate::http::{ - HttpConversionLayer, HttpRequestConverter, HttpResponseConverter, - MaxResponseBytesRequestExtension, TransformContextRequestExtension, +use crate::http::{HttpConversionLayer, HttpRequestConverter, HttpResponseConverter}; +use crate::{ + ConvertServiceBuilder, IcError, MaxResponseBytesRequestExtension, + TransformContextRequestExtension, }; -use crate::{ConvertServiceBuilder, IcError}; use assert_matches::assert_matches; use candid::{Decode, Encode, Principal}; use http::StatusCode; diff --git a/canhttp/src/lib.rs b/canhttp/src/lib.rs index c910a6ea..4c8b7024 100644 --- a/canhttp/src/lib.rs +++ b/canhttp/src/lib.rs @@ -5,15 +5,18 @@ #![forbid(unsafe_code)] #![forbid(missing_docs)] -pub use client::{Client, IcError, IcHttpRequestWithCycles}; +pub use client::{ + Client, HttpsOutcallError, IcError, IcHttpRequestWithCycles, MaxResponseBytesRequestExtension, + TransformContextRequestExtension, +}; pub use convert::ConvertServiceBuilder; pub use cycles::{ CyclesAccounting, CyclesAccountingError, CyclesChargingPolicy, CyclesCostEstimator, }; - mod client; pub mod convert; mod cycles; #[cfg(feature = "http")] pub mod http; pub mod observability; +pub mod retry; diff --git a/canhttp/src/observability/mod.rs b/canhttp/src/observability/mod.rs index 1ee19261..02a31d83 100644 --- a/canhttp/src/observability/mod.rs +++ b/canhttp/src/observability/mod.rs @@ -172,6 +172,7 @@ use tower::{Layer, Service}; /// /// [`Layer`]: tower::Layer /// [`Service`]: tower::Service +#[derive(Clone, Debug)] pub struct ObservabilityLayer { on_request: OnRequest, on_response: OnResponse, @@ -263,6 +264,7 @@ where /// See the [module docs](crate::observability) for an example. /// /// [`Service`]: tower::Service +#[derive(Clone, Debug)] pub struct Observability { inner: S, on_request: OnRequest, diff --git a/canhttp/src/retry/mod.rs b/canhttp/src/retry/mod.rs new file mode 100644 index 00000000..0a204320 --- /dev/null +++ b/canhttp/src/retry/mod.rs @@ -0,0 +1,101 @@ +//! Middleware for retrying "failed" requests. + +#[cfg(test)] +mod tests; + +use crate::{HttpsOutcallError, MaxResponseBytesRequestExtension}; +use std::future; +use tower::retry; + +// This constant comes from the IC specification: +// > If provided, the value must not exceed 2MB +const HTTP_MAX_SIZE: u64 = 2_000_000; + +/// Double the request `max_response_bytes` in case the error indicates the response was too big. +/// +/// The value for `max_response_bytes` will be doubled until one of the following conditions happen: +/// 1. Either the response is `Ok` or the error is not due to the response being too big; +/// 2. Or, the maximum value of 2MB (`2_000_000`) is reached. +/// +/// # Examples +/// +/// ```rust +/// use tower::{Service, ServiceBuilder, ServiceExt}; +/// use canhttp::{Client, http::HttpRequest, HttpsOutcallError, IcError, MaxResponseBytesRequestExtension, retry::DoubleMaxResponseBytes}; +/// use ic_cdk::api::call::RejectionCode; +/// +/// fn response_is_too_large_error() -> IcError { +/// let error = IcError { +/// code: RejectionCode::SysFatal, +/// message: "Http body exceeds size limit".to_string(), +/// }; +/// assert!(error.is_response_too_large()); +/// error +/// } +/// +/// # #[tokio::main] +/// # async fn main() -> Result<(), Box> { +/// let mut service = ServiceBuilder::new() +/// .retry(DoubleMaxResponseBytes) +/// .service_fn(|request: HttpRequest| async move { +/// match request.get_max_response_bytes() { +/// Some(max_response_bytes) if max_response_bytes >= 8192 => Ok(()), +/// _ => Err::<(), IcError>(response_is_too_large_error()), +/// } +/// }); +/// +/// let request = http::Request::post("https://internetcomputer.org/") +/// .max_response_bytes(0) +/// .body(vec![]) +/// .unwrap(); +/// +/// // This will effectively do 4 calls with the following max_response_bytes values: 0, 2048, 4096, 8192. +/// let response = service.ready().await?.call(request).await; +/// +/// assert_eq!(response, Ok(())); +/// # Ok(()) +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct DoubleMaxResponseBytes; + +impl retry::Policy for DoubleMaxResponseBytes +where + Request: MaxResponseBytesRequestExtension + Clone, + Error: HttpsOutcallError, +{ + type Future = future::Ready<()>; + + fn retry( + &mut self, + req: &mut Request, + result: &mut Result, + ) -> Option { + match result { + Err(e) if e.is_response_too_large() => { + if let Some(previous_estimate) = req.get_max_response_bytes() { + let new_estimate = previous_estimate + .max(1024) + .saturating_mul(2) + .min(HTTP_MAX_SIZE); + if new_estimate > previous_estimate { + req.set_max_response_bytes(new_estimate); + return Some(future::ready(())); + } + } + None + } + _ => None, + } + } + + fn clone_request(&mut self, req: &Request) -> Option { + match req.get_max_response_bytes() { + Some(max_response_bytes) if max_response_bytes < HTTP_MAX_SIZE => Some(req.clone()), + // Not having a value is equivalent to setting `max_response_bytes` to the maximum value. + // If there is a value, it's at least the maximum value. + // In both cases retrying will not help. + _ => None, + } + } +} diff --git a/canhttp/src/retry/tests.rs b/canhttp/src/retry/tests.rs new file mode 100644 index 00000000..09ee63be --- /dev/null +++ b/canhttp/src/retry/tests.rs @@ -0,0 +1,194 @@ +use crate::http::HttpRequest; +use crate::retry::DoubleMaxResponseBytes; +use crate::{HttpsOutcallError, IcError, MaxResponseBytesRequestExtension}; +use assert_matches::assert_matches; +use ic_cdk::api::call::RejectionCode; +use std::future; +use std::sync::mpsc; +use std::sync::mpsc::Sender; +use std::task::{Context, Poll}; +use tower::{Service, ServiceBuilder, ServiceExt}; + +#[tokio::test] +async fn should_retry_until_max() { + let (requests_tx, requests_rx) = mpsc::channel::(); + + let mut service = + ServiceBuilder::new() + .retry(DoubleMaxResponseBytes) + .service(StoreRequestServiceAndError::::always_error( + requests_tx.clone(), + )); + + let request = http::Request::post("https://internetcomputer.org/") + .max_response_bytes(0) + .body(vec![]) + .unwrap(); + + let response = service + .ready() + .await + .unwrap() + .call(request) + .await + .unwrap_err(); + assert!(response.is_response_too_large()); + + let mut all_requests = Vec::new(); + let mut iter = requests_rx.try_iter(); + while let Some(req) = iter.next() { + all_requests.push(req); + } + + assert_eq!(all_requests.len(), 12); + assert_eq!( + all_requests + .into_iter() + .map(|r| r.get_max_response_bytes().unwrap()) + .collect::>(), + vec![ + 0, + 1024 << 1, + 1024 << 2, + 1024 << 3, + 1024 << 4, + 1024 << 5, + 1024 << 6, + 1024 << 7, + 1024 << 8, + 1024 << 9, + 1024 << 10, + 2_000_000 + ] + ); +} + +#[tokio::test] +async fn should_not_retry() { + for max_response_bytes in [Some(2_000_000_u64), None] { + let (requests_tx, requests_rx) = mpsc::channel::(); + + let mut service = ServiceBuilder::new().retry(DoubleMaxResponseBytes).service( + StoreRequestServiceAndError::::always_error(requests_tx.clone()), + ); + + let mut builder = http::Request::post("https://internetcomputer.org/"); + if let Some(max_response_bytes) = max_response_bytes { + builder = builder.max_response_bytes(max_response_bytes); + } + let request = builder.body(vec![]).unwrap(); + + let response = service + .ready() + .await + .unwrap() + .call(request) + .await + .unwrap_err(); + assert!(response.is_response_too_large()); + + let mut all_requests = Vec::new(); + let mut iter = requests_rx.try_iter(); + while let Some(req) = iter.next() { + all_requests.push(req); + } + assert_eq!(all_requests.len(), 1); + } +} + +#[tokio::test] +async fn should_stop_retrying_when_ok() { + let (requests_tx, requests_rx) = mpsc::channel::(); + + let num_errors = 3; + let mut service = + ServiceBuilder::new() + .retry(DoubleMaxResponseBytes) + .service(StoreRequestServiceAndError::::error_n_times( + requests_tx.clone(), + num_errors, + )); + + let request = http::Request::post("https://internetcomputer.org/") + .max_response_bytes(0) + .body(vec![]) + .unwrap(); + + let response = service.ready().await.unwrap().call(request).await; + assert_matches!(response, Ok(_)); + + let mut all_requests = Vec::new(); + let mut iter = requests_rx.try_iter(); + while let Some(req) = iter.next() { + all_requests.push(req); + } + + assert_eq!(all_requests.len(), (num_errors + 1) as usize); + assert_eq!( + all_requests + .into_iter() + .map(|r| r.get_max_response_bytes().unwrap()) + .collect::>(), + vec![0, 1024 << 1, 1024 << 2, 1024 << 3] + ); +} + +#[derive(Clone, Debug)] +pub struct StoreRequestServiceAndError { + requests: Sender, + num_calls: u8, + num_errors_before_ok: u8, +} + +impl StoreRequestServiceAndError { + pub fn always_error(requests: Sender) -> Self { + Self { + requests, + num_calls: 0, + num_errors_before_ok: u8::MAX, + } + } + + pub fn error_n_times(requests: Sender, num_errors: u8) -> Self { + Self { + requests, + num_calls: 0, + num_errors_before_ok: num_errors, + } + } +} + +impl Service for StoreRequestServiceAndError +where + Request: Clone, +{ + type Response = Request; + type Error = IcError; + type Future = future::Ready>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.num_calls = self + .num_calls + .checked_add(1) + .expect("Unexpected large number of calls to service"); + self.requests.send(req.clone()).unwrap(); + if self.num_calls <= self.num_errors_before_ok { + future::ready(Err(response_is_too_large_error())) + } else { + future::ready(Ok(req)) + } + } +} + +fn response_is_too_large_error() -> IcError { + let error = IcError { + code: RejectionCode::SysFatal, + message: "Http body exceeds size limit".to_string(), + }; + assert!(error.is_response_too_large()); + error +} diff --git a/src/http.rs b/src/http.rs index 07293083..2e37a213 100644 --- a/src/http.rs +++ b/src/http.rs @@ -1,6 +1,6 @@ use crate::constants::COLLATERAL_CYCLES_PER_NODE; use crate::logs::{INFO, TRACE_HTTP}; -use crate::memory::{get_num_subnet_nodes, is_demo_active}; +use crate::memory::{get_num_subnet_nodes, is_demo_active, next_request_id}; use crate::{ add_metric_entry, constants::CONTENT_TYPE_VALUE, @@ -8,6 +8,7 @@ use crate::{ types::{MetricRpcHost, MetricRpcMethod, ResolvedRpcService}, util::canonicalize_json, }; +use canhttp::retry::DoubleMaxResponseBytes; use canhttp::{ convert::ConvertRequestLayer, http::{ @@ -18,10 +19,11 @@ use canhttp::{ }, FilterNonSuccessfulHttpResponse, FilterNonSuccessulHttpResponseError, HttpRequestConversionError, HttpRequestConverter, HttpResponseConversionError, - HttpResponseConverter, MaxResponseBytesRequestExtension, TransformContextRequestExtension, + HttpResponseConverter, }, observability::ObservabilityLayer, - ConvertServiceBuilder, CyclesAccounting, CyclesAccountingError, CyclesChargingPolicy, IcError, + ConvertServiceBuilder, CyclesAccounting, CyclesAccountingError, CyclesChargingPolicy, + HttpsOutcallError, IcError, MaxResponseBytesRequestExtension, TransformContextRequestExtension, }; use evm_rpc_types::{HttpOutcallError, ProviderError, RpcError, RpcResult, ValidationError}; use http::header::CONTENT_TYPE; @@ -36,7 +38,9 @@ use serde::Serialize; use std::fmt::Debug; use thiserror::Error; use tower::layer::util::{Identity, Stack}; -use tower::{BoxError, Service, ServiceBuilder}; +use tower::retry::RetryLayer; +use tower::util::MapRequestLayer; +use tower::{Service, ServiceBuilder}; use tower_http::set_header::SetRequestHeaderLayer; use tower_http::ServiceBuilderExt; @@ -70,20 +74,33 @@ pub async fn json_rpc_request( max_response_bytes: u64, ) -> RpcResult> { let request = json_rpc_request_arg(service, json_rpc_payload, max_response_bytes)?; - http_client(MetricRpcMethod("request".to_string())) + http_client(MetricRpcMethod("request".to_string()), false) .call(request) .await } pub fn http_client( rpc_method: MetricRpcMethod, + retry: bool, ) -> impl Service, Response = HttpJsonRpcResponse, Error = RpcError> where - I: Serialize, + I: Serialize + Clone, O: DeserializeOwned + Debug, { + let maybe_retry = if retry { + Some(RetryLayer::new(DoubleMaxResponseBytes)) + } else { + None + }; + let maybe_unique_id = if retry { + Some(MapRequestLayer::new(generate_request_id)) + } else { + None + }; ServiceBuilder::new() .map_err(|e: HttpClientError| RpcError::from(e)) + .option_layer(maybe_retry) + .option_layer(maybe_unique_id) .layer( ObservabilityLayer::new() .on_request(move |req: &HttpJsonRpcRequest| { @@ -167,6 +184,12 @@ where .service(canhttp::Client::new_with_error::()) } +fn generate_request_id(request: HttpJsonRpcRequest) -> HttpJsonRpcRequest { + let (parts, mut body) = request.into_parts(); + body.set_id(next_request_id()); + http::Request::from_parts(parts, body) +} + fn observe_response(method: MetricRpcMethod, host: MetricRpcHost, status: u16) { let status: u32 = status as u32; add_metric_entry!(responses, (method, host, status.into()), 1); @@ -195,12 +218,12 @@ pub fn service_request_builder() -> JsonRpcServiceBuilder { .convert_request(HttpRequestConverter) } -#[derive(Error, Debug)] +#[derive(Clone, Debug, Error)] pub enum HttpClientError { #[error("IC error: {0}")] IcError(IcError), #[error("unknown error (most likely sign of a bug): {0}")] - NotHandledError(BoxError), + NotHandledError(String), #[error("cycles accounting error: {0}")] CyclesAccountingError(CyclesAccountingError), #[error("HTTP response was not successful: {0}")] @@ -218,7 +241,7 @@ impl From for HttpClientError { impl From for HttpClientError { fn from(value: HttpResponseConversionError) -> Self { // Replica should return valid http::Response - HttpClientError::NotHandledError(BoxError::from(value)) + HttpClientError::NotHandledError(value.to_string()) } } @@ -242,13 +265,13 @@ impl From for HttpClientError { impl From for HttpClientError { fn from(value: HttpRequestConversionError) -> Self { - HttpClientError::NotHandledError(value.into()) + HttpClientError::NotHandledError(value.to_string()) } } impl From for HttpClientError { fn from(value: JsonRequestConversionError) -> Self { - HttpClientError::NotHandledError(value.into()) + HttpClientError::NotHandledError(value.to_string()) } } @@ -259,7 +282,7 @@ impl From for RpcError { RpcError::HttpOutcallError(HttpOutcallError::IcError { code, message }) } HttpClientError::NotHandledError(e) => { - RpcError::ValidationError(ValidationError::Custom(e.to_string())) + RpcError::ValidationError(ValidationError::Custom(e)) } HttpClientError::CyclesAccountingError( CyclesAccountingError::InsufficientCyclesError { expected, received }, @@ -286,6 +309,18 @@ impl From for RpcError { } } +impl HttpsOutcallError for HttpClientError { + fn is_response_too_large(&self) -> bool { + match self { + HttpClientError::IcError(e) => e.is_response_too_large(), + HttpClientError::NotHandledError(_) + | HttpClientError::CyclesAccountingError(_) + | HttpClientError::UnsuccessfulHttpResponse(_) + | HttpClientError::InvalidJsonResponse(_) => false, + } + } +} + struct MetricData { method: MetricRpcMethod, host: MetricRpcHost, diff --git a/src/rpc_client/eth_rpc/mod.rs b/src/rpc_client/eth_rpc/mod.rs index c4a87e99..5b40b83d 100644 --- a/src/rpc_client/eth_rpc/mod.rs +++ b/src/rpc_client/eth_rpc/mod.rs @@ -2,8 +2,7 @@ //! interface. use crate::http::http_client; -use crate::logs::{DEBUG, TRACE_HTTP}; -use crate::memory::{get_override_provider, next_request_id}; +use crate::memory::get_override_provider; use crate::providers::resolve_rpc_service; use crate::rpc_client::eth_rpc_error::{sanitize_send_raw_transaction_result, Parser}; use crate::rpc_client::json::responses::{ @@ -12,11 +11,11 @@ use crate::rpc_client::json::responses::{ use crate::rpc_client::numeric::{TransactionCount, Wei}; use crate::types::MetricRpcMethod; use candid::candid_method; -use canhttp::http::json::JsonRpcRequestBody; -use canhttp::http::{MaxResponseBytesRequestExtension, TransformContextRequestExtension}; -use evm_rpc_types::{HttpOutcallError, JsonRpcError, RpcError, RpcService}; -use ic_canister_log::log; -use ic_cdk::api::call::RejectionCode; +use canhttp::{ + http::json::JsonRpcRequestBody, MaxResponseBytesRequestExtension, + TransformContextRequestExtension, +}; +use evm_rpc_types::{JsonRpcError, RpcError, RpcService}; use ic_cdk::api::management_canister::http_request::{ HttpResponse, TransformArgs, TransformContext, }; @@ -118,11 +117,6 @@ fn cleanup_response(mut args: TransformArgs) -> HttpResponse { args.response } -pub fn is_response_too_large(code: &RejectionCode, message: &str) -> bool { - code == &RejectionCode::SysFatal - && (message.contains("size limit") || message.contains("length limit")) -} - #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct ResponseSizeEstimate(u64); @@ -138,11 +132,6 @@ impl ResponseSizeEstimate { pub fn get(self) -> u64 { self.0 } - - /// Returns a higher estimate for the payload size. - pub fn adjust(self) -> Self { - Self(self.0.max(1024).saturating_mul(2).min(MAX_PAYLOAD_SIZE)) - } } impl fmt::Display for ResponseSizeEstimate { @@ -168,7 +157,7 @@ pub async fn call( provider: &RpcService, method: impl Into, params: I, - mut response_size_estimate: ResponseSizeEstimate, + response_size_estimate: ResponseSizeEstimate, ) -> Result where I: Serialize + Clone + Debug, @@ -185,8 +174,10 @@ where }) .unwrap_or_default(); + let effective_size_estimate = response_size_estimate.get(); let request = resolve_rpc_service(provider.clone())? .post(&get_override_provider())? + .max_response_bytes(effective_size_estimate) .transform_context(TransformContext::from_name( "cleanup_response".to_owned(), transform_op.clone(), @@ -195,45 +186,13 @@ where .expect("BUG: invalid request"); let eth_method = request.body().method().to_string(); - let mut client = http_client(MetricRpcMethod(eth_method.clone())); - let mut retries = 0; - loop { - let effective_size_estimate = response_size_estimate.get(); - let request = { - let mut request = request.clone().max_response_bytes(effective_size_estimate); - let body = request.body_mut(); - body.set_id(next_request_id()); - request - }; - let url = request.uri().clone(); - log!( - TRACE_HTTP, - "Calling url (retries={retries}): {}, with payload: {request:?}", - url - ); - - let response = match client.call(request).await { - Err(RpcError::HttpOutcallError(HttpOutcallError::IcError { code, message })) - if is_response_too_large(&code, &message) => - { - let new_estimate = response_size_estimate.adjust(); - if response_size_estimate == new_estimate { - return Err(HttpOutcallError::IcError { code, message }.into()); - } - log!(DEBUG, "The {eth_method} response didn't fit into {response_size_estimate} bytes, retrying with {new_estimate}"); - response_size_estimate = new_estimate; - retries += 1; - continue; - } - result => result?, - }; - - return match response.into_body().result { - canhttp::http::json::JsonRpcResult::Result(r) => Ok(r), - canhttp::http::json::JsonRpcResult::Error { code, message } => { - Err(JsonRpcError { code, message }.into()) - } - }; + let mut client = http_client(MetricRpcMethod(eth_method.clone()), true); + let response = client.call(request).await?; + match response.into_body().result { + canhttp::http::json::JsonRpcResult::Result(r) => Ok(r), + canhttp::http::json::JsonRpcResult::Error { code, message } => { + Err(JsonRpcError { code, message }.into()) + } } } diff --git a/tests/tests.rs b/tests/tests.rs index baae1eed..67570dbf 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -600,6 +600,31 @@ fn should_canonicalize_json_response() { assert!(responses.windows(2).all(|w| w[0] == w[1])); } +#[test] +fn should_not_modify_json_rpc_request_from_request_endpoint() { + let setup = EvmRpcSetup::new(); + + let json_rpc_request = r#"{"id":123,"jsonrpc":"2.0","method":"eth_gasPrice"}"#; + let mock_response = r#"{"jsonrpc":"2.0","id":123,"result":"0x00112233"}"#; + + let response = setup + .request( + RpcService::Custom(RpcApi { + url: MOCK_REQUEST_URL.to_string(), + headers: None, + }), + json_rpc_request, + MOCK_REQUEST_RESPONSE_BYTES, + ) + .mock_http_once( + MockOutcallBuilder::new(200, mock_response).with_raw_request_body(json_rpc_request), + ) + .wait() + .unwrap(); + + assert_eq!(response, mock_response); +} + #[test] fn should_decode_renamed_field() { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, CandidType)] @@ -2143,7 +2168,7 @@ fn should_retry_when_response_too_large() { .mock_http_once(mock.clone().with_max_response_bytes(1024 << 8)) .mock_http_once(mock.clone().with_max_response_bytes(1024 << 9)) .mock_http_once(mock.clone().with_max_response_bytes(1024 << 10)) - .mock_http_once(mock.clone().with_max_response_bytes(2_000_000 - 2 * 1024)) + .mock_http_once(mock.clone().with_max_response_bytes(2_000_000)) .wait() .expect_consistent(); @@ -2191,6 +2216,58 @@ fn should_retry_when_response_too_large() { ); } +#[test] +fn should_have_different_request_ids_when_retrying_because_response_too_big() { + let setup = EvmRpcSetup::new().mock_api_keys(); + + let response = setup + .eth_get_transaction_count( + RpcServices::EthMainnet(Some(vec![EthMainnetService::Cloudflare])), + Some(evm_rpc_types::RpcConfig { + response_size_estimate: Some(1), + response_consensus: None, + }), + evm_rpc_types::GetTransactionCountArgs { + address: "0xdAC17F958D2ee523a2206206994597C13D831ec7" + .parse() + .unwrap(), + block: evm_rpc_types::BlockTag::Latest, + }, + ) + .mock_http_once( + MockOutcallBuilder::new(200, r#"{"jsonrpc":"2.0","id":0,"result":"0x1"}"#) + .with_raw_request_body(r#"{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0xdac17f958d2ee523a2206206994597c13d831ec7","latest"],"id":0}"#) + .with_max_response_bytes(1), + ) + .mock_http_once( + MockOutcallBuilder::new(200, r#"{"jsonrpc":"2.0","id":1,"result":"0x1"}"#) + .with_raw_request_body(r#"{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["0xdac17f958d2ee523a2206206994597c13d831ec7","latest"],"id":1}"#) + .with_max_response_bytes(2048), + ) + .wait() + .expect_consistent() + .unwrap(); + + assert_eq!(response, 1_u8.into()); + + let rpc_method = || RpcMethod::EthGetTransactionCount.into(); + assert_eq!( + setup.get_metrics(), + Metrics { + requests: hashmap! { + (rpc_method(), CLOUDFLARE_HOSTNAME.into()) => 2, + }, + responses: hashmap! { + (rpc_method(), CLOUDFLARE_HOSTNAME.into(), 200.into()) => 1, + }, + err_http_outcall: hashmap! { + (rpc_method(), CLOUDFLARE_HOSTNAME.into(), RejectionCode::SysFatal) => 1, + }, + ..Default::default() + } + ); +} + pub fn multi_logs_for_single_transaction(num_logs: usize) -> String { let mut logs = Vec::with_capacity(num_logs); for log_index in 0..num_logs {