Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
328 changes: 205 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ http = "1.2.0"
ic-canister-log = "0.2.0"
ic-cdk = "0.17.1"
ic-ed25519 = "0.1.0"
ic-metrics-encoder = "1.1"
ic-sha3 = "1.0.0"
ic-stable-structures = "0.6.7"
ic-test-utilities-load-wasm = { git = "https://github.com/dfinity/ic", tag = "release-2025-01-23_03-04-base" }
Expand Down
1 change: 1 addition & 0 deletions canister/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ futures = { workspace = true }
hex = { workspace = true }
http = { workspace = true }
ic-cdk = { workspace = true }
ic-metrics-encoder = { workspace = true }
ic-sha3 = { workspace = true }
ic-stable-structures = { workspace = true }
maplit = { workspace = true }
Expand Down
44 changes: 37 additions & 7 deletions canister/src/candid_rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,56 @@
use crate::rpc_client::{ReducedResult, SolRpcClient};
use crate::{
add_metric_entry,
metrics::RpcMethod,
providers::get_provider,
rpc_client::{ReducedResult, SolRpcClient},
util::hostname_from_url,
};
use canhttp::multi::ReductionError;
use serde::Serialize;
use sol_rpc_types::{GetSlotParams, MultiRpcResult, RpcConfig, RpcResult, RpcSources};
use sol_rpc_types::{
GetSlotParams, MultiRpcResult, RpcAccess, RpcAuth, RpcConfig, RpcResult, RpcSource, RpcSources,
SupportedRpcProvider,
};
use solana_clock::Slot;
use std::fmt::Debug;

