Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 23 additions & 18 deletions client/rpc-servers/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,36 +32,41 @@ use futures::{future::Either, Future};
/// Metrics for RPC middleware
#[derive(Debug, Clone)]
pub struct RpcMetrics {
rpc_calls: CounterVec<U64>,
rpc_calls: Option<CounterVec<U64>>,
}

impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
metrics_registry.and_then(|r| {
Some(RpcMetrics {
rpc_calls: register(CounterVec::new(
Opts::new(
"rpc_calls_total",
"Number of rpc calls received",
),
&["protocol"]
).ok()?, r).ok()?,
})
}).ok_or(PrometheusError::Msg("Cannot register metric".to_string()))
Ok(Self {
rpc_calls: metrics_registry.map(|r|
register(
CounterVec::new(
Opts::new(
"rpc_calls_total",
"Number of rpc calls received",
),
&["protocol"]
)?,
r,
)
).transpose()?,
})
}
}

/// Middleware for RPC calls
pub struct RpcMiddleware {
metrics: Option<RpcMetrics>,
metrics: RpcMetrics,
transport_label: String,
}

impl RpcMiddleware {
/// Create an instance of middleware with provided metrics
/// transport_label is used as a label for Prometheus collector
pub fn new(metrics: Option<RpcMetrics>, transport_label: &str) -> Self {
/// Create an instance of middleware.
///
/// - `metrics`: Will be used to report statistics.
/// - `transport_label`: The label that is used when reporting the statistics.
pub fn new(metrics: RpcMetrics, transport_label: &str) -> Self {
RpcMiddleware {
metrics,
transport_label: String::from(transport_label),
Expand All @@ -78,8 +83,8 @@ impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
F: Fn(Request, M) -> X + Send + Sync,
X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
{
if let Some(ref metrics) = self.metrics {
metrics.rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
if let Some(ref rpc_calls) = self.metrics.rpc_calls {
rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
}

Either::B(next(request, meta))
Expand Down
6 changes: 3 additions & 3 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,12 +603,12 @@ pub fn spawn_tasks<TBl, TBackend, TExPool, TRpc, TCl>(
on_demand.clone(), remote_blockchain.clone(), &*rpc_extensions_builder,
backend.offchain_storage(), system_rpc_tx.clone()
);
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry()).ok();
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.as_ref())?;
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?;
// This is used internally, so don't restrict access to unsafe RPC
let rpc_handlers = RpcHandlers(Arc::new(gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.as_ref().cloned(), "inbrowser")
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser")
).into()));

// Telemetry
Expand Down
10 changes: 5 additions & 5 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ fn start_rpc_servers<
>(
config: &Configuration,
mut gen_handler: H,
rpc_metrics: Option<&sc_rpc_server::RpcMetrics>
rpc_metrics: sc_rpc_server::RpcMetrics,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
fn maybe_start_server<T, F>(address: Option<SocketAddr>, mut start: F) -> Result<Option<T>, io::Error>
where F: FnMut(&SocketAddr) -> Result<T, io::Error>,
Expand Down Expand Up @@ -434,7 +434,7 @@ fn start_rpc_servers<
config.rpc_ipc.as_ref().map(|path| sc_rpc_server::start_ipc(
&*path, gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "ipc")
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ipc")
)
)),
maybe_start_server(
Expand All @@ -444,7 +444,7 @@ fn start_rpc_servers<
config.rpc_cors.as_ref(),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "http")
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "http")
),
),
)?.map(|s| waiting::HttpServer(Some(s))),
Expand All @@ -456,7 +456,7 @@ fn start_rpc_servers<
config.rpc_cors.as_ref(),
gen_handler(
deny_unsafe(&address, &config.rpc_methods),
sc_rpc_server::RpcMiddleware::new(rpc_metrics.cloned(), "ws")
sc_rpc_server::RpcMiddleware::new(rpc_metrics.clone(), "ws")
),
),
)?.map(|s| waiting::WsServer(Some(s))),
Expand All @@ -471,7 +471,7 @@ fn start_rpc_servers<
>(
_: &Configuration,
_: H,
_: Option<&sc_rpc_server::RpcMetrics>
_: sc_rpc_server::RpcMetrics,
) -> Result<Box<dyn std::any::Any + Send + Sync>, error::Error> {
Ok(Box::new(()))
}
Expand Down