diff --git a/Cargo.lock b/Cargo.lock index e3887b3e..eac584a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -602,9 +602,11 @@ dependencies = [ [[package]] name = "canhttp" version = "0.1.0" -source = "git+https://github.com/dfinity/evm-rpc-canister?rev=1aeeca3bdcb86ce4493b71e4117f65ab78475396#1aeeca3bdcb86ce4493b71e4117f65ab78475396" +source = "git+https://github.com/dfinity/evm-rpc-canister?rev=b976abe57379be7d2649f6a31522d0bb25c5f455#b976abe57379be7d2649f6a31522d0bb25c5f455" dependencies = [ "assert_matches", + "ciborium", + "futures-channel", "futures-util", "http 1.3.1", "ic-cdk", @@ -612,6 +614,7 @@ dependencies = [ "pin-project", "serde", "serde_json", + "sha2 0.10.8", "thiserror 2.0.12", "tower", "tower-layer", @@ -4040,6 +4043,7 @@ dependencies = [ "async-trait", "candid", "canlog", + "const_format", "futures", "ic-cdk", "ic-test-utilities-load-wasm", diff --git a/Cargo.toml b/Cargo.toml index 29e1d842..b261b732 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ assert_matches = "1.5.0" async-trait = "0.1.88" candid = "0.10.13" candid_parser = "0.1.4" -canhttp = { git = "https://github.com/dfinity/evm-rpc-canister", rev = "1aeeca3bdcb86ce4493b71e4117f65ab78475396" } +canhttp = { git = "https://github.com/dfinity/evm-rpc-canister", rev = "b976abe57379be7d2649f6a31522d0bb25c5f455" } ciborium = "0.2.2" # Transitive dependency of ic-ed25519 # See https://forum.dfinity.org/t/module-imports-function-wbindgen-describe-from-wbindgen-placeholder-that-is-not-exported-by-the-runtime/11545/8 diff --git a/canister/Cargo.toml b/canister/Cargo.toml index af8bc78c..ba43410c 100644 --- a/canister/Cargo.toml +++ b/canister/Cargo.toml @@ -15,7 +15,7 @@ path = "src/main.rs" [dependencies] assert_matches = { workspace = true } candid = { workspace = true } -canhttp = { workspace = true, features = ["json"] } +canhttp = { workspace = true, features = ["json", "multi"] } canlog = { path = "../canlog", features = ["derive"] } ciborium = { workspace = true } const_format = { workspace = true } diff --git a/canister/sol_rpc_canister.did b/canister/sol_rpc_canister.did index f6a1fdce..edc76822 100644 --- a/canister/sol_rpc_canister.did +++ b/canister/sol_rpc_canister.did @@ -153,6 +153,12 @@ type GetSlotParams = record { // Represents the result of a generic RPC request. type RequestResult = variant { Ok : text; Err : RpcError }; +// Represents an aggregated result from multiple RPC calls for a generic RPC request. +type MultiRequestResult = variant { + Consistent : RequestResult; + Inconsistent : vec record { RpcSource; RequestResult }; +}; + // A string used as a regex pattern. type Regex = text; @@ -210,5 +216,5 @@ service : (InstallArgs,) -> { getSlot : (RpcSources, opt RpcConfig, opt GetSlotParams) -> (MultiGetSlotResult); // Make a generic RPC request that sends the given json_rpc_payload. - request : (RpcSource, json_rpc_paylod: text, max_response_bytes: nat64) -> (RequestResult) + request : (RpcSources, opt RpcConfig, json_rpc_paylod: text) -> (MultiRequestResult) }; diff --git a/canister/src/candid_rpc/mod.rs b/canister/src/candid_rpc/mod.rs index 03cceaa8..0d03efe9 100644 --- a/canister/src/candid_rpc/mod.rs +++ b/canister/src/candid_rpc/mod.rs @@ -1,17 +1,17 @@ -use crate::{ - rpc_client::{MultiCallError, SolRpcClient}, - types::MultiRpcResult, -}; -use sol_rpc_types::{GetSlotParams, RpcConfig, RpcResult, RpcSources}; +use crate::rpc_client::{ReducedResult, SolRpcClient}; +use canhttp::multi::ReductionError; +use serde::Serialize; +use sol_rpc_types::{GetSlotParams, MultiRpcResult, RpcConfig, RpcResult, RpcSources}; use solana_clock::Slot; +use std::fmt::Debug; -fn process_result(result: Result>) -> MultiRpcResult { +fn process_result(result: ReducedResult) -> MultiRpcResult { match result { Ok(value) => MultiRpcResult::Consistent(Ok(value)), Err(err) => match err { - MultiCallError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)), - MultiCallError::InconsistentResults(multi_call_results) => { - let results = multi_call_results.into_vec(); + ReductionError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)), + ReductionError::InconsistentResults(multi_call_results) => { + let results: Vec<_> = multi_call_results.into_iter().collect(); results.iter().for_each(|(_service, _service_result)| { // TODO XC-296: Add metrics for inconsistent providers }); @@ -36,4 +36,14 @@ impl CandidRpcClient { pub async fn get_slot(&self, params: GetSlotParams) -> MultiRpcResult { process_result(self.client.get_slot(params).await) } + + pub async fn raw_request( + &self, + request: canhttp::http::json::JsonRpcRequest, + ) -> MultiRpcResult + where + I: Serialize + Clone + Debug, + { + process_result(self.client.raw_request(request).await) + } } diff --git a/canister/src/main.rs b/canister/src/main.rs index aad52cb9..4db70a1e 100644 --- a/canister/src/main.rs +++ b/canister/src/main.rs @@ -1,4 +1,5 @@ use candid::candid_method; +use canhttp::http::json::JsonRpcRequest; use canlog::{log, Log, Sort}; use ic_cdk::{api::is_controller, query, update}; use sol_rpc_canister::{ @@ -6,12 +7,11 @@ use sol_rpc_canister::{ http_types, lifecycle, logs::Priority, providers::{get_provider, PROVIDERS}, - rpc_client, state::{mutate_state, read_state}, }; use sol_rpc_types::{ - GetSlotParams, MultiRpcResult, RpcAccess, RpcConfig, RpcError, RpcResult, RpcSource, - RpcSources, SupportedRpcProvider, SupportedRpcProviderId, + GetSlotParams, MultiRpcResult, RpcAccess, RpcConfig, RpcError, RpcSources, + SupportedRpcProvider, SupportedRpcProviderId, }; use solana_clock::Slot; use std::str::FromStr; @@ -80,7 +80,7 @@ async fn get_slot( params: Option, ) -> MultiRpcResult { match CandidRpcClient::new(source, config) { - Ok(client) => client.get_slot(params.unwrap_or_default()).await.into(), + Ok(client) => client.get_slot(params.unwrap_or_default()).await, Err(err) => Err(err).into(), } } @@ -88,16 +88,26 @@ async fn get_slot( #[update] #[candid_method] async fn request( - provider: RpcSource, + source: RpcSources, + config: Option, json_rpc_payload: String, - max_response_bytes: u64, -) -> RpcResult { - let request: canhttp::http::json::JsonRpcRequest = - serde_json::from_str(&json_rpc_payload) - .map_err(|e| RpcError::ValidationError(format!("Invalid JSON RPC request: {e}")))?; - rpc_client::call(&provider, request, max_response_bytes) - .await - .map(|value: serde_json::Value| value.to_string()) +) -> MultiRpcResult { + let request: JsonRpcRequest = match serde_json::from_str(&json_rpc_payload) { + Ok(req) => req, + Err(e) => { + return Err(RpcError::ValidationError(format!( + "Invalid JSON RPC request: {e}" + ))) + .into() + } + }; + match CandidRpcClient::new(source, config) { + Ok(client) => client + .raw_request(request) + .await + .map(|value| value.to_string()), + Err(err) => Err(err).into(), + } } #[query(hidden = true)] diff --git a/canister/src/rpc_client/mod.rs b/canister/src/rpc_client/mod.rs index 745def9c..98a31a6c 100644 --- a/canister/src/rpc_client/mod.rs +++ b/canister/src/rpc_client/mod.rs @@ -3,41 +3,27 @@ mod sol_rpc; mod tests; use crate::{ + http::http_client, logs::Priority, - providers::Providers, + providers::{request_builder, resolve_rpc_provider, Providers}, rpc_client::sol_rpc::{ResponseSizeEstimate, ResponseTransform, HEADER_SIZE_LIMIT}, + state::read_state, +}; +use canhttp::{ + http::json::JsonRpcRequest, + multi::{MultiResults, Reduce, ReduceWithEquality, ReduceWithThreshold}, + MaxResponseBytesRequestExtension, TransformContextRequestExtension, }; -use canhttp::http::json::JsonRpcRequest; use canlog::log; +use ic_cdk::api::management_canister::http_request::TransformContext; use serde::{de::DeserializeOwned, Serialize}; use sol_rpc_types::{ - ConsensusStrategy, GetSlotParams, ProviderError, RpcConfig, RpcError, RpcResult, RpcSource, + ConsensusStrategy, GetSlotParams, JsonRpcError, ProviderError, RpcConfig, RpcError, RpcSource, RpcSources, }; use solana_clock::Slot; -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::Debug, -}; - -pub async fn call( - provider: &RpcSource, - request: JsonRpcRequest, - max_response_size: u64, -) -> Result -where - I: Serialize + Clone + Debug, - O: Debug + DeserializeOwned, -{ - sol_rpc::call::<_, _>( - false, - provider, - request, - ResponseSizeEstimate::new(max_response_size), - &Some(ResponseTransform::Raw), - ) - .await -} +use std::{collections::BTreeSet, fmt::Debug}; +use tower::ServiceExt; #[derive(Clone, Debug, PartialEq, Eq)] pub struct SolRpcClient { @@ -63,12 +49,14 @@ impl SolRpcClient { ResponseSizeEstimate::new(self.config.response_size_estimate.unwrap_or(estimate)) } - fn consensus_strategy(&self) -> ConsensusStrategy { - self.config - .response_consensus - .as_ref() - .cloned() - .unwrap_or_default() + fn reduction_strategy(&self) -> ReductionStrategy { + ReductionStrategy::from( + self.config + .response_consensus + .as_ref() + .cloned() + .unwrap_or_default(), + ) } /// Query all providers in parallel and return all results. @@ -76,6 +64,11 @@ impl SolRpcClient { /// (e.g., if different providers gave different responses). /// This method is useful for querying data that is critical for the system to ensure that /// there is no single point of failure. + /// Query all providers in parallel and return all results. + /// It's up to the caller to decide how to handle the results, which could be inconsistent + /// (e.g., if different providers gave different responses). + /// This method is useful for querying data that is critical for the system to ensure that there is no single point of failure, + /// e.g., ethereum logs upon which ckETH will be minted. async fn parallel_call( &self, method: impl Into, @@ -88,33 +81,61 @@ impl SolRpcClient { O: Debug + DeserializeOwned, { let providers = self.providers(); - let request = JsonRpcRequest::new(method, params); - let results = { - let mut fut = Vec::with_capacity(providers.len()); - for provider in providers { - log!( - Priority::Debug, - "[parallel_call]: will call provider: {:?}", - provider - ); - fut.push(async { - sol_rpc::call::<_, _>( - true, - provider, - request.clone(), - response_size_estimate, - response_transform, - ) - .await - }); - } - futures::future::join_all(fut).await - }; - MultiCallResults::from_non_empty_iter(providers.iter().cloned().zip(results.into_iter())) + let request_body = JsonRpcRequest::new(method, params); + let effective_size_estimate = response_size_estimate.get(); + let transform_op = response_transform + .as_ref() + .map(|t| { + let mut buf = vec![]; + minicbor::encode(t, &mut buf).unwrap(); + buf + }) + .unwrap_or_default(); + let mut requests = MultiResults::default(); + for provider in providers { + log!( + Priority::Debug, + "[parallel_call]: will call provider: {:?}", + provider + ); + let request = request_builder( + resolve_rpc_provider(provider.clone()), + &read_state(|state| state.get_override_provider()), + ) + .map(|builder| { + builder + .max_response_bytes(effective_size_estimate) + .transform_context(TransformContext::from_name( + "cleanup_response".to_owned(), + transform_op.clone(), + )) + .body(request_body.clone()) + .expect("BUG: invalid request") + }); + requests.insert_once(provider.clone(), request); + } + + let client = http_client(true).map_result(|r| match r?.into_body().into_result() { + Ok(value) => Ok(value), + Err(json_rpc_error) => Err(RpcError::JsonRpcError(JsonRpcError { + code: json_rpc_error.code, + message: json_rpc_error.message, + })), + }); + + let (requests, errors) = requests.into_inner(); + let (_client, mut results) = canhttp::multi::parallel_call(client, requests).await; + results.add_errors(errors); + assert_eq!( + results.len(), + providers.len(), + "BUG: expected 1 result per provider" + ); + results } /// Query the Solana [`getSlot`](https://solana.com/docs/rpc/http/getslot) RPC method. - pub async fn get_slot(&self, params: GetSlotParams) -> Result> { + pub async fn get_slot(&self, params: GetSlotParams) -> ReducedResult { self.parallel_call( "getSlot", vec![params], @@ -122,240 +143,51 @@ impl SolRpcClient { &Some(ResponseTransform::GetSlot), ) .await - .reduce(self.consensus_strategy()) + .reduce(self.reduction_strategy()) } -} -/// Aggregates responses of different providers to the same query. -/// Guaranteed to be non-empty. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct MultiCallResults { - ok_results: BTreeMap, - errors: BTreeMap, -} - -impl Default for MultiCallResults { - fn default() -> Self { - Self::new() + pub async fn raw_request( + &self, + request: JsonRpcRequest, + ) -> ReducedResult + where + I: Serialize + Clone + Debug, + { + self.parallel_call( + request.method(), + request.params(), + self.response_size_estimate(1024 + HEADER_SIZE_LIMIT), + &Some(ResponseTransform::Raw), + ) + .await + .reduce(self.reduction_strategy()) } } -impl MultiCallResults { - pub fn new() -> Self { - Self { - ok_results: BTreeMap::new(), - errors: BTreeMap::new(), - } - } - - pub fn from_non_empty_iter)>>(iter: I) -> Self { - let mut results = Self::new(); - for (provider, result) in iter { - results.insert_once(provider, result); - } - if results.is_empty() { - panic!("BUG: MultiCallResults cannot be empty!") - } - results - } - - fn is_empty(&self) -> bool { - self.ok_results.is_empty() && self.errors.is_empty() - } - - fn insert_once(&mut self, provider: RpcSource, result: RpcResult) { - match result { - Ok(value) => { - assert!(!self.errors.contains_key(&provider)); - assert!(self.ok_results.insert(provider, value).is_none()); - } - Err(error) => { - assert!(!self.ok_results.contains_key(&provider)); - assert!(self.errors.insert(provider, error).is_none()); - } - } - } - - pub fn into_vec(self) -> Vec<(RpcSource, RpcResult)> { - self.ok_results - .into_iter() - .map(|(provider, result)| (provider, Ok(result))) - .chain( - self.errors - .into_iter() - .map(|(provider, error)| (provider, Err(error))), - ) - .collect() - } - - fn group_errors(&self) -> BTreeMap<&RpcError, BTreeSet<&RpcSource>> { - let mut errors: BTreeMap<_, _> = BTreeMap::new(); - for (provider, error) in self.errors.iter() { - errors - .entry(error) - .or_insert_with(BTreeSet::new) - .insert(provider); - } - errors - } +pub enum ReductionStrategy { + ByEquality(ReduceWithEquality), + ByThreshold(ReduceWithThreshold), } -impl MultiCallResults { - /// Expects all results to be ok or return the following error: - /// * MultiCallError::ConsistentError: all errors are the same and there is no ok results. - /// * MultiCallError::InconsistentResults: in all other cases. - fn all_ok(self) -> Result, MultiCallError> { - if self.errors.is_empty() { - return Ok(self.ok_results); - } - Err(self.expect_error()) - } - - fn expect_error(self) -> MultiCallError { - let errors = self.group_errors(); - match errors.len() { - 0 => { - panic!("BUG: errors should be non-empty") +impl From for ReductionStrategy { + fn from(value: ConsensusStrategy) -> Self { + match value { + ConsensusStrategy::Equality => ReductionStrategy::ByEquality(ReduceWithEquality), + ConsensusStrategy::Threshold { total: _, min } => { + ReductionStrategy::ByThreshold(ReduceWithThreshold::new(min)) } - 1 if self.ok_results.is_empty() => { - MultiCallError::ConsistentError(errors.into_keys().next().unwrap().clone()) - } - _ => MultiCallError::InconsistentResults(self), } } } -#[derive(Debug, PartialEq, Eq)] -pub enum MultiCallError { - ConsistentError(RpcError), - InconsistentResults(MultiCallResults), -} - -impl MultiCallResults { - pub fn reduce(self, strategy: ConsensusStrategy) -> Result> { - match strategy { - ConsensusStrategy::Equality => self.reduce_with_equality(), - ConsensusStrategy::Threshold { total: _, min } => self.reduce_with_threshold(min), - } - } - - fn reduce_with_equality(self) -> Result> { - let mut results = self.all_ok()?.into_iter(); - let (base_node_provider, base_result) = results - .next() - .expect("BUG: MultiCallResults is guaranteed to be non-empty"); - let mut inconsistent_results: Vec<_> = results - .filter(|(_provider, result)| result != &base_result) - .collect(); - if !inconsistent_results.is_empty() { - inconsistent_results.push((base_node_provider, base_result)); - let error = MultiCallError::InconsistentResults(MultiCallResults::from_non_empty_iter( - inconsistent_results - .into_iter() - .map(|(provider, result)| (provider, Ok(result))), - )); - log!( - Priority::Info, - "[reduce_with_equality]: inconsistent results {error:?}" - ); - return Err(error); +impl Reduce for ReductionStrategy { + fn reduce(&self, results: MultiResults) -> ReducedResult { + match self { + ReductionStrategy::ByEquality(r) => r.reduce(results), + ReductionStrategy::ByThreshold(r) => r.reduce(results), } - Ok(base_result) - } - - fn reduce_with_threshold(self, min: u8) -> Result> { - assert!(min > 0, "BUG: min must be greater than 0"); - if self.ok_results.len() < min as usize { - // At least total >= min were queried, - // so there is at least one error - return Err(self.expect_error()); - } - let distribution = ResponseDistribution::from_non_empty_iter(self.ok_results.clone()); - let (most_likely_response, providers) = distribution - .most_frequent() - .expect("BUG: distribution should be non-empty"); - if providers.len() >= min as usize { - Ok(most_likely_response.clone()) - } else { - log!( - Priority::Info, - "[reduce_with_threshold]: too many inconsistent ok responses to reach threshold of {min}, results: {self:?}" - ); - Err(MultiCallError::InconsistentResults(self)) - } - } -} - -/// Distribution of responses observed from different providers. -/// -/// From the API point of view, it emulates a map from a response instance to a set of providers that returned it. -/// At the implementation level, to avoid requiring `T` to have a total order (i.e., must implements `Ord` if it were to be used as keys in a `BTreeMap`) which might not always be meaningful, -/// we use as key the hash of the serialized response instance. -struct ResponseDistribution { - hashes: BTreeMap<[u8; 32], T>, - responses: BTreeMap<[u8; 32], BTreeSet>, -} - -impl Default for ResponseDistribution { - fn default() -> Self { - Self::new() } } -impl ResponseDistribution { - pub fn new() -> Self { - Self { - hashes: BTreeMap::new(), - responses: BTreeMap::new(), - } - } - - /// Returns the most frequent response and the set of providers that returned it. - pub fn most_frequent(&self) -> Option<(&T, &BTreeSet)> { - self.responses - .iter() - .max_by_key(|(_hash, providers)| providers.len()) - .map(|(hash, providers)| { - ( - self.hashes.get(hash).expect("BUG: hash should be present"), - providers, - ) - }) - } -} - -impl ResponseDistribution { - pub fn from_non_empty_iter>(iter: I) -> Self { - let mut distribution = Self::new(); - for (provider, result) in iter { - distribution.insert_once(provider, result); - } - distribution - } - - pub fn insert_once(&mut self, provider: RpcSource, result: T) { - use ic_sha3::Keccak256; - let hash = Keccak256::hash(serde_json::to_vec(&result).expect("BUG: failed to serialize")); - match self.hashes.get(&hash) { - Some(existing_result) => { - assert_eq!( - existing_result, &result, - "BUG: different results once serialized have the same hash" - ); - let providers = self - .responses - .get_mut(&hash) - .expect("BUG: hash is guaranteed to be present"); - assert!( - providers.insert(provider), - "BUG: provider is already present" - ); - } - None => { - assert_eq!(self.hashes.insert(hash, result), None); - let providers = BTreeSet::from_iter(std::iter::once(provider)); - assert_eq!(self.responses.insert(hash, providers), None); - } - } - } -} +pub type MultiCallResults = MultiResults; +pub type ReducedResult = canhttp::multi::ReducedResult; diff --git a/canister/src/rpc_client/sol_rpc/mod.rs b/canister/src/rpc_client/sol_rpc/mod.rs index bd12649f..97b320cc 100644 --- a/canister/src/rpc_client/sol_rpc/mod.rs +++ b/canister/src/rpc_client/sol_rpc/mod.rs @@ -1,23 +1,14 @@ #[cfg(test)] mod tests; -use crate::{ - http::http_client, - providers::{request_builder, resolve_rpc_provider}, - state::read_state, -}; use candid::candid_method; -use canhttp::{ - http::json::{JsonRpcRequest, JsonRpcResponse}, - MaxResponseBytesRequestExtension, TransformContextRequestExtension, -}; +use canhttp::http::json::JsonRpcResponse; use ic_cdk::{ - api::management_canister::http_request::{HttpResponse, TransformArgs, TransformContext}, + api::management_canister::http_request::{HttpResponse, TransformArgs}, query, }; use minicbor::{Decode, Encode}; use serde::{de::DeserializeOwned, Serialize}; -use sol_rpc_types::{JsonRpcError, RpcError, RpcSource}; use solana_clock::Slot; use std::{fmt, fmt::Debug}; @@ -115,51 +106,3 @@ impl fmt::Display for ResponseSizeEstimate { write!(f, "{}", self.0) } } - -/// Calls a JSON-RPC method at the specified URL. -pub async fn call( - retry: bool, - provider: &RpcSource, - request_body: JsonRpcRequest, - response_size_estimate: ResponseSizeEstimate, - response_transform: &Option, -) -> Result -where - I: Serialize + Clone + Debug, - O: Debug + DeserializeOwned, -{ - use tower::Service; - - let transform_op = response_transform - .as_ref() - .map(|t| { - let mut buf = vec![]; - minicbor::encode(t, &mut buf).unwrap(); - buf - }) - .unwrap_or_default(); - - let effective_size_estimate = response_size_estimate.get(); - let request = request_builder( - resolve_rpc_provider(provider.clone()), - &read_state(|state| state.get_override_provider()), - )? - .max_response_bytes(effective_size_estimate) - .transform_context(TransformContext::from_name( - "cleanup_response".to_owned(), - transform_op.clone(), - )) - .body(request_body) - .expect("BUG: invalid request"); - - let mut client = http_client(retry); - let response = client.call(request).await?; - match response.into_body().into_result() { - Ok(r) => Ok(r), - Err(canhttp::http::json::JsonRpcError { - code, - message, - data: _, - }) => Err(JsonRpcError { code, message }.into()), - } -} diff --git a/canister/src/types/mod.rs b/canister/src/types/mod.rs index 970bc8e0..8cfcc081 100644 --- a/canister/src/types/mod.rs +++ b/canister/src/types/mod.rs @@ -3,7 +3,7 @@ mod tests; use crate::{constants::API_KEY_REPLACE_STRING, validate::validate_api_key}; use serde::{Deserialize, Serialize}; -use sol_rpc_types::{RegexSubstitution, RpcEndpoint, RpcResult, RpcSource}; +use sol_rpc_types::{RegexSubstitution, RpcEndpoint}; use std::{fmt, fmt::Debug}; use zeroize::{Zeroize, ZeroizeOnDrop}; @@ -74,69 +74,3 @@ impl OverrideProvider { } } } - -/// Copy of [`sol_rpc_types::MultiRpcResult`] to keep the implementation details out of the -/// [`sol_rpc_types`] crate. -pub enum MultiRpcResult { - Consistent(RpcResult), - Inconsistent(Vec<(RpcSource, RpcResult)>), -} - -impl MultiRpcResult { - pub fn map(self, mut f: impl FnMut(T) -> R) -> MultiRpcResult { - match self { - MultiRpcResult::Consistent(result) => MultiRpcResult::Consistent(result.map(f)), - MultiRpcResult::Inconsistent(results) => MultiRpcResult::Inconsistent( - results - .into_iter() - .map(|(service, result)| { - ( - service, - match result { - Ok(ok) => Ok(f(ok)), - Err(err) => Err(err), - }, - ) - }) - .collect(), - ), - } - } -} - -impl MultiRpcResult { - pub fn expect_consistent(self) -> RpcResult { - match self { - MultiRpcResult::Consistent(result) => result, - MultiRpcResult::Inconsistent(inconsistent_result) => { - panic!("Expected consistent, but got: {:?}", inconsistent_result) - } - } - } - - pub fn expect_inconsistent(self) -> Vec<(RpcSource, RpcResult)> { - match self { - MultiRpcResult::Consistent(consistent_result) => { - panic!("Expected inconsistent:, but got: {:?}", consistent_result) - } - MultiRpcResult::Inconsistent(results) => results, - } - } -} - -impl From> for MultiRpcResult { - fn from(result: RpcResult) -> Self { - MultiRpcResult::Consistent(result) - } -} - -impl From> for sol_rpc_types::MultiRpcResult { - fn from(value: MultiRpcResult) -> Self { - match value { - MultiRpcResult::Consistent(result) => sol_rpc_types::MultiRpcResult::Consistent(result), - MultiRpcResult::Inconsistent(result) => { - sol_rpc_types::MultiRpcResult::Inconsistent(result) - } - } - } -} diff --git a/integration_tests/Cargo.toml b/integration_tests/Cargo.toml index f01313aa..e3b8235a 100644 --- a/integration_tests/Cargo.toml +++ b/integration_tests/Cargo.toml @@ -11,6 +11,7 @@ license.workspace = true async-trait = { workspace = true } candid = { workspace = true } canlog = { path = "../canlog" } +const_format = { workspace = true } ic-cdk = { workspace = true } ic-test-utilities-load-wasm = { workspace = true } pocket-ic = { workspace = true } diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 81bcc50b..661492d6 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -17,9 +17,12 @@ use sol_rpc_canister::{ logs::Priority, }; use sol_rpc_client::{Runtime, SolRpcClient}; -use sol_rpc_types::{InstallArgs, SupportedRpcProviderId}; -use std::env::var; -use std::{env::set_var, path::PathBuf, time::Duration}; +use sol_rpc_types::{InstallArgs, RpcSources, SolanaCluster, SupportedRpcProviderId}; +use std::{ + env::{set_var, var}, + path::PathBuf, + time::Duration, +}; pub mod mock; use mock::MockOutcall; @@ -118,11 +121,19 @@ impl Setup { } pub fn client(&self) -> SolRpcClient { - SolRpcClient::new(self.new_pocket_ic(), self.sol_rpc_canister_id) + SolRpcClient::new( + self.new_pocket_ic(), + self.sol_rpc_canister_id, + RpcSources::Default(SolanaCluster::Devnet), + ) } pub fn client_live_mode(&self) -> SolRpcClient { - SolRpcClient::new(self.new_live_pocket_ic(), self.sol_rpc_canister_id) + SolRpcClient::new( + self.new_live_pocket_ic(), + self.sol_rpc_canister_id, + RpcSources::Default(SolanaCluster::Devnet), + ) } fn new_pocket_ic(&self) -> PocketIcRuntime { @@ -292,6 +303,11 @@ impl PocketIcRuntime<'_> { Some(MockStrategy::MockOnce(mock)) => { self.mock_http_once_inner(mock).await; } + Some(MockStrategy::MockSequence(mocks)) => { + for mock in mocks { + self.mock_http_once_inner(mock).await; + } + } } } @@ -454,6 +470,7 @@ pub trait SolRpcTestClient { async fn retrieve_logs(&self, priority: &str) -> Vec>; fn mock_http(self, mock: impl Into) -> Self; fn mock_http_once(self, mock: impl Into) -> Self; + fn mock_http_sequence(self, mocks: Vec>) -> Self; } #[async_trait] @@ -497,12 +514,37 @@ impl SolRpcTestClient> for SolRpcClient> ..self } } + + fn mock_http_sequence(self, mocks: Vec>) -> Self { + Self { + runtime: self.runtime.with_strategy(MockStrategy::MockSequence( + mocks.into_iter().map(|mock| mock.into()).collect(), + )), + ..self + } + } +} + +pub fn json_rpc_sequential_id( + response: serde_json::Value, +) -> [serde_json::Value; N] { + let first_id = response["id"].as_u64().expect("missing request ID"); + let mut requests = Vec::with_capacity(N); + requests.push(response.clone()); + for i in 1..N { + let mut next_request = response.clone(); + let new_id = first_id + i as u64; + *next_request.get_mut("id").unwrap() = serde_json::Value::Number(new_id.into()); + requests.push(next_request); + } + requests.try_into().unwrap() } #[derive(Clone, Debug)] enum MockStrategy { Mock(MockOutcall), MockOnce(MockOutcall), + MockSequence(Vec), } /// Argument to the wallet canister `wallet_call128` method. diff --git a/integration_tests/tests/tests.rs b/integration_tests/tests/tests.rs index 1fe2e5ee..ab62f1a8 100644 --- a/integration_tests/tests/tests.rs +++ b/integration_tests/tests/tests.rs @@ -1,22 +1,29 @@ +use assert_matches::*; +use const_format::formatcp; +use ic_cdk::api::management_canister::http_request::HttpHeader; +use pocket_ic::common::rest::CanisterHttpMethod; +use serde_json::json; use sol_rpc_canister::constants::*; -use sol_rpc_int_tests::{Setup, SolRpcTestClient, DEFAULT_CALLER_TEST_ID}; +use sol_rpc_int_tests::{ + json_rpc_sequential_id, mock::MockOutcallBuilder, Setup, SolRpcTestClient, + DEFAULT_CALLER_TEST_ID, +}; use sol_rpc_types::{ - InstallArgs, Mode, ProviderError, RpcAccess, RpcAuth, RpcEndpoint, RpcError, RpcSource, - SolanaCluster, SupportedRpcProviderId, + InstallArgs, Mode, ProviderError, RpcAccess, RpcAuth, RpcConfig, RpcEndpoint, RpcError, + RpcSource, RpcSources, SolanaCluster, SupportedRpcProvider, SupportedRpcProviderId, }; const MOCK_REQUEST_URL: &str = "https://api.devnet.solana.com/"; -const MOCK_REQUEST_PAYLOAD: &str = r#"{"jsonrpc":"2.0","id":1,"method":"getVersion"}"#; -const MOCK_REQUEST_RESPONSE: &str = - r#"{"jsonrpc":"2.0","id":1,"result":{"feature-set":2891131721,"solana-core":"1.16.7"}}"#; +const MOCK_REQUEST_PAYLOAD: &str = r#"{"jsonrpc":"2.0","id":0,"method":"getVersion"}"#; +const MOCK_RESPONSE_RESULT: &str = r#"{"feature-set":2891131721,"solana-core":"1.16.7"}"#; +const MOCK_RESPONSE: &str = formatcp!( + "{{\"jsonrpc\":\"2.0\",\"id\":0,\"result\":{}}}", + MOCK_RESPONSE_RESULT +); const MOCK_REQUEST_MAX_RESPONSE_BYTES: u64 = 1000; mod mock_request_tests { use super::*; - use assert_matches::*; - use ic_cdk::api::management_canister::http_request::HttpHeader; - use pocket_ic::common::rest::CanisterHttpMethod; - use sol_rpc_int_tests::mock::*; async fn mock_request(builder_fn: impl Fn(MockOutcallBuilder) -> MockOutcallBuilder) { let setup = Setup::with_args(InstallArgs { @@ -24,29 +31,26 @@ mod mock_request_tests { ..Default::default() }) .await; - let client = setup.client(); - let expected_result: serde_json::Value = - serde_json::from_str(MOCK_REQUEST_RESPONSE).unwrap(); + let client = setup + .client() + .with_rpc_config(RpcConfig { + response_size_estimate: Some(MOCK_REQUEST_MAX_RESPONSE_BYTES), + ..RpcConfig::default() + }) + .with_rpc_sources(RpcSources::Custom(vec![RpcSource::Custom(RpcEndpoint { + url: MOCK_REQUEST_URL.to_string(), + headers: Some(vec![HttpHeader { + name: "custom".to_string(), + value: "Value".to_string(), + }]), + })])); + let expected_result: serde_json::Value = serde_json::from_str(MOCK_RESPONSE).unwrap(); assert_matches!( client - .mock_http(builder_fn(MockOutcallBuilder::new( - 200, - MOCK_REQUEST_RESPONSE - ))) - .request( - RpcSource::Custom(RpcEndpoint { - url: MOCK_REQUEST_URL.to_string(), - headers: Some(vec![HttpHeader { - name: "custom".to_string(), - value: "Value".to_string(), - }]), - }), - MOCK_REQUEST_PAYLOAD, - MOCK_REQUEST_MAX_RESPONSE_BYTES, - 0, - ) + .mock_http(builder_fn(MockOutcallBuilder::new(200, MOCK_RESPONSE))) + .request(MOCK_REQUEST_PAYLOAD, 0) .await, - Ok(msg) if msg == serde_json::Value::to_string(&expected_result["result"]) + sol_rpc_types::MultiRpcResult::Consistent(Ok(msg)) if msg == serde_json::Value::to_string(&expected_result["result"]) ); } @@ -105,7 +109,6 @@ mod mock_request_tests { mod get_provider_tests { use super::*; - use sol_rpc_types::SupportedRpcProvider; #[tokio::test] async fn should_get_providers() { @@ -139,36 +142,40 @@ mod get_provider_tests { mod generic_request_tests { use super::*; - use assert_matches::*; - use sol_rpc_int_tests::mock::MockOutcallBuilder; + use std::str::FromStr; #[tokio::test] async fn request_should_require_cycles() { let setup = Setup::new().await; let client = setup.client(); - let result = client - .request( - RpcSource::Supported(SupportedRpcProviderId::AlchemyMainnet), - MOCK_REQUEST_PAYLOAD, - MOCK_REQUEST_MAX_RESPONSE_BYTES, - 0, - ) - .await; - - assert_matches!( - result, - Err(RpcError::ProviderError(ProviderError::TooFewCycles { - expected: _, - received: 0 - })) - ); + let results = client + .request(MOCK_REQUEST_PAYLOAD, 0) + .await + // The result is expected to be inconsistent because the different provider URLs means + // the request and hence expected number of cycles for each provider is different. + .expect_inconsistent(); + + for (_provider, result) in results { + assert_matches!( + result, + Err(RpcError::ProviderError(ProviderError::TooFewCycles { + expected: _, + received: 0 + })) + ); + } setup.drop().await; } #[tokio::test] async fn request_should_succeed_in_demo_mode() { + let [response_0, response_1, response_2] = json_rpc_sequential_id(json!({ + "id": 0, + "jsonrpc": "2.0", + "result": serde_json::Value::from_str(MOCK_RESPONSE_RESULT).unwrap() + })); let setup = Setup::with_args(InstallArgs { mode: Some(Mode::Demo), ..Default::default() @@ -177,18 +184,16 @@ mod generic_request_tests { let client = setup.client(); let result = client - .mock_http(MockOutcallBuilder::new(200, MOCK_REQUEST_RESPONSE)) - .request( - RpcSource::Supported(SupportedRpcProviderId::AlchemyMainnet), - MOCK_REQUEST_PAYLOAD, - MOCK_REQUEST_MAX_RESPONSE_BYTES, - 0, - ) - .await; + .mock_http_sequence(vec![ + MockOutcallBuilder::new(200, &response_0), + MockOutcallBuilder::new(200, &response_1), + MockOutcallBuilder::new(200, &response_2), + ]) + .request(MOCK_REQUEST_PAYLOAD, 0) + .await + .expect_consistent(); - let expected_result: serde_json::Value = - serde_json::from_str(MOCK_REQUEST_RESPONSE).unwrap(); - assert_matches!(result, Ok(msg) if msg == serde_json::Value::to_string(&expected_result["result"])); + assert_matches!(result, Ok(msg) if msg == MOCK_RESPONSE_RESULT); setup.drop().await; } diff --git a/libs/client/src/lib.rs b/libs/client/src/lib.rs index 71ad6ab3..d93cb37d 100644 --- a/libs/client/src/lib.rs +++ b/libs/client/src/lib.rs @@ -8,8 +8,7 @@ use candid::{utils::ArgumentEncoder, CandidType, Principal}; use ic_cdk::api::call::RejectionCode; use serde::de::DeserializeOwned; use sol_rpc_types::{ - GetSlotParams, RpcConfig, RpcResult, RpcSource, RpcSources, SolanaCluster, - SupportedRpcProvider, SupportedRpcProviderId, + GetSlotParams, RpcConfig, RpcSources, SupportedRpcProvider, SupportedRpcProviderId, }; use solana_clock::Slot; @@ -50,16 +49,22 @@ pub struct SolRpcClient { pub runtime: R, /// The [`Principal`] of the SOL RPC canister. pub sol_rpc_canister: Principal, + /// Configuration for how to perform RPC HTTP calls. + pub rpc_config: Option, + /// Defines what RPC sources to fetch from. + pub rpc_sources: RpcSources, } impl SolRpcClient { /// Instantiate a new client to be used by a canister on the Internet Computer. /// /// To use another runtime, see [`Self::new`]. - pub fn new_for_ic(sol_rpc_canister: Principal) -> Self { + pub fn new_for_ic(sol_rpc_canister: Principal, rpc_sources: RpcSources) -> Self { Self { runtime: IcRuntime {}, sol_rpc_canister, + rpc_config: None, + rpc_sources, } } } @@ -68,10 +73,28 @@ impl SolRpcClient { /// Instantiate a new client with a specific runtime. /// /// To use the client inside a canister, see [`SolRpcClient::new_for_ic`]. - pub fn new(runtime: R, sol_rpc_canister: Principal) -> Self { + pub fn new(runtime: R, sol_rpc_canister: Principal, rpc_sources: RpcSources) -> Self { Self { runtime, sol_rpc_canister, + rpc_config: None, + rpc_sources, + } + } + + /// Returns a new client with the given [`RpcSources`]. + pub fn with_rpc_sources(self, rpc_sources: RpcSources) -> Self { + SolRpcClient { + rpc_sources, + ..self + } + } + + /// Returns a new client with the given [`RpcConfig`]. + pub fn with_rpc_config(self, rpc_config: RpcConfig) -> Self { + SolRpcClient { + rpc_config: Some(rpc_config), + ..self } } @@ -105,11 +128,7 @@ impl SolRpcClient { .update_call( self.sol_rpc_canister, "getSlot", - ( - RpcSources::Default(SolanaCluster::Devnet), - None::, - params, - ), + (self.rpc_sources.clone(), self.rpc_config.clone(), params), 10_000_000_000, ) .await @@ -119,16 +138,18 @@ impl SolRpcClient { /// Call `request` on the SOL RPC canister. pub async fn request( &self, - service: RpcSource, json_rpc_payload: &str, - max_response_bytes: u64, cycles: u128, - ) -> RpcResult { + ) -> sol_rpc_types::MultiRpcResult { self.runtime .update_call( self.sol_rpc_canister, "request", - (service, json_rpc_payload, max_response_bytes), + ( + self.rpc_sources.clone(), + self.rpc_config.clone(), + json_rpc_payload, + ), cycles, ) .await diff --git a/libs/types/src/response/mod.rs b/libs/types/src/response/mod.rs index 813b3626..7d40eb4e 100644 --- a/libs/types/src/response/mod.rs +++ b/libs/types/src/response/mod.rs @@ -1,6 +1,7 @@ use crate::{RpcResult, RpcSource}; use candid::CandidType; use serde::Deserialize; +use std::fmt::Debug; /// Represents an aggregated result from multiple RPC calls to different RPC providers. /// The results are aggregated using a [`crate::ConsensusStrategy`]. @@ -17,3 +18,46 @@ impl From> for MultiRpcResult { MultiRpcResult::Consistent(result) } } + +impl MultiRpcResult { + /// Maps a [`MultiRpcResult`] containing values of type `T` to a [`MultiRpcResult`] containing + /// values of type `R`. + pub fn map(self, f: F) -> MultiRpcResult + where + F: FnOnce(T) -> R + Clone, + { + match self { + MultiRpcResult::Consistent(result) => MultiRpcResult::Consistent(result.map(f)), + MultiRpcResult::Inconsistent(results) => MultiRpcResult::Inconsistent( + results + .into_iter() + .map(|(source, result)| (source, result.map(f.clone()))) + .collect(), + ), + } + } +} + +impl MultiRpcResult { + /// Returns the contents of a [`MultiRpcResult`] if it is an instance of + /// [`MultiRpcResult::Consistent`] and panics otherwise. + pub fn expect_consistent(self) -> RpcResult { + match self { + MultiRpcResult::Consistent(result) => result, + MultiRpcResult::Inconsistent(inconsistent_result) => { + panic!("Expected consistent, but got: {:?}", inconsistent_result) + } + } + } + + /// Returns the contents of a [`MultiRpcResult`] if it is an instance of + /// [`MultiRpcResult::Inconsistent`] and panics otherwise. + pub fn expect_inconsistent(self) -> Vec<(RpcSource, RpcResult)> { + match self { + MultiRpcResult::Consistent(consistent_result) => { + panic!("Expected inconsistent:, but got: {:?}", consistent_result) + } + MultiRpcResult::Inconsistent(results) => results, + } + } +} diff --git a/libs/types/src/rpc_client/mod.rs b/libs/types/src/rpc_client/mod.rs index a43af238..644cb08f 100644 --- a/libs/types/src/rpc_client/mod.rs +++ b/libs/types/src/rpc_client/mod.rs @@ -201,7 +201,9 @@ pub enum RpcSource { } /// Defines a collection of Solana RPC sources. -#[derive(Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Serialize, Deserialize, CandidType)] +#[derive( + Clone, PartialEq, Eq, Ord, PartialOrd, Hash, Serialize, Deserialize, CandidType, Debug, +)] pub enum RpcSources { /// A collection of [`RpcSource`] (either [`RpcSource::Supported`] or [`RpcSource::Custom`]). Custom(Vec),