fn process_result<T>(result: ReducedResult<T>) -> MultiRpcResult<T> {
fn process_result<T>(method: RpcMethod, result: ReducedResult<T>) -> MultiRpcResult<T> {
match result {
Ok(value) => MultiRpcResult::Consistent(Ok(value)),
Err(err) => match err {
ReductionError::ConsistentError(err) => MultiRpcResult::Consistent(Err(err)),
ReductionError::InconsistentResults(multi_call_results) => {
let results: Vec<_> = multi_call_results.into_iter().collect();
results.iter().for_each(|(_service, _service_result)| {
// TODO XC-296: Add metrics for inconsistent providers
results.iter().for_each(|(source, _service_result)| {
if let RpcSource::Supported(provider_id) = source {
if let Some(provider) = get_provider(provider_id) {
if let Some(host) = hostname(provider.clone()) {
add_metric_entry!(
inconsistent_responses,
(method.into(), host.into()),
1
)
}
}
}
});
MultiRpcResult::Inconsistent(results)
}
},
}
}

pub fn hostname(provider: SupportedRpcProvider) -> Option<String> {
let url = match provider.access {
RpcAccess::Authenticated { auth, .. } => match auth {
RpcAuth::BearerToken { url } => url,
RpcAuth::UrlParameter { url_pattern } => url_pattern,
},
RpcAccess::Unauthenticated { public_url } => public_url,
};
hostname_from_url(url.as_str())
}

/// Adapt the `EthRpcClient` to the `Candid` interface used by the EVM-RPC canister.
pub struct CandidRpcClient {
client: SolRpcClient,
Expand All @@ -34,7 +64,7 @@ impl CandidRpcClient {
}

pub async fn get_slot(&self, params: GetSlotParams) -> MultiRpcResult<Slot> {
process_result(self.client.get_slot(params).await)
process_result(RpcMethod::GetSlot, self.client.get_slot(params).await)
}

pub async fn raw_request<I>(
Expand All @@ -44,6 +74,6 @@ impl CandidRpcClient {
where
I: Serialize + Clone + Debug,
{
process_result(self.client.raw_request(request).await)
process_result(RpcMethod::Generic, self.client.raw_request(request).await)
}
}
109 changes: 96 additions & 13 deletions canister/src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
mod errors;

use crate::{
add_metric_entry,
constants::{COLLATERAL_CYCLES_PER_NODE, CONTENT_TYPE_VALUE},
http::errors::HttpClientError,
logs::Priority,
state::{next_request_id, read_state, State},
memory::next_request_id,
memory::{read_state, State},
metrics::{MetricRpcHost, MetricRpcMethod},
};
use canhttp::{
convert::ConvertRequestLayer,
http::{
json::{
CreateJsonRpcIdFilter, HttpJsonRpcRequest, HttpJsonRpcResponse, JsonRequestConverter,
ConsistentResponseIdFilterError, CreateJsonRpcIdFilter, HttpJsonRpcRequest,
HttpJsonRpcResponse, Id, JsonRequestConverter, JsonResponseConversionError,
JsonResponseConverter,
},
FilterNonSuccessfulHttpResponse, HttpRequestConverter, HttpResponseConverter,
FilterNonSuccessfulHttpResponse, FilterNonSuccessfulHttpResponseError,
HttpRequestConverter, HttpResponseConverter,
},
observability::ObservabilityLayer,
retry::DoubleMaxResponseBytes,
ConvertServiceBuilder, CyclesAccounting, CyclesChargingPolicy,
ConvertServiceBuilder, CyclesAccounting, CyclesChargingPolicy, IcError,
};
use canlog::log;
use http::{header::CONTENT_TYPE, HeaderValue};
Expand All @@ -34,6 +39,7 @@ use tower::{
use tower_http::{set_header::SetRequestHeaderLayer, ServiceBuilderExt};

pub fn http_client<I, O>(
rpc_method: MetricRpcMethod,
retry: bool,
) -> impl Service<HttpJsonRpcRequest<I>, Response = HttpJsonRpcResponse<O>, Error = RpcError>
where
Expand All @@ -54,25 +60,91 @@ where
.map_err(|e: HttpClientError| RpcError::from(e))
.option_layer(maybe_retry)
.option_layer(maybe_unique_id)
// TODO XC-296: Flesh out observability layer
.layer(
ObservabilityLayer::new()
.on_request(move |req: &HttpJsonRpcRequest<I>| {
log!(
Priority::TraceHttp,
"JSON-RPC request with id `{}` to {}: {:?}",
req.body().id().clone(),
req.uri().host().unwrap().to_string(),
let req_data = MetricData {
method: rpc_method.clone(),
host: MetricRpcHost(req.uri().host().unwrap().to_string()),
request_id: req.body().id().clone(),
};
add_metric_entry!(
requests,
(req_data.method.clone(), req_data.host.clone()),
1
);
log!(Priority::TraceHttp, "JSON-RPC request with id `{}` to {}: {:?}",
req_data.request_id,
req_data.host.0,
req.body()
);
req_data
})
.on_response(|_req_data: (), response: &HttpJsonRpcResponse<O>| {
.on_response(|req_data: MetricData, response: &HttpJsonRpcResponse<O>| {
observe_response(req_data.method, req_data.host, response.status().as_u16());
log!(
Priority::TraceHttp,
"JSON-RPC response: {:?}",
"Got response for request with id `{}`. Response with status {}: {:?}",
req_data.request_id,
response.status(),
response.body()
);
}),
})
.on_error(
|req_data: MetricData, error: &HttpClientError| match error {
HttpClientError::IcError(IcError { code, message: _ }) => {
add_metric_entry!(
err_http_outcall,
(req_data.method, req_data.host, *code),
1
);
}
HttpClientError::UnsuccessfulHttpResponse(
FilterNonSuccessfulHttpResponseError::UnsuccessfulResponse(response),
) => {
observe_response(
req_data.method,
req_data.host,
response.status().as_u16(),
);
log!(
Priority::TraceHttp,
"Unsuccessful HTTP response for request with id `{}`. Response with status {}: {}",
req_data.request_id,
response.status(),
String::from_utf8_lossy(response.body())
);
}
HttpClientError::InvalidJsonResponse(
JsonResponseConversionError::InvalidJsonResponse {
status,
body: _,
parsing_error: _,
},
) => {
observe_response(req_data.method, req_data.host, *status);
log!(
Priority::TraceHttp,
"Invalid JSON RPC response for request with id `{}`: {}",
req_data.request_id,
error
);
}
HttpClientError::InvalidJsonResponseId(ConsistentResponseIdFilterError::InconsistentId { status, request_id: _, response_id: _ }) => {
observe_response(req_data.method, req_data.host, *status);
log!(
Priority::TraceHttp,
"Invalid JSON RPC response for request with id `{}`: {}",
req_data.request_id,
error
);
}
HttpClientError::NotHandledError(e) => {
log!(Priority::Info, "BUG: Unexpected error: {}", e);
}
HttpClientError::CyclesAccountingError(_) => {}
},
),
)
.filter_response(CreateJsonRpcIdFilter::new())
.layer(service_request_builder())
Expand All @@ -92,6 +164,17 @@ fn generate_request_id<I>(request: HttpJsonRpcRequest<I>) -> HttpJsonRpcRequest<
http::Request::from_parts(parts, body)
}

fn observe_response(method: MetricRpcMethod, host: MetricRpcHost, status: u16) {
let status: u32 = status as u32;
add_metric_entry!(responses, (method, host, status.into()), 1);
}

struct MetricData {
method: MetricRpcMethod,
host: MetricRpcHost,
request_id: Id,
}

type JsonRpcServiceBuilder<I> = ServiceBuilder<
Stack<
ConvertRequestLayer<HttpRequestConverter>,
Expand Down
3 changes: 2 additions & 1 deletion canister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ pub mod http;
pub mod http_types;
pub mod lifecycle;
pub mod logs;
pub mod memory;
pub mod metrics;
pub mod providers;
pub mod rpc_client;
pub mod state;
pub mod types;
pub mod util;
pub mod validate;
2 changes: 1 addition & 1 deletion canister/src/lifecycle/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
logs::Priority,
state::{init_state, mutate_state, State},
memory::{init_state, mutate_state, State},
};
use canlog::log;
use sol_rpc_types::InstallArgs;
Expand Down
2 changes: 1 addition & 1 deletion canister/src/logs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::state::read_state;
use crate::memory::read_state;
use canlog::{GetLogFilter, LogFilter, LogPriorityLevels};
use serde::{Deserialize, Serialize};
use std::str::FromStr;
Expand Down
19 changes: 18 additions & 1 deletion canister/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use candid::candid_method;
use canhttp::http::json::JsonRpcRequest;
use canlog::{log, Log, Sort};
use ic_cdk::{api::is_controller, query, update};
use ic_metrics_encoder::MetricsEncoder;
use sol_rpc_canister::{
candid_rpc::CandidRpcClient,
http_types, lifecycle,
logs::Priority,
memory::{mutate_state, read_state},
metrics::encode_metrics,
providers::{get_provider, PROVIDERS},
state::{mutate_state, read_state},
};
use sol_rpc_types::{
GetSlotParams, MultiRpcResult, RpcAccess, RpcConfig, RpcError, RpcSources,
Expand Down Expand Up @@ -113,6 +115,21 @@ async fn request(
#[query(hidden = true)]
fn http_request(request: http_types::HttpRequest) -> http_types::HttpResponse {
match request.path() {
"/metrics" => {
let mut writer = MetricsEncoder::new(vec![], ic_cdk::api::time() as i64 / 1_000_000);

match encode_metrics(&mut writer) {
Ok(()) => http_types::HttpResponseBuilder::ok()
.header("Content-Type", "text/plain; version=0.0.4")
.with_body_and_content_length(writer.into_inner())
.build(),
Err(err) => http_types::HttpResponseBuilder::server_error(format!(
"Failed to encode metrics: {}",
err
))
.build(),
}
}
"/logs" => {
let max_skip_timestamp = match request.raw_query_param("time") {
Some(arg) => match u64::from_str(arg) {
Expand Down
Loading