diff --git a/crates/astria-bridge-withdrawer/src/lib.rs b/crates/astria-bridge-withdrawer/src/lib.rs index 15e25f0c2e..a2af44ab9e 100644 --- a/crates/astria-bridge-withdrawer/src/lib.rs +++ b/crates/astria-bridge-withdrawer/src/lib.rs @@ -1,7 +1,7 @@ pub(crate) mod api; mod build_info; pub(crate) mod config; -pub mod metrics_init; +pub(crate) mod metrics; pub mod withdrawer; pub use build_info::BUILD_INFO; diff --git a/crates/astria-bridge-withdrawer/src/main.rs b/crates/astria-bridge-withdrawer/src/main.rs index 53ef465bd9..a3b87a36df 100644 --- a/crates/astria-bridge-withdrawer/src/main.rs +++ b/crates/astria-bridge-withdrawer/src/main.rs @@ -1,7 +1,6 @@ use std::process::ExitCode; use astria_bridge_withdrawer::{ - metrics_init, Config, Service, BUILD_INFO, @@ -35,8 +34,7 @@ async fn main() -> ExitCode { if !cfg.no_metrics { telemetry_conf = telemetry_conf .metrics_addr(&cfg.metrics_http_listener_addr) - .service_name(env!("CARGO_PKG_NAME")) - .register_metrics(metrics_init::register); + .service_name(env!("CARGO_PKG_NAME")); } if let Err(e) = telemetry_conf diff --git a/crates/astria-bridge-withdrawer/src/metrics.rs b/crates/astria-bridge-withdrawer/src/metrics.rs new file mode 100644 index 0000000000..a409768d8e --- /dev/null +++ b/crates/astria-bridge-withdrawer/src/metrics.rs @@ -0,0 +1,142 @@ +use std::time::Duration; + +use metrics::{ + counter, + describe_counter, + describe_gauge, + describe_histogram, + gauge, + histogram, + Counter, + Gauge, + Histogram, + Unit, +}; +use telemetry::metric_names; + +pub(crate) struct Metrics { + nonce_fetch_count: Counter, + nonce_fetch_failure_count: Counter, + nonce_fetch_latency: Histogram, + current_nonce: Gauge, + sequencer_submission_failure_count: Counter, + sequencer_submission_latency: Histogram, +} + +impl Metrics { + #[must_use] + pub(crate) fn new() -> Self { + describe_counter!( + NONCE_FETCH_COUNT, + Unit::Count, + "The number of times we have attempted to fetch the nonce" + ); + let nonce_fetch_count = 
counter!(NONCE_FETCH_COUNT); + + describe_counter!( + NONCE_FETCH_FAILURE_COUNT, + Unit::Count, + "The number of times we have failed to fetch the nonce" + ); + let nonce_fetch_failure_count = counter!(NONCE_FETCH_FAILURE_COUNT); + + describe_histogram!( + NONCE_FETCH_LATENCY, + Unit::Seconds, + "The latency of nonce fetch" + ); + let nonce_fetch_latency = histogram!(NONCE_FETCH_LATENCY); + + describe_gauge!(CURRENT_NONCE, Unit::Count, "The current nonce"); + let current_nonce = gauge!(CURRENT_NONCE); + + describe_counter!( + SEQUENCER_SUBMISSION_FAILURE_COUNT, + Unit::Count, + "The number of failed transaction submissions to the sequencer" + ); + let sequencer_submission_failure_count = counter!(SEQUENCER_SUBMISSION_FAILURE_COUNT); + + describe_histogram!( + SEQUENCER_SUBMISSION_LATENCY, + Unit::Seconds, + "The latency of submitting a transaction to the sequencer" + ); + let sequencer_submission_latency = histogram!(SEQUENCER_SUBMISSION_LATENCY); + + Self { + nonce_fetch_count, + nonce_fetch_failure_count, + nonce_fetch_latency, + current_nonce, + sequencer_submission_failure_count, + sequencer_submission_latency, + } + } + + pub(crate) fn increment_nonce_fetch_count(&self) { + self.nonce_fetch_count.increment(1); + } + + pub(crate) fn increment_nonce_fetch_failure_count(&self) { + self.nonce_fetch_failure_count.increment(1); + } + + pub(crate) fn record_nonce_fetch_latency(&self, latency: Duration) { + self.nonce_fetch_latency.record(latency); + } + + pub(crate) fn set_current_nonce(&self, nonce: u32) { + self.current_nonce.set(nonce); + } + + pub(crate) fn record_sequencer_submission_latency(&self, latency: Duration) { + self.sequencer_submission_latency.record(latency); + } + + pub(crate) fn increment_sequencer_submission_failure_count(&self) { + self.sequencer_submission_failure_count.increment(1); + } +} + +metric_names!(pub const METRICS_NAMES: + CURRENT_NONCE, + NONCE_FETCH_COUNT, + NONCE_FETCH_FAILURE_COUNT, + NONCE_FETCH_LATENCY, + 
SEQUENCER_SUBMISSION_FAILURE_COUNT, + SEQUENCER_SUBMISSION_LATENCY +); + +#[cfg(test)] +mod tests { + use super::{ + CURRENT_NONCE, + NONCE_FETCH_COUNT, + NONCE_FETCH_FAILURE_COUNT, + NONCE_FETCH_LATENCY, + SEQUENCER_SUBMISSION_FAILURE_COUNT, + SEQUENCER_SUBMISSION_LATENCY, + }; + + #[track_caller] + fn assert_const(actual: &'static str, suffix: &str) { + // XXX: hard-code this so the crate name isn't accidentally changed. + const CRATE_NAME: &str = "astria_bridge_withdrawer"; + let expected = format!("{CRATE_NAME}_{suffix}"); + assert_eq!(expected, actual); + } + + #[test] + fn metrics_are_as_expected() { + assert_const(CURRENT_NONCE, "current_nonce"); + assert_const(NONCE_FETCH_COUNT, "nonce_fetch_count"); + assert_const(NONCE_FETCH_FAILURE_COUNT, "nonce_fetch_failure_count"); + assert_const(NONCE_FETCH_LATENCY, "nonce_fetch_latency"); + assert_const( + SEQUENCER_SUBMISSION_FAILURE_COUNT, + "sequencer_submission_failure_count", + ); + assert_const(SEQUENCER_SUBMISSION_LATENCY, "sequencer_submission_latency"); + } +} diff --git a/crates/astria-bridge-withdrawer/src/metrics_init.rs b/crates/astria-bridge-withdrawer/src/metrics_init.rs deleted file mode 100644 index b5892d0955..0000000000 --- a/crates/astria-bridge-withdrawer/src/metrics_init.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! Crate-specific metrics functionality. -//! -//! Registers metrics & lists constants to be used as metric names throughout crate. - -use metrics::{ - describe_counter, - describe_gauge, - describe_histogram, - Unit, -}; -use telemetry::metric_names; - -/// Registers all metrics used by this crate. 
-pub fn register() { - describe_counter!( - NONCE_FETCH_COUNT, - Unit::Count, - "The number of times we have attempted to fetch the nonce" - ); - describe_counter!( - NONCE_FETCH_FAILURE_COUNT, - Unit::Count, - "The number of times we have failed to fetch the nonce" - ); - describe_histogram!( - NONCE_FETCH_LATENCY, - Unit::Milliseconds, - "The latency of nonce fetch" - ); - describe_gauge!(CURRENT_NONCE, Unit::Count, "The current nonce"); - describe_histogram!( - SEQUENCER_SUBMISSION_LATENCY, - Unit::Milliseconds, - "The latency of submitting a transaction to the sequencer" - ); - describe_counter!( - SEQUENCER_SUBMISSION_FAILURE_COUNT, - Unit::Count, - "The number of failed transaction submissions to the sequencer" - ); -} - -metric_names!(pub const METRICS_NAMES: - CURRENT_NONCE, - NONCE_FETCH_COUNT, - NONCE_FETCH_FAILURE_COUNT, - NONCE_FETCH_LATENCY, - SEQUENCER_SUBMISSION_FAILURE_COUNT, - SEQUENCER_SUBMISSION_LATENCY -); - -#[cfg(test)] -mod tests { - use super::{ - CURRENT_NONCE, - NONCE_FETCH_COUNT, - NONCE_FETCH_FAILURE_COUNT, - NONCE_FETCH_LATENCY, - SEQUENCER_SUBMISSION_FAILURE_COUNT, - SEQUENCER_SUBMISSION_LATENCY, - }; - - #[track_caller] - fn assert_const(actual: &'static str, suffix: &str) { - // XXX: hard-code this so the crate name isn't accidentally changed. 
- const CRATE_NAME: &str = "astria_bridge_withdrawer"; - let expected = format!("{CRATE_NAME}_{suffix}"); - assert_eq!(expected, actual); - } - - #[test] - fn metrics_are_as_expected() { - assert_const(CURRENT_NONCE, "current_nonce"); - assert_const(NONCE_FETCH_COUNT, "nonce_fetch_count"); - assert_const(NONCE_FETCH_FAILURE_COUNT, "nonce_fetch_failure_count"); - assert_const(NONCE_FETCH_LATENCY, "nonce_fetch_latency"); - assert_const( - SEQUENCER_SUBMISSION_FAILURE_COUNT, - "sequencer_submission_failure_count", - ); - assert_const(SEQUENCER_SUBMISSION_LATENCY, "sequencer_submission_latency"); - } -} diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs index 9b9c2f6ce1..30219cc743 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/mod.rs @@ -1,6 +1,9 @@ use std::{ net::SocketAddr, - sync::Arc, + sync::{ + Arc, + OnceLock, + }, time::Duration, }; @@ -39,6 +42,7 @@ use self::{ use crate::{ api, config::Config, + metrics::Metrics, }; mod batch; @@ -62,6 +66,9 @@ impl Service { /// /// - If the provided `api_addr` string cannot be parsed as a socket address. 
pub fn new(cfg: Config) -> eyre::Result<(Self, ShutdownHandle)> { + static METRICS: OnceLock = OnceLock::new(); + let metrics = METRICS.get_or_init(Metrics::new); + let shutdown_handle = ShutdownHandle::new(); let Config { api_addr, @@ -87,6 +94,7 @@ impl Service { state: state.clone(), expected_fee_asset_id: asset::Id::from_denom(&fee_asset_denomination), min_expected_fee_asset_balance: u128::from(min_expected_fee_asset_balance), + metrics, } .build() .wrap_err("failed to initialize submitter")?; diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs index 47f85b69a1..227a34b23f 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/builder.rs @@ -13,9 +13,12 @@ use tokio_util::sync::CancellationToken; use tracing::info; use super::state::State; -use crate::withdrawer::{ - submitter::Batch, - SequencerStartupInfo, +use crate::{ + metrics::Metrics, + withdrawer::{ + submitter::Batch, + SequencerStartupInfo, + }, }; const BATCH_QUEUE_SIZE: usize = 256; @@ -60,6 +63,7 @@ pub(crate) struct Builder { pub(crate) state: Arc, pub(crate) expected_fee_asset_id: asset::Id, pub(crate) min_expected_fee_asset_balance: u128, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -73,6 +77,7 @@ impl Builder { state, expected_fee_asset_id, min_expected_fee_asset_balance, + metrics, } = self; let signer = super::signer::SequencerKey::try_from_path(sequencer_key_path) @@ -98,6 +103,7 @@ impl Builder { startup_tx, expected_fee_asset_id, min_expected_fee_asset_balance, + metrics, }, handle, )) diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs index 55c2522e5e..e40beb9192 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs +++ 
b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/mod.rs @@ -69,7 +69,10 @@ use super::{ state, SequencerStartupInfo, }; -use crate::withdrawer::ethereum::convert::BridgeUnlockMemo; +use crate::{ + metrics::Metrics, + withdrawer::ethereum::convert::BridgeUnlockMemo, +}; mod builder; mod signer; @@ -86,6 +89,7 @@ pub(super) struct Submitter { startup_tx: oneshot::Sender, expected_fee_asset_id: asset::Id, min_expected_fee_asset_balance: u128, + metrics: &'static Metrics, } impl Submitter { @@ -118,6 +122,7 @@ impl Submitter { &self.sequencer_chain_id, actions, rollup_height, + self.metrics ).await { break Err(e); } @@ -307,12 +312,14 @@ async fn process_batch( sequencer_chain_id: &str, actions: Vec, rollup_height: u64, + metrics: &'static Metrics, ) -> eyre::Result<()> { // get nonce and make unsigned transaction let nonce = get_latest_nonce( sequencer_cometbft_client.clone(), sequencer_key.address, state.clone(), + metrics, ) .await?; debug!(nonce, "fetched latest nonce"); @@ -334,9 +341,14 @@ async fn process_batch( debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction"); // submit transaction and handle response - let rsp = submit_tx(sequencer_cometbft_client.clone(), signed, state.clone()) - .await - .context("failed to submit transaction to to cometbft")?; + let rsp = submit_tx( + sequencer_cometbft_client.clone(), + signed, + state.clone(), + metrics, + ) + .await + .context("failed to submit transaction to to cometbft")?; if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { error!( abci.code = check_tx_code, @@ -376,9 +388,10 @@ async fn get_latest_nonce( client: sequencer_client::HttpClient, address: Address, state: Arc, + metrics: &'static Metrics, ) -> eyre::Result { debug!("fetching latest nonce from sequencer"); - metrics::counter!(crate::metrics_init::NONCE_FETCH_COUNT).increment(1); + metrics.increment_nonce_fetch_count(); let span = Span::current(); let start = Instant::now(); let 
retry_config = tryhard::RetryFutureConfig::new(1024) @@ -388,7 +401,7 @@ async fn get_latest_nonce( |attempt, next_delay: Option, err: &sequencer_client::extension_trait::Error| { - metrics::counter!(crate::metrics_init::NONCE_FETCH_FAILURE_COUNT).increment(1); + metrics.increment_nonce_fetch_failure_count(); let state = Arc::clone(&state); state.set_sequencer_connected(false); @@ -417,7 +430,7 @@ async fn get_latest_nonce( state.set_sequencer_connected(res.is_ok()); - metrics::histogram!(crate::metrics_init::NONCE_FETCH_LATENCY).record(start.elapsed()); + metrics.record_nonce_fetch_latency(start.elapsed()); res } @@ -435,9 +448,10 @@ async fn submit_tx( client: sequencer_client::HttpClient, tx: SignedTransaction, state: Arc, + metrics: &'static Metrics, ) -> eyre::Result { let nonce = tx.nonce(); - metrics::gauge!(crate::metrics_init::CURRENT_NONCE).set(f64::from(nonce)); + metrics.set_current_nonce(nonce); let start = std::time::Instant::now(); debug!("submitting signed transaction to sequencer"); let span = Span::current(); @@ -448,8 +462,7 @@ async fn submit_tx( |attempt, next_delay: Option, err: &sequencer_client::extension_trait::Error| { - metrics::counter!(crate::metrics_init::SEQUENCER_SUBMISSION_FAILURE_COUNT) - .increment(1); + metrics.increment_sequencer_submission_failure_count(); let state = Arc::clone(&state); state.set_sequencer_connected(false); @@ -479,7 +492,7 @@ async fn submit_tx( state.set_sequencer_connected(res.is_ok()); - metrics::histogram!(crate::metrics_init::SEQUENCER_SUBMISSION_LATENCY).record(start.elapsed()); + metrics.record_sequencer_submission_latency(start.elapsed()); res } diff --git a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs index 38ce986939..9d94048b27 100644 --- a/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs +++ b/crates/astria-bridge-withdrawer/src/withdrawer/submitter/tests.rs @@ -81,11 +81,14 @@ use wiremock::{ 
}; use super::Submitter; -use crate::withdrawer::{ - batch::Batch, - ethereum::convert::BridgeUnlockMemo, - state, - submitter, +use crate::{ + metrics::Metrics, + withdrawer::{ + batch::Batch, + ethereum::convert::BridgeUnlockMemo, + state, + submitter, + }, }; const SEQUENCER_CHAIN_ID: &str = "test_sequencer-1000"; @@ -145,6 +148,8 @@ impl TestSubmitter { // not testing watcher here so just set it to ready state.set_watcher_ready(); + let metrics = Box::leak(Box::new(Metrics::new())); + let (submitter, submitter_handle) = submitter::Builder { shutdown_token: shutdown_token.clone(), sequencer_key_path, @@ -153,6 +158,7 @@ impl TestSubmitter { state, expected_fee_asset_id: default_native_asset().id(), min_expected_fee_asset_balance: 1_000_000, + metrics, } .build() .unwrap(); diff --git a/crates/astria-composer/src/collectors/geth.rs b/crates/astria-composer/src/collectors/geth.rs index 68ed882ad7..0256822236 100644 --- a/crates/astria-composer/src/collectors/geth.rs +++ b/crates/astria-composer/src/collectors/geth.rs @@ -33,6 +33,7 @@ use ethers::providers::{ ProviderError, Ws, }; +use metrics::Counter; use tokio::{ select, sync::{ @@ -50,15 +51,9 @@ use tracing::{ }; use crate::{ - collectors::{ - EXECUTOR_SEND_TIMEOUT, - GETH, - }, + collectors::EXECUTOR_SEND_TIMEOUT, executor, - metrics_init::{ - COLLECTOR_TYPE_LABEL, - ROLLUP_ID_LABEL, - }, + metrics::Metrics, }; type StdError = dyn std::error::Error; @@ -85,6 +80,7 @@ pub(crate) struct Geth { url: String, // Token to signal the geth collector to stop upon shutdown. 
shutdown_token: CancellationToken, + metrics: &'static Metrics, } #[derive(Debug)] @@ -109,6 +105,7 @@ pub(crate) struct Builder { pub(crate) url: String, pub(crate) executor_handle: executor::Handle, pub(crate) shutdown_token: CancellationToken, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -118,6 +115,7 @@ impl Builder { url, executor_handle, shutdown_token, + metrics, } = self; let (status, _) = watch::channel(Status::new()); let rollup_id = RollupId::from_unhashed_bytes(&chain_name); @@ -133,6 +131,7 @@ impl Builder { status, url, shutdown_token, + metrics, } } } @@ -159,8 +158,30 @@ impl Geth { url, shutdown_token, chain_name, + metrics, } = self; + let txs_received_counter = metrics + .geth_txs_received(&chain_name) + .cloned() + .unwrap_or_else(|| { + error!( + rollup_chain_name = %chain_name, + "failed to get geth transactions_received counter" + ); + Counter::noop() + }); + let txs_dropped_counter = metrics + .geth_txs_dropped(&chain_name) + .cloned() + .unwrap_or_else(|| { + error!( + rollup_chain_name = %chain_name, + "failed to get geth transactions_dropped counter" + ); + Counter::noop() + }); + let retry_config = tryhard::RetryFutureConfig::new(1024) .exponential_backoff(Duration::from_millis(500)) .max_delay(Duration::from_secs(60)) @@ -214,12 +235,7 @@ impl Geth { fee_asset_id: default_native_asset().id(), }; - metrics::counter!( - crate::metrics_init::TRANSACTIONS_RECEIVED, - &[ - (ROLLUP_ID_LABEL, chain_name.clone()), - (COLLECTOR_TYPE_LABEL, GETH.to_string()) - ]).increment(1); + txs_received_counter.increment(1); match executor_handle .send_timeout(seq_action, EXECUTOR_SEND_TIMEOUT) @@ -232,13 +248,7 @@ impl Geth { timeout_ms = EXECUTOR_SEND_TIMEOUT.as_millis(), "timed out sending new transaction to executor; dropping tx", ); - metrics::counter!( - crate::metrics_init::TRANSACTIONS_DROPPED, - &[ - (ROLLUP_ID_LABEL, chain_name.clone()), - (COLLECTOR_TYPE_LABEL, GETH.to_string()) - ] - ).increment(1); + 
txs_dropped_counter.increment(1); } Err(SendTimeoutError::Closed(_seq_action)) => { warn!( @@ -246,13 +256,7 @@ impl Geth { "executor channel closed while sending transaction; dropping transaction \ and exiting event loop" ); - metrics::counter!( - crate::metrics_init::TRANSACTIONS_DROPPED, - &[ - (ROLLUP_ID_LABEL, chain_name.clone()), - (COLLECTOR_TYPE_LABEL, GETH.to_string()) - ] - ).increment(1); + txs_dropped_counter.increment(1); break Err(eyre!("executor channel closed while sending transaction")); } } diff --git a/crates/astria-composer/src/collectors/grpc.rs b/crates/astria-composer/src/collectors/grpc.rs index fd57579cd1..260520ab52 100644 --- a/crates/astria-composer/src/collectors/grpc.rs +++ b/crates/astria-composer/src/collectors/grpc.rs @@ -22,15 +22,9 @@ use tonic::{ }; use crate::{ - collectors::{ - EXECUTOR_SEND_TIMEOUT, - GRPC, - }, + collectors::EXECUTOR_SEND_TIMEOUT, executor, - metrics_init::{ - COLLECTOR_TYPE_LABEL, - ROLLUP_ID_LABEL, - }, + metrics::Metrics, }; /// Implements the `GrpcCollectorService` which listens for incoming gRPC requests and @@ -38,12 +32,14 @@ use crate::{ /// to the Astria Shared Sequencer. 
pub(crate) struct Grpc { executor: executor::Handle, + metrics: &'static Metrics, } impl Grpc { - pub(crate) fn new(executor: executor::Handle) -> Self { + pub(crate) fn new(executor: executor::Handle, metrics: &'static Metrics) -> Self { Self { executor, + metrics, } } } @@ -57,7 +53,7 @@ impl GrpcCollectorService for Grpc { let submit_rollup_tx_request = request.into_inner(); let Ok(rollup_id) = RollupId::try_from_slice(&submit_rollup_tx_request.rollup_id) else { - return Err(tonic::Status::invalid_argument("invalid rollup id")); + return Err(Status::invalid_argument("invalid rollup id")); }; let sequence_action = SequenceAction { @@ -66,14 +62,7 @@ impl GrpcCollectorService for Grpc { fee_asset_id: default_native_asset().id(), }; - metrics::counter!( - crate::metrics_init::TRANSACTIONS_RECEIVED, - &[ - (ROLLUP_ID_LABEL, rollup_id.to_string()), - (COLLECTOR_TYPE_LABEL, GRPC.to_string()) - ] - ) - .increment(1); + self.metrics.increment_grpc_txs_received(&rollup_id); match self .executor .send_timeout(sequence_action, EXECUTOR_SEND_TIMEOUT) @@ -81,32 +70,12 @@ impl GrpcCollectorService for Grpc { { Ok(()) => {} Err(SendTimeoutError::Timeout(_seq_action)) => { - metrics::counter!( - crate::metrics_init::TRANSACTIONS_DROPPED, - &[ - (ROLLUP_ID_LABEL, rollup_id.to_string()), - (COLLECTOR_TYPE_LABEL, GRPC.to_string()) - ] - ) - .increment(1); - - return Err(tonic::Status::unavailable( - "timeout while sending txs to composer", - )); + self.metrics.increment_grpc_txs_dropped(&rollup_id); + return Err(Status::unavailable("timeout while sending txs to composer")); } Err(SendTimeoutError::Closed(_seq_action)) => { - metrics::counter!( - crate::metrics_init::TRANSACTIONS_DROPPED, - &[ - (ROLLUP_ID_LABEL, rollup_id.to_string()), - (COLLECTOR_TYPE_LABEL, GRPC.to_string()) - ] - ) - .increment(1); - - return Err(tonic::Status::failed_precondition( - "composer is not available", - )); + self.metrics.increment_grpc_txs_dropped(&rollup_id); + return 
Err(Status::failed_precondition("composer is not available")); } } diff --git a/crates/astria-composer/src/collectors/mod.rs b/crates/astria-composer/src/collectors/mod.rs index 35bf5a2489..606131f108 100644 --- a/crates/astria-composer/src/collectors/mod.rs +++ b/crates/astria-composer/src/collectors/mod.rs @@ -5,8 +5,5 @@ use std::time::Duration; const EXECUTOR_SEND_TIMEOUT: Duration = Duration::from_millis(500); -const GETH: &str = "geth"; -const GRPC: &str = "grpc"; - pub(crate) use geth::Geth; pub(crate) use grpc::Grpc; diff --git a/crates/astria-composer/src/composer.rs b/crates/astria-composer/src/composer.rs index e22bba7244..9dcea0582a 100644 --- a/crates/astria-composer/src/composer.rs +++ b/crates/astria-composer/src/composer.rs @@ -1,6 +1,7 @@ use std::{ collections::HashMap, net::SocketAddr, + sync::OnceLock, time::Duration, }; @@ -44,7 +45,7 @@ use crate::{ executor::Executor, grpc, grpc::GrpcServer, - rollup::Rollup, + metrics::Metrics, Config, }; @@ -83,6 +84,7 @@ pub struct Composer { grpc_server: GrpcServer, /// Used to signal the Composer to shut down. shutdown_token: CancellationToken, + metrics: &'static Metrics, } /// Announces the current status of the Composer for other modules in the crate to use @@ -114,6 +116,11 @@ impl Composer { /// An error is returned if the composer fails to be initialized. /// See `[from_config]` for its error scenarios. 
pub async fn from_config(cfg: &Config) -> eyre::Result { + static METRICS: OnceLock = OnceLock::new(); + + let rollups = cfg.parse_rollups()?; + let metrics = METRICS.get_or_init(|| Metrics::new(rollups.keys())); + let (composer_status_sender, _) = watch::channel(Status::default()); let shutdown_token = CancellationToken::new(); @@ -125,6 +132,7 @@ impl Composer { max_bytes_per_bundle: cfg.max_bytes_per_bundle, bundle_queue_capacity: cfg.bundle_queue_capacity, shutdown_token: shutdown_token.clone(), + metrics, } .build() .wrap_err("executor construction from config failed")?; @@ -133,6 +141,7 @@ impl Composer { grpc_addr: cfg.grpc_addr, executor: executor_handle.clone(), shutdown_token: shutdown_token.clone(), + metrics, } .build() .await @@ -150,14 +159,6 @@ impl Composer { "API server listening" ); - let rollups = cfg - .rollups - .split(',') - .filter(|s| !s.is_empty()) - .map(|s| Rollup::parse(s).map(Rollup::into_parts)) - .collect::, _>>() - .wrap_err("failed parsing provided :: pairs as rollups")?; - let geth_collectors = rollups .iter() .map(|(rollup_name, url)| { @@ -166,6 +167,7 @@ impl Composer { url: url.clone(), executor_handle: executor_handle.clone(), shutdown_token: shutdown_token.clone(), + metrics, } .build(); (rollup_name.clone(), collector) @@ -188,6 +190,7 @@ impl Composer { geth_collector_tasks: JoinMap::new(), grpc_server, shutdown_token, + metrics, }) } @@ -210,6 +213,9 @@ impl Composer { /// /// # Panics /// It panics if the Composer cannot set the SIGTERM listener. + // allow: it seems splitting this into smaller functions makes the code less readable due to + // the high number of params needed for these functions. 
+ #[allow(clippy::too_many_lines)] pub async fn run_until_stopped(self) -> eyre::Result<()> { let Self { api_server, @@ -222,6 +228,7 @@ impl Composer { mut geth_collector_statuses, grpc_server, shutdown_token, + metrics, } = self; // we need the API server to shutdown at the end, since it is used by k8s @@ -311,15 +318,24 @@ impl Composer { }; }, Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => { - reconnect_exited_collector( - &mut geth_collector_statuses, - &mut geth_collector_tasks, - executor_handle.clone(), - &rollups, - rollup, - collector_exit, - shutdown_token.clone() - ); + report_exit("collector", collector_exit); + if let Some(url) = rollups.get(&rollup) { + let collector = geth::Builder { + chain_name: rollup.clone(), + url: url.clone(), + executor_handle: executor_handle.clone(), + shutdown_token: shutdown_token.clone(), + metrics, + } + .build(); + geth_collector_statuses.insert(rollup.clone(), collector.subscribe()); + geth_collector_tasks.spawn(rollup, collector.run_until_stopped()); + } else { + error!( + "rollup should have had an entry in the rollup->url map but doesn't; not reconnecting \ + it" + ); + } }); }; @@ -505,35 +521,6 @@ async fn wait_for_collectors( Ok(()) } -pub(super) fn reconnect_exited_collector( - collector_statuses: &mut HashMap>, - collector_tasks: &mut JoinMap>, - executor_handle: executor::Handle, - rollups: &HashMap, - rollup: String, - exit_result: Result, JoinError>, - shutdown_token: CancellationToken, -) { - report_exit("collector", exit_result); - let Some(url) = rollups.get(&rollup) else { - error!( - "rollup should have had an entry in the rollup->url map but doesn't; not reconnecting \ - it" - ); - return; - }; - - let collector = geth::Builder { - chain_name: rollup.clone(), - url: url.clone(), - executor_handle, - shutdown_token, - } - .build(); - collector_statuses.insert(rollup.clone(), collector.subscribe()); - collector_tasks.spawn(rollup, collector.run_until_stopped()); -} - fn 
report_exit(task_name: &str, outcome: Result, JoinError>) { match outcome { Ok(Ok(())) => info!(task = task_name, "task exited successfully"), diff --git a/crates/astria-composer/src/config.rs b/crates/astria-composer/src/config.rs index 778bd84ccb..07a4a5d09c 100644 --- a/crates/astria-composer/src/config.rs +++ b/crates/astria-composer/src/config.rs @@ -1,10 +1,16 @@ -use std::net::SocketAddr; +use std::{ + collections::HashMap, + net::SocketAddr, +}; +use astria_eyre::eyre::WrapErr; use serde::{ Deserialize, Serialize, }; +use crate::rollup::Rollup; + // this is a config, may have many boolean values #[allow(clippy::struct_excessive_bools)] #[derive(Debug, Deserialize, Serialize)] @@ -58,6 +64,17 @@ pub struct Config { pub grpc_addr: SocketAddr, } +impl Config { + pub(crate) fn parse_rollups(&self) -> astria_eyre::eyre::Result> { + self.rollups + .split(',') + .filter(|s| !s.is_empty()) + .map(|s| Rollup::parse(s).map(Rollup::into_parts)) + .collect::, _>>() + .wrap_err("failed parsing provided :: pairs as rollups") + } +} + impl config::Config for Config { const PREFIX: &'static str = "ASTRIA_COMPOSER_"; } diff --git a/crates/astria-composer/src/executor/builder.rs b/crates/astria-composer/src/executor/builder.rs index e24e281b31..2509c0963c 100644 --- a/crates/astria-composer/src/executor/builder.rs +++ b/crates/astria-composer/src/executor/builder.rs @@ -23,6 +23,7 @@ use tokio_util::sync::CancellationToken; use crate::{ executor, executor::Status, + metrics::Metrics, }; pub(crate) struct Builder { @@ -33,6 +34,7 @@ pub(crate) struct Builder { pub(crate) max_bytes_per_bundle: usize, pub(crate) bundle_queue_capacity: usize, pub(crate) shutdown_token: CancellationToken, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -45,6 +47,7 @@ impl Builder { max_bytes_per_bundle, bundle_queue_capacity, shutdown_token, + metrics, } = self; let sequencer_client = sequencer_client::HttpClient::new(sequencer_url.as_str()) .wrap_err("failed constructing sequencer 
client")?; @@ -75,6 +78,7 @@ impl Builder { max_bytes_per_bundle, bundle_queue_capacity, shutdown_token, + metrics, }, executor::Handle::new(serialized_rollup_transaction_tx), )) diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index 131446583b..a769a1f3cc 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -71,9 +71,12 @@ use tracing::{ }; use self::bundle_factory::SizedBundle; -use crate::executor::bundle_factory::{ - BundleFactory, - SizedBundleReport, +use crate::{ + executor::bundle_factory::{ + BundleFactory, + SizedBundleReport, + }, + metrics::Metrics, }; mod bundle_factory; @@ -84,8 +87,6 @@ mod tests; pub(crate) use builder::Builder; -use crate::metrics_init::ROLLUP_ID_LABEL; - // Duration to wait for the executor to drain all the remaining bundles before shutting down. // This is 16s because the timeout for the higher level executor task is 17s to shut down. // The extra second is to prevent the higher level executor task from timing out before the @@ -99,7 +100,6 @@ type StdError = dyn std::error::Error; /// The `Executor` receives `Vec` from the bundling logic, packages them with a nonce into /// an `Unsigned`, then signs them with the sequencer key and submits to the sequencer. /// Its `status` field indicates that connection to the sequencer node has been established. -#[derive(Debug)] pub(super) struct Executor { // The status of this executor status: watch::Sender, @@ -122,6 +122,7 @@ pub(super) struct Executor { bundle_queue_capacity: usize, // Token to signal the executor to stop upon shutdown. shutdown_token: CancellationToken, + metrics: &'static Metrics, } #[derive(Clone)] @@ -172,7 +173,12 @@ impl Executor { /// Create a future to submit a bundle to the sequencer. 
#[instrument(skip_all, fields(nonce.initial = %nonce))] - fn submit_bundle(&self, nonce: u32, bundle: SizedBundle) -> Fuse> { + fn submit_bundle( + &self, + nonce: u32, + bundle: SizedBundle, + metrics: &'static Metrics, + ) -> Fuse> { SubmitFut { client: self.sequencer_client.clone(), address: self.address, @@ -181,6 +187,7 @@ impl Executor { signing_key: self.sequencer_key.clone(), state: SubmitState::NotStarted, bundle, + metrics, } .in_current_span() .fuse() @@ -193,11 +200,11 @@ impl Executor { #[instrument(skip_all, fields(address = %self.address))] pub(super) async fn run_until_stopped(mut self) -> eyre::Result<()> { let mut submission_fut: Fuse> = Fuse::terminated(); - let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address) + let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) .await .wrap_err("failed getting initial nonce from sequencer")?; - metrics::gauge!(crate::metrics_init::CURRENT_NONCE).set(nonce); + self.metrics.set_current_nonce(nonce); self.status.send_modify(|status| status.is_connected = true); @@ -234,7 +241,7 @@ impl Executor { Some(next_bundle) = future::ready(bundle_factory.next_finished()), if submission_fut.is_terminated() => { let bundle = next_bundle.pop(); if !bundle.is_empty() { - submission_fut = self.submit_bundle(nonce, bundle); + submission_fut = self.submit_bundle(nonce, bundle, self.metrics); } } @@ -243,16 +250,12 @@ impl Executor { let rollup_id = seq_action.rollup_id; if let Err(e) = bundle_factory.try_push(seq_action) { - metrics::gauge!( - crate::metrics_init::TRANSACTIONS_DROPPED_TOO_LARGE, - ROLLUP_ID_LABEL => rollup_id.to_string() - ) - .increment(1); - warn!( - rollup_id = %rollup_id, - error = &e as &StdError, - "failed to bundle transaction, dropping it." - ); + self.metrics.increment_txs_dropped_too_large(&rollup_id); + warn!( + rollup_id = %rollup_id, + error = &e as &StdError, + "failed to bundle transaction, dropping it." 
+ ); } } @@ -266,7 +269,7 @@ impl Executor { debug!( "forcing bundle submission to sequencer due to block timer" ); - submission_fut = self.submit_bundle(nonce, bundle); + submission_fut = self.submit_bundle(nonce, bundle, self.metrics); } } } @@ -301,11 +304,7 @@ impl Executor { let rollup_id = seq_action.rollup_id; if let Err(e) = bundle_factory.try_push(seq_action) { - metrics::gauge!( - crate::metrics_init::TRANSACTIONS_DROPPED_TOO_LARGE, - ROLLUP_ID_LABEL => rollup_id.to_string() - ) - .increment(1); + self.metrics.increment_txs_dropped_too_large(&rollup_id); warn!( rollup_id = %rollup_id, error = &e as &StdError, @@ -355,7 +354,10 @@ impl Executor { } while let Some(bundle) = bundles_to_drain.pop_front() { - match self.submit_bundle(nonce, bundle.clone()).await { + match self + .submit_bundle(nonce, bundle.clone(), self.metrics) + .await + { Ok(new_nonce) => { debug!( bundle = %telemetry::display::json(&SizedBundleReport(&bundle)), @@ -422,9 +424,9 @@ impl Executor { async fn get_latest_nonce( client: sequencer_client::HttpClient, address: Address, + metrics: &Metrics, ) -> eyre::Result { debug!("fetching latest nonce from sequencer"); - metrics::counter!(crate::metrics_init::NONCE_FETCH_COUNT).increment(1); let span = Span::current(); let start = Instant::now(); let retry_config = tryhard::RetryFutureConfig::new(1024) @@ -434,7 +436,7 @@ async fn get_latest_nonce( |attempt, next_delay: Option, err: &sequencer_client::extension_trait::Error| { - metrics::counter!(crate::metrics_init::NONCE_FETCH_FAILURE_COUNT).increment(1); + metrics.increment_nonce_fetch_failure_count(); let wait_duration = next_delay .map(humantime::format_duration) @@ -452,13 +454,14 @@ async fn get_latest_nonce( let res = tryhard::retry_fn(|| { let client = client.clone(); let span = info_span!(parent: span.clone(), "attempt get nonce"); + metrics.increment_nonce_fetch_count(); async move { client.get_latest_nonce(address).await.map(|rsp| rsp.nonce) }.instrument(span) }) 
.with_config(retry_config) .await .wrap_err("failed getting latest nonce from sequencer after 1024 attempts"); - metrics::histogram!(crate::metrics_init::NONCE_FETCH_LATENCY).record(start.elapsed()); + metrics.record_nonce_fetch_latency(start.elapsed()); res } @@ -474,9 +477,10 @@ async fn get_latest_nonce( async fn submit_tx( client: sequencer_client::HttpClient, tx: SignedTransaction, + metrics: &Metrics, ) -> eyre::Result { let nonce = tx.nonce(); - metrics::gauge!(crate::metrics_init::CURRENT_NONCE).set(nonce); + metrics.set_current_nonce(nonce); // TODO: change to info and log tx hash (to match info log in `SubmitFut`'s response handling // logic) @@ -490,8 +494,7 @@ async fn submit_tx( |attempt, next_delay: Option, err: &sequencer_client::extension_trait::Error| { - metrics::counter!(crate::metrics_init::SEQUENCER_SUBMISSION_FAILURE_COUNT) - .increment(1); + metrics.increment_sequencer_submission_failure_count(); let wait_duration = next_delay .map(humantime::format_duration) @@ -516,7 +519,7 @@ async fn submit_tx( .await .wrap_err("failed sending transaction after 1024 attempts"); - metrics::histogram!(crate::metrics_init::SEQUENCER_SUBMISSION_LATENCY).record(start.elapsed()); + metrics.record_sequencer_submission_latency(start.elapsed()); res } @@ -540,6 +543,7 @@ pin_project! 
{ #[pin] state: SubmitState, bundle: SizedBundle, + metrics: &'static Metrics, } } @@ -585,7 +589,7 @@ impl Future for SubmitFut { "submitting transaction to sequencer", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx).boxed(), + fut: submit_tx(this.client.clone(), tx, self.metrics).boxed(), } } @@ -596,17 +600,11 @@ impl Future for SubmitFut { let tendermint::abci::Code::Err(code) = rsp.code else { info!("sequencer responded with ok; submission successful"); - // allow: precision loss is unlikely (values too small) but also - // unimportant in histograms - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(crate::metrics_init::BYTES_PER_SUBMISSION) - .record(this.bundle.get_size() as f64); + this.metrics + .record_bytes_per_submission(this.bundle.get_size()); - // allow: precision loss is unlikely (values too small) but also - // unimportant in histograms - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(crate::metrics_init::TRANSACTIONS_PER_SUBMISSION) - .record(this.bundle.actions_count() as f64); + this.metrics + .record_txs_per_submission(this.bundle.actions_count()); return Poll::Ready(Ok(this .nonce @@ -620,8 +618,12 @@ impl Future for SubmitFut { fetching new nonce" ); SubmitState::WaitingForNonce { - fut: get_latest_nonce(this.client.clone(), *this.address) - .boxed(), + fut: get_latest_nonce( + this.client.clone(), + *this.address, + self.metrics, + ) + .boxed(), } } _other => { @@ -631,10 +633,7 @@ impl Future for SubmitFut { "sequencer rejected the transaction; the bundle is likely lost", ); - metrics::counter!( - crate::metrics_init::SEQUENCER_SUBMISSION_FAILURE_COUNT - ) - .increment(1); + this.metrics.increment_sequencer_submission_failure_count(); return Poll::Ready(Ok(*this.nonce)); } @@ -671,7 +670,7 @@ impl Future for SubmitFut { "resubmitting transaction to sequencer with new nonce", ); SubmitState::WaitingForSend { - fut: submit_tx(this.client.clone(), tx).boxed(), + fut: 
submit_tx(this.client.clone(), tx, self.metrics).boxed(), } } Err(error) => { diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index 0e37b1da9b..900b884299 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -44,6 +44,7 @@ use wiremock::{ use crate::{ executor, + metrics::Metrics, Config, }; @@ -208,6 +209,7 @@ async fn full_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg, _keyfile) = setup().await; let shutdown_token = CancellationToken::new(); + let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -216,6 +218,7 @@ async fn full_bundle() { max_bytes_per_bundle: cfg.max_bytes_per_bundle, bundle_queue_capacity: cfg.bundle_queue_capacity, shutdown_token: shutdown_token.clone(), + metrics, } .build() .unwrap(); @@ -299,6 +302,7 @@ async fn bundle_triggered_by_block_timer() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg, _keyfile) = setup().await; let shutdown_token = CancellationToken::new(); + let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -307,6 +311,7 @@ async fn bundle_triggered_by_block_timer() { max_bytes_per_bundle: cfg.max_bytes_per_bundle, bundle_queue_capacity: cfg.bundle_queue_capacity, shutdown_token: shutdown_token.clone(), + metrics, } .build() .unwrap(); @@ -383,6 +388,7 @@ async fn two_seq_actions_single_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock let (sequencer, nonce_guard, cfg, 
_keyfile) = setup().await; let shutdown_token = CancellationToken::new(); + let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -391,6 +397,7 @@ async fn two_seq_actions_single_bundle() { max_bytes_per_bundle: cfg.max_bytes_per_bundle, bundle_queue_capacity: cfg.bundle_queue_capacity, shutdown_token: shutdown_token.clone(), + metrics, } .build() .unwrap(); diff --git a/crates/astria-composer/src/grpc.rs b/crates/astria-composer/src/grpc.rs index 9f504ab724..478537b876 100644 --- a/crates/astria-composer/src/grpc.rs +++ b/crates/astria-composer/src/grpc.rs @@ -22,6 +22,7 @@ use tokio_util::sync::CancellationToken; use crate::{ collectors, executor, + metrics::Metrics, }; /// Listens for incoming gRPC requests and sends the Rollup transactions to the @@ -38,6 +39,7 @@ pub(crate) struct Builder { pub(crate) grpc_addr: SocketAddr, pub(crate) executor: executor::Handle, pub(crate) shutdown_token: CancellationToken, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -46,12 +48,13 @@ impl Builder { grpc_addr, executor, shutdown_token, + metrics, } = self; let listener = TcpListener::bind(grpc_addr) .await .wrap_err("failed to bind socket address")?; - let grpc_collector = collectors::Grpc::new(executor.clone()); + let grpc_collector = collectors::Grpc::new(executor.clone(), metrics); Ok(GrpcServer { listener, diff --git a/crates/astria-composer/src/lib.rs b/crates/astria-composer/src/lib.rs index 8ab1fb96e4..881c2ebc96 100644 --- a/crates/astria-composer/src/lib.rs +++ b/crates/astria-composer/src/lib.rs @@ -45,7 +45,7 @@ mod composer; pub mod config; mod executor; mod grpc; -pub mod metrics_init; +pub(crate) mod metrics; mod rollup; pub use build_info::BUILD_INFO; diff --git a/crates/astria-composer/src/main.rs b/crates/astria-composer/src/main.rs index 38f9f21d77..803301886b 
100644 --- a/crates/astria-composer/src/main.rs +++ b/crates/astria-composer/src/main.rs @@ -1,7 +1,6 @@ use std::process::ExitCode; use astria_composer::{ - metrics_init, Composer, Config, BUILD_INFO, @@ -38,8 +37,7 @@ async fn main() -> ExitCode { if !cfg.no_metrics { telemetry_conf = telemetry_conf .metrics_addr(&cfg.metrics_http_listener_addr) - .service_name(env!("CARGO_PKG_NAME")) - .register_metrics(metrics_init::register); + .service_name(env!("CARGO_PKG_NAME")); } let _telemetry_guard = match telemetry_conf diff --git a/crates/astria-composer/src/metrics.rs b/crates/astria-composer/src/metrics.rs new file mode 100644 index 0000000000..f400356a13 --- /dev/null +++ b/crates/astria-composer/src/metrics.rs @@ -0,0 +1,341 @@ +use std::{ + collections::HashMap, + time::Duration, +}; + +use astria_core::primitive::v1::RollupId; +use metrics::{ + counter, + describe_counter, + describe_gauge, + describe_histogram, + gauge, + histogram, + Counter, + Gauge, + Histogram, + Unit, +}; +use telemetry::metric_names; +use tracing::error; + +const ROLLUP_CHAIN_NAME_LABEL: &str = "rollup_chain_name"; +const ROLLUP_ID_LABEL: &str = "rollup_id"; +const COLLECTOR_TYPE_LABEL: &str = "collector_type"; + +pub(crate) struct Metrics { + geth_txs_received: HashMap, + geth_txs_dropped: HashMap, + grpc_txs_received: HashMap, + grpc_txs_dropped: HashMap, + txs_dropped_too_large: HashMap, + nonce_fetch_count: Counter, + nonce_fetch_failure_count: Counter, + nonce_fetch_latency: Histogram, + current_nonce: Gauge, + sequencer_submission_latency: Histogram, + sequencer_submission_failure_count: Counter, + txs_per_submission: Histogram, + bytes_per_submission: Histogram, +} + +impl Metrics { + #[must_use] + pub(crate) fn new<'a>(rollup_chain_names: impl Iterator + Clone) -> Self { + let (geth_txs_received, grpc_txs_received) = + register_txs_received(rollup_chain_names.clone()); + let (geth_txs_dropped, grpc_txs_dropped) = register_txs_dropped(rollup_chain_names.clone()); + let 
txs_dropped_too_large = register_txs_dropped_too_large(rollup_chain_names); + + describe_counter!( + NONCE_FETCH_COUNT, + Unit::Count, + "The number of times we have attempted to fetch the nonce" + ); + let nonce_fetch_count = counter!(NONCE_FETCH_COUNT); + + describe_counter!( + NONCE_FETCH_FAILURE_COUNT, + Unit::Count, + "The number of times we have failed to fetch the nonce" + ); + let nonce_fetch_failure_count = counter!(NONCE_FETCH_FAILURE_COUNT); + + describe_histogram!( + NONCE_FETCH_LATENCY, + Unit::Seconds, + "The latency of fetching the nonce, in seconds" + ); + let nonce_fetch_latency = histogram!(NONCE_FETCH_LATENCY); + + describe_gauge!(CURRENT_NONCE, Unit::Count, "The current nonce"); + let current_nonce = gauge!(CURRENT_NONCE); + + describe_histogram!( + SEQUENCER_SUBMISSION_LATENCY, + Unit::Seconds, + "The latency of submitting a transaction to the sequencer, in seconds" + ); + let sequencer_submission_latency = histogram!(SEQUENCER_SUBMISSION_LATENCY); + + describe_counter!( + SEQUENCER_SUBMISSION_FAILURE_COUNT, + Unit::Count, + "The number of failed transaction submissions to the sequencer" + ); + let sequencer_submission_failure_count = counter!(SEQUENCER_SUBMISSION_FAILURE_COUNT); + + describe_histogram!( + TRANSACTIONS_PER_SUBMISSION, + Unit::Count, + "The number of rollup transactions successfully sent to the sequencer in a single \ + submission" + ); + let txs_per_submission = histogram!(TRANSACTIONS_PER_SUBMISSION); + + describe_histogram!( + BYTES_PER_SUBMISSION, + Unit::Bytes, + "The total bytes successfully sent to the sequencer in a single submission" + ); + let bytes_per_submission = histogram!(BYTES_PER_SUBMISSION); + + Self { + geth_txs_received, + geth_txs_dropped, + grpc_txs_received, + grpc_txs_dropped, + txs_dropped_too_large, + nonce_fetch_count, + nonce_fetch_failure_count, + nonce_fetch_latency, + current_nonce, + sequencer_submission_latency, + sequencer_submission_failure_count, + txs_per_submission, + bytes_per_submission, + 
} + } + + pub(crate) fn geth_txs_received(&self, id: &String) -> Option<&Counter> { + self.geth_txs_received.get(id) + } + + pub(crate) fn geth_txs_dropped(&self, id: &String) -> Option<&Counter> { + self.geth_txs_dropped.get(id) + } + + pub(crate) fn increment_grpc_txs_received(&self, id: &RollupId) { + let Some(counter) = self.grpc_txs_received.get(id) else { + error!(rollup_id = %id, "failed to get grpc transactions_received counter"); + return; + }; + counter.increment(1); + } + + pub(crate) fn increment_grpc_txs_dropped(&self, id: &RollupId) { + let Some(counter) = self.grpc_txs_dropped.get(id) else { + error!(rollup_id = %id, "failed to get grpc transactions_dropped counter"); + return; + }; + counter.increment(1); + } + + pub(crate) fn increment_txs_dropped_too_large(&self, id: &RollupId) { + let Some(counter) = self.txs_dropped_too_large.get(id) else { + error!(rollup_id = %id, "failed to get transactions_dropped_too_large counter"); + return; + }; + counter.increment(1); + } + + pub(crate) fn increment_nonce_fetch_count(&self) { + self.nonce_fetch_count.increment(1); + } + + pub(crate) fn increment_nonce_fetch_failure_count(&self) { + self.nonce_fetch_failure_count.increment(1); + } + + pub(crate) fn record_nonce_fetch_latency(&self, latency: Duration) { + self.nonce_fetch_latency.record(latency); + } + + pub(crate) fn set_current_nonce(&self, nonce: u32) { + self.current_nonce.set(nonce); + } + + pub(crate) fn record_sequencer_submission_latency(&self, latency: Duration) { + self.sequencer_submission_latency.record(latency); + } + + pub(crate) fn increment_sequencer_submission_failure_count(&self) { + self.sequencer_submission_failure_count.increment(1); + } + + pub(crate) fn record_txs_per_submission(&self, count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. 
+ #[allow(clippy::cast_precision_loss)] + self.txs_per_submission.record(count as f64); + } + + pub(crate) fn record_bytes_per_submission(&self, byte_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.bytes_per_submission.record(byte_count as f64); + } +} + +fn register_txs_received<'a>( + rollup_chain_names: impl Iterator, +) -> (HashMap, HashMap) { + describe_counter!( + TRANSACTIONS_RECEIVED, + Unit::Count, + "The number of transactions successfully received from collectors and bundled, labelled \ + by rollup and collector type" + ); + + let mut geth_counters = HashMap::new(); + let mut grpc_counters = HashMap::new(); + + for chain_name in rollup_chain_names { + let rollup_id = RollupId::from_unhashed_bytes(chain_name.as_bytes()); + + let geth_counter = counter!( + TRANSACTIONS_RECEIVED, + ROLLUP_CHAIN_NAME_LABEL => chain_name.clone(), + ROLLUP_ID_LABEL => rollup_id.to_string(), + COLLECTOR_TYPE_LABEL => "geth", + ); + geth_counters.insert(chain_name.clone(), geth_counter.clone()); + + let grpc_counter = counter!( + TRANSACTIONS_RECEIVED, + ROLLUP_CHAIN_NAME_LABEL => chain_name.clone(), + ROLLUP_ID_LABEL => rollup_id.to_string(), + COLLECTOR_TYPE_LABEL => "grpc", + ); + grpc_counters.insert(rollup_id, grpc_counter); + } + (geth_counters, grpc_counters) +} + +fn register_txs_dropped<'a>( + rollup_chain_names: impl Iterator, +) -> (HashMap, HashMap) { + describe_counter!( + TRANSACTIONS_DROPPED, + Unit::Count, + "The number of transactions dropped by the collectors before bundling, labelled by rollup \ + and collector type" + ); + + let mut geth_counters = HashMap::new(); + let mut grpc_counters = HashMap::new(); + + for chain_name in rollup_chain_names { + let rollup_id = RollupId::from_unhashed_bytes(chain_name.as_bytes()); + + let geth_counter = counter!( + TRANSACTIONS_DROPPED, + ROLLUP_CHAIN_NAME_LABEL => chain_name.clone(), + ROLLUP_ID_LABEL => 
rollup_id.to_string(), + COLLECTOR_TYPE_LABEL => "geth", + ); + geth_counters.insert(chain_name.clone(), geth_counter.clone()); + + let grpc_counter = counter!( + TRANSACTIONS_DROPPED, + ROLLUP_CHAIN_NAME_LABEL => chain_name.clone(), + ROLLUP_ID_LABEL => rollup_id.to_string(), + COLLECTOR_TYPE_LABEL => "grpc", + ); + grpc_counters.insert(rollup_id, grpc_counter); + } + (geth_counters, grpc_counters) +} + +fn register_txs_dropped_too_large<'a>( + rollup_chain_names: impl Iterator, +) -> HashMap { + describe_counter!( + TRANSACTIONS_DROPPED_TOO_LARGE, + Unit::Count, + "The number of transactions dropped because they were too large, labelled by rollup" + ); + + let mut counters = HashMap::new(); + + for chain_name in rollup_chain_names { + let rollup_id = RollupId::from_unhashed_bytes(chain_name.as_bytes()); + + let counter = counter!( + TRANSACTIONS_DROPPED_TOO_LARGE, + ROLLUP_CHAIN_NAME_LABEL => chain_name.clone(), + ROLLUP_ID_LABEL => rollup_id.to_string(), + ); + counters.insert(rollup_id, counter); + } + counters +} + +metric_names!(pub const METRICS_NAMES: + TRANSACTIONS_RECEIVED, + TRANSACTIONS_DROPPED, + TRANSACTIONS_DROPPED_TOO_LARGE, + NONCE_FETCH_COUNT, + NONCE_FETCH_FAILURE_COUNT, + NONCE_FETCH_LATENCY, + CURRENT_NONCE, + SEQUENCER_SUBMISSION_LATENCY, + SEQUENCER_SUBMISSION_FAILURE_COUNT, + TRANSACTIONS_PER_SUBMISSION, + BYTES_PER_SUBMISSION +); + +#[cfg(test)] +mod tests { + use super::{ + BYTES_PER_SUBMISSION, + CURRENT_NONCE, + NONCE_FETCH_COUNT, + NONCE_FETCH_FAILURE_COUNT, + NONCE_FETCH_LATENCY, + SEQUENCER_SUBMISSION_FAILURE_COUNT, + SEQUENCER_SUBMISSION_LATENCY, + TRANSACTIONS_DROPPED, + TRANSACTIONS_DROPPED_TOO_LARGE, + TRANSACTIONS_PER_SUBMISSION, + TRANSACTIONS_RECEIVED, + }; + + #[track_caller] + fn assert_const(actual: &'static str, suffix: &str) { + // XXX: hard-code this so the crate name isn't accidentally changed. 
+ const CRATE_NAME: &str = "astria_composer"; + let expected = format!("{CRATE_NAME}_{suffix}"); + assert_eq!(expected, actual); + } + + #[test] + fn metrics_are_as_expected() { + assert_const(TRANSACTIONS_RECEIVED, "transactions_received"); + assert_const(TRANSACTIONS_DROPPED, "transactions_dropped"); + assert_const( + TRANSACTIONS_DROPPED_TOO_LARGE, + "transactions_dropped_too_large", + ); + assert_const(NONCE_FETCH_COUNT, "nonce_fetch_count"); + assert_const(NONCE_FETCH_FAILURE_COUNT, "nonce_fetch_failure_count"); + assert_const(NONCE_FETCH_LATENCY, "nonce_fetch_latency"); + assert_const(CURRENT_NONCE, "current_nonce"); + assert_const(SEQUENCER_SUBMISSION_LATENCY, "sequencer_submission_latency"); + assert_const( + SEQUENCER_SUBMISSION_FAILURE_COUNT, + "sequencer_submission_failure_count", + ); + assert_const(TRANSACTIONS_PER_SUBMISSION, "transactions_per_submission"); + assert_const(BYTES_PER_SUBMISSION, "bytes_per_submission"); + } +} diff --git a/crates/astria-composer/src/metrics_init.rs b/crates/astria-composer/src/metrics_init.rs deleted file mode 100644 index a9b35f2347..0000000000 --- a/crates/astria-composer/src/metrics_init.rs +++ /dev/null @@ -1,135 +0,0 @@ -//! Crate-specific metrics functionality. -//! -//! Registers metrics & lists constants to be used as metric names throughout crate. - -use metrics::{ - describe_counter, - describe_gauge, - describe_histogram, - Unit, -}; -use telemetry::metric_names; - -/// Labels -pub(crate) const ROLLUP_ID_LABEL: &str = "rollup_id"; -pub(crate) const COLLECTOR_TYPE_LABEL: &str = "collector_type"; - -/// Registers all metrics used by this crate. -// allow: refactor this. 
being tracked in https://github.com/astriaorg/astria/issues/1027 -#[allow(clippy::too_many_lines)] -pub fn register() { - describe_counter!( - TRANSACTIONS_RECEIVED, - Unit::Count, - "The number of transactions successfully received from collectors and bundled labelled by \ - rollup" - ); - describe_counter!( - TRANSACTIONS_DROPPED, - Unit::Count, - "The number of transactions dropped by the collectors before bundling it labelled by \ - rollup and collector type" - ); - describe_counter!( - TRANSACTIONS_DROPPED_TOO_LARGE, - Unit::Count, - "The number of transactions dropped because they were too large" - ); - describe_counter!( - NONCE_FETCH_COUNT, - Unit::Count, - "The number of times we have attempted to fetch the nonce" - ); - describe_counter!( - NONCE_FETCH_FAILURE_COUNT, - Unit::Count, - "The number of times we have failed to fetch the nonce" - ); - describe_histogram!( - NONCE_FETCH_LATENCY, - Unit::Milliseconds, - "The latency of nonce fetch" - ); - describe_gauge!(CURRENT_NONCE, Unit::Count, "The current nonce"); - describe_histogram!( - SEQUENCER_SUBMISSION_LATENCY, - Unit::Milliseconds, - "The latency of submitting a transaction to the sequencer" - ); - describe_counter!( - SEQUENCER_SUBMISSION_FAILURE_COUNT, - Unit::Count, - "The number of failed transaction submissions to the sequencer" - ); - describe_histogram!( - TRANSACTIONS_PER_SUBMISSION, - Unit::Count, - "The number of rollup transactions successfully sent to the sequencer in a single \ - submission" - ); - describe_histogram!( - BYTES_PER_SUBMISSION, - Unit::Bytes, - "The total bytes successfully sent to the sequencer in a single submission" - ); -} - -metric_names!(pub const METRICS_NAMES: - TRANSACTIONS_RECEIVED, - TRANSACTIONS_DROPPED, - TRANSACTIONS_DROPPED_TOO_LARGE, - NONCE_FETCH_COUNT, - NONCE_FETCH_FAILURE_COUNT, - NONCE_FETCH_LATENCY, - CURRENT_NONCE, - SEQUENCER_SUBMISSION_LATENCY, - SEQUENCER_SUBMISSION_FAILURE_COUNT, - TRANSACTIONS_PER_SUBMISSION, - BYTES_PER_SUBMISSION -); - 
-#[cfg(test)] -mod tests { - use super::{ - BYTES_PER_SUBMISSION, - CURRENT_NONCE, - NONCE_FETCH_COUNT, - NONCE_FETCH_FAILURE_COUNT, - NONCE_FETCH_LATENCY, - SEQUENCER_SUBMISSION_FAILURE_COUNT, - SEQUENCER_SUBMISSION_LATENCY, - TRANSACTIONS_DROPPED, - TRANSACTIONS_DROPPED_TOO_LARGE, - TRANSACTIONS_PER_SUBMISSION, - TRANSACTIONS_RECEIVED, - }; - - #[track_caller] - fn assert_const(actual: &'static str, suffix: &str) { - // XXX: hard-code this so the crate name isn't accidentally changed. - const CRATE_NAME: &str = "astria_composer"; - let expected = format!("{CRATE_NAME}_{suffix}"); - assert_eq!(expected, actual); - } - - #[test] - fn metrics_are_as_expected() { - assert_const(TRANSACTIONS_RECEIVED, "transactions_received"); - assert_const(TRANSACTIONS_DROPPED, "transactions_dropped"); - assert_const( - TRANSACTIONS_DROPPED_TOO_LARGE, - "transactions_dropped_too_large", - ); - assert_const(NONCE_FETCH_COUNT, "nonce_fetch_count"); - assert_const(NONCE_FETCH_FAILURE_COUNT, "nonce_fetch_failure_count"); - assert_const(NONCE_FETCH_LATENCY, "nonce_fetch_latency"); - assert_const(CURRENT_NONCE, "current_nonce"); - assert_const(SEQUENCER_SUBMISSION_LATENCY, "sequencer_submission_latency"); - assert_const( - SEQUENCER_SUBMISSION_FAILURE_COUNT, - "sequencer_submission_failure_count", - ); - assert_const(TRANSACTIONS_PER_SUBMISSION, "transactions_per_submission"); - assert_const(BYTES_PER_SUBMISSION, "bytes_per_submission"); - } -} diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 99cf090c92..a14246ddee 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -11,7 +11,10 @@ use tendermint_rpc::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; use super::Reader; -use crate::executor; +use crate::{ + executor, + metrics::Metrics, +}; pub(crate) struct Builder { pub(crate) celestia_block_time: Duration, @@ -21,6 +24,7 @@ 
pub(crate) struct Builder { pub(crate) sequencer_cometbft_client: SequencerClient, pub(crate) sequencer_requests_per_second: u32, pub(crate) shutdown: CancellationToken, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -34,6 +38,7 @@ impl Builder { sequencer_cometbft_client, sequencer_requests_per_second, shutdown, + metrics, } = self; let celestia_client = create_celestia_client(celestia_http_endpoint, &celestia_token) @@ -46,6 +51,7 @@ impl Builder { sequencer_cometbft_client, sequencer_requests_per_second, shutdown, + metrics, }) } } diff --git a/crates/astria-conductor/src/celestia/fetch.rs b/crates/astria-conductor/src/celestia/fetch.rs index 503c0eef2b..4787cf1e43 100644 --- a/crates/astria-conductor/src/celestia/fetch.rs +++ b/crates/astria-conductor/src/celestia/fetch.rs @@ -26,7 +26,7 @@ use tryhard::{ RetryPolicy, }; -use crate::metrics_init::CELESTIA_BLOB_FETCH_ERROR_COUNT; +use crate::metrics::Metrics; pub(super) struct RawBlobs { pub(super) celestia_height: u64, @@ -58,14 +58,20 @@ pub(super) async fn fetch_new_blobs( celestia_height: u64, rollup_namespace: Namespace, sequencer_namespace: Namespace, + metrics: &'static Metrics, ) -> eyre::Result { let header_blobs = async { - fetch_blobs_with_retry(client.clone(), celestia_height, sequencer_namespace) - .await - .wrap_err("failed to fetch header blobs") + fetch_blobs_with_retry( + client.clone(), + celestia_height, + sequencer_namespace, + metrics, + ) + .await + .wrap_err("failed to fetch header blobs") }; let rollup_blobs = async { - fetch_blobs_with_retry(client.clone(), celestia_height, rollup_namespace) + fetch_blobs_with_retry(client.clone(), celestia_height, rollup_namespace, metrics) .await .wrap_err("failed to fetch rollup blobs") }; @@ -83,6 +89,7 @@ async fn fetch_blobs_with_retry( client: CelestiaClient, height: u64, namespace: Namespace, + metrics: &'static Metrics, ) -> eyre::Result> { use celestia_rpc::BlobClient as _; @@ -102,7 +109,7 @@ async fn fetch_blobs_with_retry( error 
= error as &dyn std::error::Error, "attempt to fetch Celestia Blobs failed; retrying after delay", ); - metrics::counter!(CELESTIA_BLOB_FETCH_ERROR_COUNT).increment(1); + metrics.increment_celestia_blob_fetch_error_count(); futures::future::ready(()) }, ); diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 9db1dfa6c8..96e57eea20 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -23,7 +23,6 @@ use futures::{ FutureExt as _, }; use jsonrpsee::http_client::HttpClient as CelestiaClient; -use metrics::histogram; use sequencer_client::{ tendermint, tendermint::block::Height as SequencerHeight, @@ -61,15 +60,7 @@ use crate::{ FirmTrySendError, StateIsInit, }, - metrics_init::{ - BLOBS_PER_CELESTIA_FETCH, - DECODED_ITEMS_PER_CELESTIA_FETCH, - NAMESPACE_TYPE_LABEL, - NAMESPACE_TYPE_METADATA, - NAMESPACE_TYPE_ROLLUP_DATA, - SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, - SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, - }, + metrics::Metrics, utils::flatten, }; @@ -148,6 +139,8 @@ pub(crate) struct Reader { /// Token to listen for Conductor being shut down. shutdown: CancellationToken, + + metrics: &'static Metrics, } impl Reader { @@ -250,6 +243,8 @@ struct RunningReader { /// The Celestia namespace for which Sequencer header blobs will be requested. Derived from /// `sequencer_chain_id`. sequencer_namespace: Namespace, + + metrics: &'static Metrics, } impl RunningReader { @@ -264,6 +259,7 @@ impl RunningReader { sequencer_cometbft_client, shutdown, sequencer_requests_per_second, + metrics, .. 
} = exposed_reader; let block_cache = @@ -302,6 +298,7 @@ impl RunningReader { rollup_namespace, sequencer_chain_id, sequencer_namespace, + metrics, }) } @@ -428,6 +425,7 @@ impl RunningReader { rollup_namespace: self.rollup_namespace, sequencer_namespace: self.sequencer_namespace, executor: self.executor.clone(), + metrics: self.metrics, }; self.reconstruction_tasks.spawn(height, task.execute()); scheduled.push(height); @@ -500,10 +498,11 @@ struct FetchConvertVerifyAndReconstruct { rollup_namespace: Namespace, sequencer_namespace: Namespace, executor: executor::Handle, + metrics: &'static Metrics, } impl FetchConvertVerifyAndReconstruct { - #[instrument( skip_all, fields( + #[instrument(skip_all, fields( celestia_height = self.celestia_height, rollup_namespace = %base64(self.rollup_namespace.as_bytes()), sequencer_namespace = %base64(self.sequencer_namespace.as_bytes()), @@ -517,6 +516,7 @@ impl FetchConvertVerifyAndReconstruct { rollup_namespace, sequencer_namespace, executor, + metrics, } = self; let new_blobs = fetch_new_blobs( @@ -524,30 +524,13 @@ impl FetchConvertVerifyAndReconstruct { celestia_height, rollup_namespace, sequencer_namespace, + metrics, ) .await .wrap_err("failed fetching blobs from Celestia")?; - { - // allow: histograms require f64; precision loss would be no problem - #![allow(clippy::cast_precision_loss)] - histogram!( - BLOBS_PER_CELESTIA_FETCH, - NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_METADATA, - ) - .record(new_blobs.len_header_blobs() as f64); - } - - { - // allow: histograms require f64; precision loss would be no problem - #![allow(clippy::cast_precision_loss)] - histogram!( - BLOBS_PER_CELESTIA_FETCH, - NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_ROLLUP_DATA, - ) - .record(new_blobs.len_rollup_blobs() as f64); - } - + metrics.record_metadata_blobs_per_celestia_fetch(new_blobs.len_header_blobs()); + metrics.record_rollup_data_blobs_per_celestia_fetch(new_blobs.len_rollup_blobs()); info!( number_of_metadata_blobs = 
new_blobs.len_header_blobs(), number_of_rollup_blobs = new_blobs.len_rollup_blobs(), @@ -562,26 +545,10 @@ impl FetchConvertVerifyAndReconstruct { .await .wrap_err("encountered panic while decoding raw Celestia blobs")?; - { - // allow: histograms require f64; precision loss would be no problem - #![allow(clippy::cast_precision_loss)] - histogram!( - DECODED_ITEMS_PER_CELESTIA_FETCH, - NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_METADATA, - ) - .record(decoded_blobs.len_headers() as f64); - } - - { - // allow: histograms require f64; precision loss would be no problem - #![allow(clippy::cast_precision_loss)] - histogram!( - DECODED_ITEMS_PER_CELESTIA_FETCH, - NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_ROLLUP_DATA, - ) - .record(decoded_blobs.len_rollup_data_entries() as f64); - } - + metrics.record_decoded_metadata_items_per_celestia_fetch(decoded_blobs.len_headers()); + metrics.record_decoded_rollup_data_items_per_celestia_fetch( + decoded_blobs.len_rollup_data_entries(), + ); info!( number_of_metadata_blobs = decoded_blobs.len_headers(), number_of_rollup_blobs = decoded_blobs.len_rollup_data_entries(), @@ -590,13 +557,9 @@ impl FetchConvertVerifyAndReconstruct { let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await; - { - // allow: histograms require f64; precision loss would be no problem - #![allow(clippy::cast_precision_loss)] - histogram!(SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH,) - .record(verified_blobs.len_header_blobs() as f64); - } - + metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch( + verified_blobs.len_header_blobs(), + ); info!( number_of_verified_header_blobs = verified_blobs.len_header_blobs(), number_of_rollup_blobs = verified_blobs.len_rollup_blobs(), @@ -610,19 +573,14 @@ impl FetchConvertVerifyAndReconstruct { }) .await .wrap_err("encountered panic while reconstructing blocks from verified blobs")?; - - { - // allow: histograms require f64; precision loss would be no problem - 
#![allow(clippy::cast_precision_loss)] - histogram!(SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH,) - .record(reconstructed.len() as f64); - } - let reconstructed_blocks = ReconstructedBlocks { celestia_height, blocks: reconstructed, }; + metrics.record_sequencer_block_information_reconstructed_per_celestia_fetch( + reconstructed_blocks.blocks.len(), + ); info!( number_of_final_reconstructed_blocks = reconstructed_blocks.blocks.len(), blocks = %json(&ReportReconstructedBlocks(&reconstructed_blocks)), diff --git a/crates/astria-conductor/src/conductor.rs b/crates/astria-conductor/src/conductor.rs index 4945fae975..b924294db2 100644 --- a/crates/astria-conductor/src/conductor.rs +++ b/crates/astria-conductor/src/conductor.rs @@ -1,5 +1,6 @@ use std::{ future::Future, + sync::OnceLock, time::Duration, }; @@ -29,6 +30,7 @@ use tracing::{ use crate::{ celestia, executor, + metrics::Metrics, sequencer, utils::flatten, Config, @@ -94,6 +96,9 @@ impl Conductor { /// actors could not be spawned (executor, sequencer reader, or data availability reader). /// This usually happens if the actors failed to connect to their respective endpoints. 
pub fn new(cfg: Config) -> eyre::Result { + static METRICS: OnceLock = OnceLock::new(); + let metrics = METRICS.get_or_init(Metrics::new); + let mut tasks = JoinMap::new(); let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url) @@ -107,6 +112,7 @@ impl Conductor { mode: cfg.execution_commit_level, rollup_address: cfg.execution_rpc_url, shutdown: shutdown.clone(), + metrics, } .build() .wrap_err("failed constructing executor")?; @@ -143,6 +149,7 @@ impl Conductor { sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_requests_per_second: cfg.sequencer_requests_per_second, shutdown: shutdown.clone(), + metrics, } .build() .wrap_err("failed to build Celestia Reader")?; diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 67941479a0..62559f30b9 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -13,12 +13,16 @@ use super::{ Handle, StateNotInit, }; -use crate::config::CommitLevel; +use crate::{ + config::CommitLevel, + metrics::Metrics, +}; pub(crate) struct Builder { pub(crate) mode: CommitLevel, pub(crate) rollup_address: String, pub(crate) shutdown: CancellationToken, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -27,6 +31,7 @@ impl Builder { mode, rollup_address, shutdown, + metrics, } = self; let client = super::client::Client::connect_lazy(&rollup_address).wrap_err_with(|| { @@ -67,6 +72,7 @@ impl Builder { blocks_pending_finalization: HashMap::new(), max_spread: None, + metrics, }; let handle = Handle { firm_blocks: firm_block_tx, diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 56eb5abbf5..2d55ce1fd7 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -40,11 +40,7 @@ use tracing::{ use crate::{ celestia::ReconstructedBlock, config::CommitLevel, - metrics_init::{ 
- EXECUTED_FIRM_BLOCK_NUMBER, - EXECUTED_SOFT_BLOCK_NUMBER, - TRANSACTIONS_PER_EXECUTED_BLOCK, - }, + metrics::Metrics, }; mod builder; @@ -112,7 +108,7 @@ pub(crate) enum SoftTrySendError { /// A handle to the executor. /// -/// To be be useful, [`Handle::wait_for_init`] must be called in +/// To be useful, [`Handle::wait_for_init`] must be called in /// order to obtain a [`Handle`]. This is to ensure that the executor /// state was primed before using its other methods. See [`State`] for more /// information. @@ -251,6 +247,8 @@ pub(crate) struct Executor { /// The maximum permitted spread between firm and soft blocks. max_spread: Option, + + metrics: &'static Metrics, } impl Executor { @@ -456,8 +454,8 @@ impl Executor { // XXX: We set an absolute number value here to avoid any potential issues of the remote // rollup state and the local state falling out of lock-step. - metrics::counter!(crate::metrics_init::EXECUTED_SOFT_BLOCK_NUMBER) - .absolute(block_number.into()); + self.metrics + .absolute_set_executed_soft_block_number(block_number); Ok(()) } @@ -531,8 +529,8 @@ impl Executor { // XXX: We set an absolute number value here to avoid any potential issues of the remote // rollup state and the local state falling out of lock-step. - metrics::counter!(crate::metrics_init::EXECUTED_FIRM_BLOCK_NUMBER) - .absolute(block_number.into()); + self.metrics + .absolute_set_executed_firm_block_number(block_number); Ok(()) } @@ -558,9 +556,7 @@ impl Executor { .. } = block; - // allow: used for recording a histogram, which requires f64. 
- #[allow(clippy::cast_precision_loss)] - let n_transactions = transactions.len() as f64; + let n_transactions = transactions.len(); let executed_block = self .client @@ -568,7 +564,8 @@ impl Executor { .await .wrap_err("failed to run execute_block RPC")?; - metrics::histogram!(TRANSACTIONS_PER_EXECUTED_BLOCK).record(n_transactions); + self.metrics + .record_transactions_per_executed_block(n_transactions); info!( executed_block.hash = %telemetry::display::base64(&executed_block.hash()), @@ -604,8 +601,10 @@ impl Executor { .try_init(genesis_info, commitment_state) .wrap_err("failed initializing state tracking")?; - metrics::counter!(EXECUTED_FIRM_BLOCK_NUMBER).absolute(self.state.firm_number().into()); - metrics::counter!(EXECUTED_SOFT_BLOCK_NUMBER).absolute(self.state.soft_number().into()); + self.metrics + .absolute_set_executed_firm_block_number(self.state.firm_number()); + self.metrics + .absolute_set_executed_soft_block_number(self.state.soft_number()); info!( initial_state = serde_json::to_string(&*self.state.get()) .expect("writing json to a string should not fail"), diff --git a/crates/astria-conductor/src/lib.rs b/crates/astria-conductor/src/lib.rs index 257b922d7c..ff40fef07f 100644 --- a/crates/astria-conductor/src/lib.rs +++ b/crates/astria-conductor/src/lib.rs @@ -14,7 +14,7 @@ pub(crate) mod celestia; pub mod conductor; pub mod config; pub(crate) mod executor; -pub mod metrics_init; +pub(crate) mod metrics; pub(crate) mod sequencer; mod utils; diff --git a/crates/astria-conductor/src/main.rs b/crates/astria-conductor/src/main.rs index 4efb856762..7468e0ee2a 100644 --- a/crates/astria-conductor/src/main.rs +++ b/crates/astria-conductor/src/main.rs @@ -1,7 +1,6 @@ use std::process::ExitCode; use astria_conductor::{ - metrics_init, Conductor, Config, BUILD_INFO, @@ -54,8 +53,7 @@ async fn main() -> ExitCode { if !cfg.no_metrics { telemetry_conf = telemetry_conf .metrics_addr(&cfg.metrics_http_listener_addr) - .service_name(env!("CARGO_PKG_NAME")) - 
.register_metrics(metrics_init::register); + .service_name(env!("CARGO_PKG_NAME")); } let _telemetry_guard = match telemetry_conf diff --git a/crates/astria-conductor/src/metrics.rs b/crates/astria-conductor/src/metrics.rs new file mode 100644 index 0000000000..d992611e23 --- /dev/null +++ b/crates/astria-conductor/src/metrics.rs @@ -0,0 +1,251 @@ +use metrics::{ + counter, + describe_counter, + describe_histogram, + histogram, + Counter, + Histogram, + Unit, +}; +use telemetry::metric_names; + +const NAMESPACE_TYPE_LABEL: &str = "namespace_type"; +const NAMESPACE_TYPE_METADATA: &str = "metadata"; +const NAMESPACE_TYPE_ROLLUP_DATA: &str = "rollup_data"; + +pub(crate) struct Metrics { + metadata_blobs_per_celestia_fetch: Histogram, + rollup_data_blobs_per_celestia_fetch: Histogram, + celestia_blob_fetch_error_count: Counter, + decoded_metadata_items_per_celestia_fetch: Histogram, + decoded_rollup_data_items_per_celestia_fetch: Histogram, + sequencer_blocks_metadata_verified_per_celestia_fetch: Histogram, + sequencer_block_information_reconstructed_per_celestia_fetch: Histogram, + executed_firm_block_number: Counter, + executed_soft_block_number: Counter, + transactions_per_executed_block: Histogram, +} + +impl Metrics { + #[must_use] + pub(crate) fn new() -> Self { + describe_histogram!( + BLOBS_PER_CELESTIA_FETCH, + Unit::Count, + "The number of Celestia blobs received per request sent" + ); + let metadata_blobs_per_celestia_fetch = histogram!( + BLOBS_PER_CELESTIA_FETCH, + NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_METADATA, + ); + let rollup_data_blobs_per_celestia_fetch = histogram!( + BLOBS_PER_CELESTIA_FETCH, + NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_ROLLUP_DATA, + ); + + describe_counter!( + CELESTIA_BLOB_FETCH_ERROR_COUNT, + Unit::Count, + "The number of calls made to fetch a blob from Celestia which have failed" + ); + let celestia_blob_fetch_error_count = counter!(CELESTIA_BLOB_FETCH_ERROR_COUNT); + + describe_histogram!( + DECODED_ITEMS_PER_CELESTIA_FETCH, + 
Unit::Count, + "The number of items decoded from the Celestia blobs received per request sent" + ); + let decoded_metadata_items_per_celestia_fetch = histogram!( + DECODED_ITEMS_PER_CELESTIA_FETCH, + NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_METADATA, + ); + let decoded_rollup_data_items_per_celestia_fetch = histogram!( + DECODED_ITEMS_PER_CELESTIA_FETCH, + NAMESPACE_TYPE_LABEL => NAMESPACE_TYPE_ROLLUP_DATA, + ); + + describe_histogram!( + SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, + Unit::Count, + "The number of sequencer blocks in a single Celestia blob fetch whose metadata was \ + verified" + ); + let sequencer_blocks_metadata_verified_per_celestia_fetch = + histogram!(SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH); + + describe_histogram!( + SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, + Unit::Count, + "The number of sequencer blocks (or specifically, the subset pertaining to the \ + rollup) reconstructed from a single Celestia blob fetch" + ); + let sequencer_block_information_reconstructed_per_celestia_fetch = + histogram!(SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH); + + describe_counter!( + EXECUTED_FIRM_BLOCK_NUMBER, + Unit::Count, + "The number/rollup height of the last executed or confirmed firm block" + ); + let executed_firm_block_number = counter!(EXECUTED_FIRM_BLOCK_NUMBER); + + describe_counter!( + EXECUTED_SOFT_BLOCK_NUMBER, + Unit::Count, + "The number/rollup height of the last executed soft block" + ); + let executed_soft_block_number = counter!(EXECUTED_SOFT_BLOCK_NUMBER); + + describe_histogram!( + TRANSACTIONS_PER_EXECUTED_BLOCK, + Unit::Count, + "The number of transactions that were included in the latest block executed against \ + the rollup" + ); + let transactions_per_executed_block = histogram!(TRANSACTIONS_PER_EXECUTED_BLOCK); + + Self { + metadata_blobs_per_celestia_fetch, + rollup_data_blobs_per_celestia_fetch, + celestia_blob_fetch_error_count, + 
decoded_metadata_items_per_celestia_fetch, + decoded_rollup_data_items_per_celestia_fetch, + sequencer_blocks_metadata_verified_per_celestia_fetch, + sequencer_block_information_reconstructed_per_celestia_fetch, + executed_firm_block_number, + executed_soft_block_number, + transactions_per_executed_block, + } + } + + pub(crate) fn record_metadata_blobs_per_celestia_fetch(&self, blob_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.metadata_blobs_per_celestia_fetch + .record(blob_count as f64); + } + + pub(crate) fn record_rollup_data_blobs_per_celestia_fetch(&self, blob_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.rollup_data_blobs_per_celestia_fetch + .record(blob_count as f64); + } + + pub(crate) fn increment_celestia_blob_fetch_error_count(&self) { + self.celestia_blob_fetch_error_count.increment(1); + } + + pub(crate) fn record_decoded_metadata_items_per_celestia_fetch(&self, item_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.decoded_metadata_items_per_celestia_fetch + .record(item_count as f64); + } + + pub(crate) fn record_decoded_rollup_data_items_per_celestia_fetch(&self, item_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.decoded_rollup_data_items_per_celestia_fetch + .record(item_count as f64); + } + + pub(crate) fn record_sequencer_blocks_metadata_verified_per_celestia_fetch( + &self, + block_count: usize, + ) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. 
+ #[allow(clippy::cast_precision_loss)] + self.sequencer_blocks_metadata_verified_per_celestia_fetch + .record(block_count as f64); + } + + pub(crate) fn record_sequencer_block_information_reconstructed_per_celestia_fetch( + &self, + block_count: usize, + ) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.sequencer_block_information_reconstructed_per_celestia_fetch + .record(block_count as f64); + } + + pub(crate) fn absolute_set_executed_firm_block_number(&self, block_number: u32) { + self.executed_firm_block_number + .absolute(u64::from(block_number)); + } + + pub(crate) fn absolute_set_executed_soft_block_number(&self, block_number: u32) { + self.executed_soft_block_number + .absolute(u64::from(block_number)); + } + + pub(crate) fn record_transactions_per_executed_block(&self, tx_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.transactions_per_executed_block.record(tx_count as f64); + } +} + +metric_names!(pub const METRICS_NAMES: + BLOBS_PER_CELESTIA_FETCH, + CELESTIA_BLOB_FETCH_ERROR_COUNT, + DECODED_ITEMS_PER_CELESTIA_FETCH, + SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, + SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, + + EXECUTED_FIRM_BLOCK_NUMBER, + EXECUTED_SOFT_BLOCK_NUMBER, + TRANSACTIONS_PER_EXECUTED_BLOCK +); + +#[cfg(test)] +mod tests { + use super::TRANSACTIONS_PER_EXECUTED_BLOCK; + use crate::metrics::{ + BLOBS_PER_CELESTIA_FETCH, + CELESTIA_BLOB_FETCH_ERROR_COUNT, + DECODED_ITEMS_PER_CELESTIA_FETCH, + EXECUTED_FIRM_BLOCK_NUMBER, + EXECUTED_SOFT_BLOCK_NUMBER, + SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, + SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, + }; + + #[track_caller] + fn assert_const(actual: &'static str, suffix: &str) { + // XXX: hard-code this so the crate name isn't accidentally 
changed. + const CRATE_NAME: &str = "astria_conductor"; + let expected = format!("{CRATE_NAME}_{suffix}"); + assert_eq!(expected, actual); + } + + #[test] + fn metrics_are_as_expected() { + assert_const(BLOBS_PER_CELESTIA_FETCH, "blobs_per_celestia_fetch"); + assert_const( + CELESTIA_BLOB_FETCH_ERROR_COUNT, + "celestia_blob_fetch_error_count", + ); + assert_const( + DECODED_ITEMS_PER_CELESTIA_FETCH, + "decoded_items_per_celestia_fetch", + ); + + assert_const( + SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, + "sequencer_blocks_metadata_verified_per_celestia_fetch", + ); + + assert_const( + SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, + "sequencer_block_information_reconstructed_per_celestia_fetch", + ); + assert_const(EXECUTED_FIRM_BLOCK_NUMBER, "executed_firm_block_number"); + assert_const(EXECUTED_SOFT_BLOCK_NUMBER, "executed_soft_block_number"); + assert_const( + TRANSACTIONS_PER_EXECUTED_BLOCK, + "transactions_per_executed_block", + ); + } +} diff --git a/crates/astria-conductor/src/metrics_init.rs b/crates/astria-conductor/src/metrics_init.rs deleted file mode 100644 index ce16714fd8..0000000000 --- a/crates/astria-conductor/src/metrics_init.rs +++ /dev/null @@ -1,123 +0,0 @@ -//! Crate-specific metrics functionality. -//! -//! Registers metrics & lists constants to be used as metric names throughout crate. 
- -use metrics::{ - describe_counter, - describe_histogram, - Unit, -}; -use telemetry::metric_names; - -pub(crate) const NAMESPACE_TYPE_LABEL: &str = "namespace_type"; -pub(crate) const NAMESPACE_TYPE_METADATA: &str = "metadata"; -pub(crate) const NAMESPACE_TYPE_ROLLUP_DATA: &str = "rollup_data"; - -pub fn register() { - describe_histogram!( - BLOBS_PER_CELESTIA_FETCH, - Unit::Count, - "The number of Celestia blobs received per request sent" - ); - - describe_counter!( - CELESTIA_BLOB_FETCH_ERROR_COUNT, - Unit::Count, - "The number of calls made to fetch a blob from Celestia which have failed" - ); - - describe_histogram!( - DECODED_ITEMS_PER_CELESTIA_FETCH, - Unit::Count, - "The number of items decoded from the Celestia blobs received per request sent" - ); - - describe_counter!( - EXECUTED_FIRM_BLOCK_NUMBER, - Unit::Count, - "The number/rollup height of the last executed or confirmed firm block" - ); - - describe_counter!( - EXECUTED_SOFT_BLOCK_NUMBER, - Unit::Count, - "The number/rollup height of the last executed soft block" - ); - - describe_histogram!( - SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, - Unit::Count, - "The number of sequencer blocks (or specifically, the subset pertaining to the rollup) \ - reconstructed from a single Celestia blob fetch" - ); - - describe_histogram!( - TRANSACTIONS_PER_EXECUTED_BLOCK, - Unit::Count, - "The number of transactions that were included in the latest block executed against the \ - rollup" - ); -} - -metric_names!(pub const METRICS_NAMES: - BLOBS_PER_CELESTIA_FETCH, - CELESTIA_BLOB_FETCH_ERROR_COUNT, - DECODED_ITEMS_PER_CELESTIA_FETCH, - SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, - SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, - - EXECUTED_FIRM_BLOCK_NUMBER, - EXECUTED_SOFT_BLOCK_NUMBER, - TRANSACTIONS_PER_EXECUTED_BLOCK -); - -#[cfg(test)] -mod tests { - use super::TRANSACTIONS_PER_EXECUTED_BLOCK; - use crate::metrics_init::{ - BLOBS_PER_CELESTIA_FETCH, - 
CELESTIA_BLOB_FETCH_ERROR_COUNT, - DECODED_ITEMS_PER_CELESTIA_FETCH, - EXECUTED_FIRM_BLOCK_NUMBER, - EXECUTED_SOFT_BLOCK_NUMBER, - SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, - SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, - }; - - #[track_caller] - fn assert_const(actual: &'static str, suffix: &str) { - // XXX: hard-code this so the crate name isn't accidentally changed. - const CRATE_NAME: &str = "astria_conductor"; - let expected = format!("{CRATE_NAME}_{suffix}"); - assert_eq!(expected, actual); - } - - #[test] - fn metrics_are_as_expected() { - assert_const(BLOBS_PER_CELESTIA_FETCH, "blobs_per_celestia_fetch"); - assert_const( - CELESTIA_BLOB_FETCH_ERROR_COUNT, - "celestia_blob_fetch_error_count", - ); - assert_const( - DECODED_ITEMS_PER_CELESTIA_FETCH, - "decoded_items_per_celestia_fetch", - ); - - assert_const( - SEQUENCER_BLOCKS_METADATA_VERIFIED_PER_CELESTIA_FETCH, - "sequencer_blocks_metadata_verified_per_celestia_fetch", - ); - - assert_const( - SEQUENCER_BLOCK_INFORMATION_RECONSTRUCTED_PER_CELESTIA_FETCH, - "sequencer_block_information_reconstructed_per_celestia_fetch", - ); - assert_const(EXECUTED_FIRM_BLOCK_NUMBER, "executed_firm_block_number"); - assert_const(EXECUTED_SOFT_BLOCK_NUMBER, "executed_soft_block_number"); - assert_const( - TRANSACTIONS_PER_EXECUTED_BLOCK, - "transactions_per_executed_block", - ); - } -} diff --git a/crates/astria-sequencer-relayer/src/lib.rs b/crates/astria-sequencer-relayer/src/lib.rs index 0f93186249..52d922708e 100644 --- a/crates/astria-sequencer-relayer/src/lib.rs +++ b/crates/astria-sequencer-relayer/src/lib.rs @@ -1,7 +1,7 @@ pub(crate) mod api; mod build_info; pub mod config; -pub mod metrics_init; +pub(crate) mod metrics; pub(crate) mod relayer; pub mod sequencer_relayer; pub(crate) mod utils; diff --git a/crates/astria-sequencer-relayer/src/main.rs b/crates/astria-sequencer-relayer/src/main.rs index 1f5a7df973..abd203a253 100644 --- a/crates/astria-sequencer-relayer/src/main.rs +++ 
b/crates/astria-sequencer-relayer/src/main.rs @@ -2,7 +2,6 @@ use std::process::ExitCode; use astria_eyre::eyre::WrapErr as _; use astria_sequencer_relayer::{ - metrics_init, Config, SequencerRelayer, BUILD_INFO, @@ -35,8 +34,7 @@ async fn main() -> ExitCode { if !cfg.no_metrics { telemetry_conf = telemetry_conf .metrics_addr(&cfg.metrics_http_listener_addr) - .service_name(env!("CARGO_PKG_NAME")) - .register_metrics(metrics_init::register); + .service_name(env!("CARGO_PKG_NAME")); } let _telemetry_guard = match telemetry_conf diff --git a/crates/astria-sequencer-relayer/src/metrics.rs b/crates/astria-sequencer-relayer/src/metrics.rs new file mode 100644 index 0000000000..1ab32eb3ab --- /dev/null +++ b/crates/astria-sequencer-relayer/src/metrics.rs @@ -0,0 +1,263 @@ +use std::time::Duration; + +use metrics::{ + counter, + describe_counter, + describe_gauge, + describe_histogram, + gauge, + histogram, + Counter, + Gauge, + Histogram, + Unit, +}; +use telemetry::metric_names; + +pub(crate) struct Metrics { + celestia_submission_height: Counter, + celestia_submission_count: Counter, + celestia_submission_failure_count: Counter, + blocks_per_celestia_tx: Histogram, + blobs_per_celestia_tx: Histogram, + bytes_per_celestia_tx: Histogram, + celestia_payload_creation_latency: Histogram, + celestia_submission_latency: Histogram, + sequencer_block_fetch_failure_count: Counter, + sequencer_height_fetch_failure_count: Counter, + sequencer_submission_height: Counter, + compression_ratio_for_astria_block: Gauge, +} + +impl Metrics { + #[must_use] + pub(crate) fn new() -> Self { + describe_counter!( + CELESTIA_SUBMISSION_HEIGHT, + Unit::Count, + "The height of the last blob successfully submitted to Celestia" + ); + let celestia_submission_height = counter!(CELESTIA_SUBMISSION_HEIGHT); + + describe_counter!( + CELESTIA_SUBMISSION_COUNT, + Unit::Count, + "The number of calls made to submit to Celestia" + ); + let celestia_submission_count = counter!(CELESTIA_SUBMISSION_COUNT); + + 
describe_counter!( + CELESTIA_SUBMISSION_FAILURE_COUNT, + Unit::Count, + "The number of calls made to submit to Celestia which have failed" + ); + let celestia_submission_failure_count = counter!(CELESTIA_SUBMISSION_FAILURE_COUNT); + + describe_histogram!( + BLOCKS_PER_CELESTIA_TX, + Unit::Count, + "The number of Astria blocks per Celestia submission" + ); + let blocks_per_celestia_tx = histogram!(BLOCKS_PER_CELESTIA_TX); + + describe_histogram!( + BLOBS_PER_CELESTIA_TX, + Unit::Count, + "The number of blobs (Astria Sequencer blocks converted to Celestia blobs) per \ + Celestia submission" + ); + let blobs_per_celestia_tx = histogram!(BLOBS_PER_CELESTIA_TX); + + describe_histogram!( + BYTES_PER_CELESTIA_TX, + Unit::Bytes, + "The total number of payload bytes (Astria Sequencer blocks converted to Celestia \ + blobs) per Celestia submission" + ); + let bytes_per_celestia_tx = histogram!(BYTES_PER_CELESTIA_TX); + + describe_histogram!( + CELESTIA_PAYLOAD_CREATION_LATENCY, + Unit::Seconds, + "The time it takes to create a new payload for submitting to Celestia (encoding to \ + protobuf, compression, creating blobs)" + ); + let celestia_payload_creation_latency = histogram!(CELESTIA_PAYLOAD_CREATION_LATENCY); + + describe_histogram!( + CELESTIA_SUBMISSION_LATENCY, + Unit::Seconds, + "The time it takes to submit a blob to Celestia" + ); + let celestia_submission_latency = histogram!(CELESTIA_SUBMISSION_LATENCY); + + describe_counter!( + SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, + Unit::Count, + "The number of calls made to fetch a block from sequencer which have failed" + ); + let sequencer_block_fetch_failure_count = counter!(SEQUENCER_BLOCK_FETCH_FAILURE_COUNT); + + describe_counter!( + SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, + Unit::Count, + "The number of calls made to fetch the current height from sequencer which have failed" + ); + let sequencer_height_fetch_failure_count = counter!(SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT); + + describe_counter!( + 
SEQUENCER_SUBMISSION_HEIGHT, + Unit::Count, + "The height of the highest sequencer block successfully submitted to Celestia" + ); + let sequencer_submission_height = counter!(SEQUENCER_SUBMISSION_HEIGHT); + + describe_gauge!( + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + Unit::Count, + "Ratio of uncompressed:compressed data size for all `blob.data`s in an Astria block" + ); + let compression_ratio_for_astria_block = gauge!(COMPRESSION_RATIO_FOR_ASTRIA_BLOCK); + + Self { + celestia_submission_height, + celestia_submission_count, + celestia_submission_failure_count, + blocks_per_celestia_tx, + blobs_per_celestia_tx, + bytes_per_celestia_tx, + celestia_payload_creation_latency, + celestia_submission_latency, + sequencer_block_fetch_failure_count, + sequencer_height_fetch_failure_count, + sequencer_submission_height, + compression_ratio_for_astria_block, + } + } + + pub(crate) fn absolute_set_celestia_submission_height(&self, height: u64) { + self.celestia_submission_height.absolute(height); + } + + pub(crate) fn increment_celestia_submission_count(&self) { + self.celestia_submission_count.increment(1); + } + + pub(crate) fn increment_celestia_submission_failure_count(&self) { + self.celestia_submission_failure_count.increment(1); + } + + pub(crate) fn record_blocks_per_celestia_tx(&self, block_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.blocks_per_celestia_tx.record(block_count as f64); + } + + pub(crate) fn record_blobs_per_celestia_tx(&self, blob_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.blobs_per_celestia_tx.record(blob_count as f64); + } + + pub(crate) fn record_bytes_per_celestia_tx(&self, byte_count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. 
+ #[allow(clippy::cast_precision_loss)] + self.bytes_per_celestia_tx.record(byte_count as f64); + } + + pub(crate) fn record_celestia_payload_creation_latency(&self, latency: Duration) { + self.celestia_payload_creation_latency.record(latency); + } + + pub(crate) fn record_celestia_submission_latency(&self, latency: Duration) { + self.celestia_submission_latency.record(latency); + } + + pub(crate) fn increment_sequencer_block_fetch_failure_count(&self) { + self.sequencer_block_fetch_failure_count.increment(1); + } + + pub(crate) fn increment_sequencer_height_fetch_failure_count(&self) { + self.sequencer_height_fetch_failure_count.increment(1); + } + + pub(crate) fn absolute_set_sequencer_submission_height(&self, height: u64) { + self.sequencer_submission_height.absolute(height); + } + + pub(crate) fn set_compression_ratio_for_astria_block(&self, ratio: f64) { + self.compression_ratio_for_astria_block.set(ratio); + } +} + +metric_names!(pub const METRICS_NAMES: + CELESTIA_SUBMISSION_HEIGHT, + CELESTIA_SUBMISSION_COUNT, + CELESTIA_SUBMISSION_FAILURE_COUNT, + BLOCKS_PER_CELESTIA_TX, + BLOBS_PER_CELESTIA_TX, + BYTES_PER_CELESTIA_TX, + CELESTIA_PAYLOAD_CREATION_LATENCY, + CELESTIA_SUBMISSION_LATENCY, + SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, + SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, + SEQUENCER_SUBMISSION_HEIGHT, + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK +); + +#[cfg(test)] +mod tests { + use super::{ + BLOBS_PER_CELESTIA_TX, + BLOCKS_PER_CELESTIA_TX, + BYTES_PER_CELESTIA_TX, + CELESTIA_PAYLOAD_CREATION_LATENCY, + CELESTIA_SUBMISSION_COUNT, + CELESTIA_SUBMISSION_FAILURE_COUNT, + CELESTIA_SUBMISSION_HEIGHT, + CELESTIA_SUBMISSION_LATENCY, + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, + SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, + SEQUENCER_SUBMISSION_HEIGHT, + }; + + #[track_caller] + fn assert_const(actual: &'static str, suffix: &str) { + // XXX: hard-code this so the crate name isn't accidentally changed. 
+ const CRATE_NAME: &str = "astria_sequencer_relayer"; + let expected = format!("{CRATE_NAME}_{suffix}"); + assert_eq!(expected, actual); + } + + #[test] + fn metrics_are_as_expected() { + assert_const(CELESTIA_SUBMISSION_HEIGHT, "celestia_submission_height"); + assert_const(CELESTIA_SUBMISSION_COUNT, "celestia_submission_count"); + assert_const( + CELESTIA_SUBMISSION_FAILURE_COUNT, + "celestia_submission_failure_count", + ); + assert_const(BLOCKS_PER_CELESTIA_TX, "blocks_per_celestia_tx"); + assert_const(BLOBS_PER_CELESTIA_TX, "blobs_per_celestia_tx"); + assert_const(BYTES_PER_CELESTIA_TX, "bytes_per_celestia_tx"); + assert_const( + CELESTIA_PAYLOAD_CREATION_LATENCY, + "celestia_payload_creation_latency", + ); + assert_const(CELESTIA_SUBMISSION_LATENCY, "celestia_submission_latency"); + assert_const( + SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, + "sequencer_block_fetch_failure_count", + ); + assert_const( + SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, + "sequencer_height_fetch_failure_count", + ); + assert_const(SEQUENCER_SUBMISSION_HEIGHT, "sequencer_submission_height"); + assert_const( + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + "compression_ratio_for_astria_block", + ); + } +} diff --git a/crates/astria-sequencer-relayer/src/metrics_init.rs b/crates/astria-sequencer-relayer/src/metrics_init.rs deleted file mode 100644 index 2b44026d03..0000000000 --- a/crates/astria-sequencer-relayer/src/metrics_init.rs +++ /dev/null @@ -1,166 +0,0 @@ -//! Crate-specific metrics functionality. -//! -//! Registers metrics & lists constants to be used as metric names throughout crate. - -use metrics::{ - describe_counter, - describe_gauge, - describe_histogram, - Unit, -}; -use telemetry::metric_names; - -/// Registers all metrics used by this crate. 
-pub fn register() { - describe_counter!( - CELESTIA_SUBMISSION_COUNT, - Unit::Count, - "The number of calls made to submit to Celestia" - ); - - describe_counter!( - CELESTIA_SUBMISSION_HEIGHT, - Unit::Count, - "The height of the last blob successfully submitted to Celestia" - ); - - describe_counter!( - CELESTIA_SUBMISSION_FAILURE_COUNT, - Unit::Count, - "The number of calls made to submit to Celestia which have failed" - ); - - describe_counter!( - SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, - Unit::Count, - "The number of calls made to fetch a block from sequencer which have failed" - ); - - describe_counter!( - SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, - Unit::Count, - "The number of calls made to fetch the current height from sequencer which have failed" - ); - - describe_counter!( - SEQUENCER_SUBMISSION_HEIGHT, - Unit::Count, - "The height of the highest sequencer block successfully submitted to Celestia" - ); - - describe_histogram!( - BLOCKS_PER_CELESTIA_TX, - Unit::Count, - "The number of Astria blocks per Celestia submission" - ); - - describe_histogram!( - BLOBS_PER_CELESTIA_TX, - Unit::Count, - "The number of blobs (Astria Sequencer blocks converted to Celestia blobs) per Celestia \ - submission" - ); - - describe_histogram!( - BYTES_PER_CELESTIA_TX, - Unit::Bytes, - "The total number of payload bytes (Astria Sequencer blocks converted to Celestia blobs) \ - per Celestia submission" - ); - - describe_histogram!( - CELESTIA_SUBMISSION_LATENCY, - Unit::Seconds, - "The time it takes to submit a blob to Celestia" - ); - - describe_histogram!( - CELESTIA_PAYLOAD_CREATION_LATENCY, - Unit::Microseconds, - "The time it takes to create a new payload for submitting to Celestia (encoding to \ - protobuf, compression, creating blobs)" - ); - - describe_gauge!( - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, - Unit::Count, - "Ratio of uncompressed:compressed data size for all `blob.data`s in an Astria block" - ); -} - -// We configure buckets for manually, in order to ensure Prometheus 
metrics are structured as a -// Histogram, rather than as a Summary. These values are loosely based on the initial Summary -// output, and may need to be updated over time. -pub const HISTOGRAM_BUCKETS: &[f64; 5] = &[0.00001, 0.0001, 0.001, 0.01, 0.1]; - -metric_names!(pub const METRICS_NAMES: - CELESTIA_SUBMISSION_HEIGHT, - CELESTIA_SUBMISSION_COUNT, - CELESTIA_SUBMISSION_FAILURE_COUNT, - BLOCKS_PER_CELESTIA_TX, - BLOBS_PER_CELESTIA_TX, - BYTES_PER_CELESTIA_TX, - CELESTIA_PAYLOAD_CREATION_LATENCY, - CELESTIA_SUBMISSION_LATENCY, - SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, - SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, - SEQUENCER_SUBMISSION_HEIGHT, - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK -); - -#[cfg(test)] -mod tests { - use super::{ - BLOBS_PER_CELESTIA_TX, - BLOCKS_PER_CELESTIA_TX, - BYTES_PER_CELESTIA_TX, - CELESTIA_PAYLOAD_CREATION_LATENCY, - CELESTIA_SUBMISSION_COUNT, - CELESTIA_SUBMISSION_FAILURE_COUNT, - CELESTIA_SUBMISSION_HEIGHT, - CELESTIA_SUBMISSION_LATENCY, - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, - SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, - SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, - SEQUENCER_SUBMISSION_HEIGHT, - }; - - #[track_caller] - fn assert_const(actual: &'static str, suffix: &str) { - // XXX: hard-code this so the crate name isn't accidentally changed. 
- const CRATE_NAME: &str = "astria_sequencer_relayer"; - let expected = format!("{CRATE_NAME}_{suffix}"); - assert_eq!(expected, actual); - } - - #[test] - fn metrics_are_as_expected() { - assert_const(CELESTIA_SUBMISSION_HEIGHT, "celestia_submission_height"); - assert_const(CELESTIA_SUBMISSION_COUNT, "celestia_submission_count"); - assert_const( - CELESTIA_SUBMISSION_FAILURE_COUNT, - "celestia_submission_failure_count", - ); - assert_const(BLOCKS_PER_CELESTIA_TX, "blocks_per_celestia_tx"); - assert_const(BLOBS_PER_CELESTIA_TX, "blobs_per_celestia_tx"); - assert_const(BYTES_PER_CELESTIA_TX, "bytes_per_celestia_tx"); - assert_const( - CELESTIA_PAYLOAD_CREATION_LATENCY, - "celestia_payload_creation_latency", - ); - assert_const(CELESTIA_SUBMISSION_LATENCY, "celestia_submission_latency"); - assert_const( - SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, - "sequencer_block_fetch_failure_count", - ); - assert_const( - SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, - "sequencer_height_fetch_failure_count", - ); - assert_const(SEQUENCER_SUBMISSION_HEIGHT, "sequencer_submission_height"); - assert_const( - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, - "compression_ratio_for_astria_block", - ); - } -} diff --git a/crates/astria-sequencer-relayer/src/relayer/builder.rs b/crates/astria-sequencer-relayer/src/relayer/builder.rs index b96ac1da2b..12a7320e7d 100644 --- a/crates/astria-sequencer-relayer/src/relayer/builder.rs +++ b/crates/astria-sequencer-relayer/src/relayer/builder.rs @@ -20,7 +20,10 @@ use super::{ CelestiaClientBuilder, CelestiaKeys, }; -use crate::IncludeRollup; +use crate::{ + metrics::Metrics, + IncludeRollup, +}; pub(crate) struct Builder { pub(crate) shutdown_token: tokio_util::sync::CancellationToken, @@ -34,6 +37,7 @@ pub(crate) struct Builder { pub(crate) rollup_filter: IncludeRollup, pub(crate) pre_submit_path: PathBuf, pub(crate) post_submit_path: PathBuf, + pub(crate) metrics: &'static Metrics, } impl Builder { @@ -51,6 +55,7 @@ impl Builder { rollup_filter, pre_submit_path, 
post_submit_path, + metrics, } = self; let sequencer_cometbft_client = SequencerClient::new(&*cometbft_endpoint) .wrap_err("failed constructing cometbft http client")?; @@ -86,6 +91,7 @@ impl Builder { state, pre_submit_path, post_submit_path, + metrics, }) } } diff --git a/crates/astria-sequencer-relayer/src/relayer/mod.rs b/crates/astria-sequencer-relayer/src/relayer/mod.rs index bc631abdcb..2e284323f8 100644 --- a/crates/astria-sequencer-relayer/src/relayer/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/mod.rs @@ -53,8 +53,6 @@ use tracing::{ Span, }; -use crate::IncludeRollup; - mod builder; mod celestia_client; mod read; @@ -71,8 +69,12 @@ use celestia_client::{ }; use state::State; pub(crate) use state::StateSnapshot; +use submission::SubmissionState; -use self::submission::SubmissionState; +use crate::{ + metrics::Metrics, + IncludeRollup, +}; pub(crate) struct Relayer { /// A token to notify relayer that it should shut down. @@ -101,6 +103,7 @@ pub(crate) struct Relayer { pre_submit_path: PathBuf, post_submit_path: PathBuf, + metrics: &'static Metrics, } impl Relayer { @@ -142,9 +145,10 @@ impl Relayer { self.state.clone(), submission_state, self.shutdown_token.clone(), + self.metrics, ); - let mut block_stream = read::BlockStream::builder() + let mut block_stream = read::BlockStream::builder(self.metrics) .block_time(self.sequencer_poll_period) .client(self.sequencer_grpc_client.clone()) .set_last_fetched_height(last_submitted_sequencer_height) @@ -186,8 +190,7 @@ impl Relayer { block_stream.set_latest_sequencer_height(height); } Err(error) => { - metrics::counter!(crate::metrics_init::SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT) - .increment(1); + self.metrics.increment_sequencer_height_fetch_failure_count(); self.state.set_sequencer_connected(false); warn!( %error, @@ -357,6 +360,7 @@ fn spawn_submitter( state: Arc, submission_state: SubmissionState, shutdown_token: CancellationToken, + metrics: &'static Metrics, ) -> (JoinHandle>, 
write::BlobSubmitterHandle) { let (submitter, handle) = write::BlobSubmitter::new( client_builder, @@ -364,6 +368,7 @@ fn spawn_submitter( state, submission_state, shutdown_token, + metrics, ); (tokio::spawn(submitter.run()), handle) } diff --git a/crates/astria-sequencer-relayer/src/relayer/read.rs b/crates/astria-sequencer-relayer/src/relayer/read.rs index f2de26da06..880f3e675f 100644 --- a/crates/astria-sequencer-relayer/src/relayer/read.rs +++ b/crates/astria-sequencer-relayer/src/relayer/read.rs @@ -36,6 +36,8 @@ use tracing::{ Span, }; +use crate::metrics::Metrics; + /// Tracks the latest sequencer height and returns the next height the stream should fetch. /// /// This type exists primarily to make it convenient to determine the next height. Accessing @@ -86,12 +88,13 @@ pin_project! { paused: bool, block_time: Duration, state: Arc, + metrics: &'static Metrics, } } impl BlockStream { - pub(super) fn builder() -> BlockStreamBuilder { - BlockStreamBuilder::new() + pub(super) fn builder(metrics: &'static Metrics) -> BlockStreamBuilder { + BlockStreamBuilder::new(metrics) } pub(super) fn set_latest_sequencer_height(&mut self, height: Height) { @@ -138,6 +141,7 @@ impl Stream for BlockStream { height, *this.block_time, this.state.clone(), + this.metrics, ) .boxed(), )); @@ -185,6 +189,7 @@ async fn fetch_block( height: Height, block_time: Duration, state: Arc, + metrics: &'static Metrics, ) -> eyre::Result { // Moving the span into `on_retry`, because tryhard spawns these in a tokio // task, losing the span. 
@@ -194,8 +199,7 @@ async fn fetch_block( .max_delay(block_time) .on_retry( |attempt: u32, next_delay: Option, error: &eyre::Report| { - metrics::counter!(crate::metrics_init::SEQUENCER_BLOCK_FETCH_FAILURE_COUNT) - .increment(1); + metrics.increment_sequencer_block_fetch_failure_count(); let state = Arc::clone(&state); state.set_sequencer_connected(false); @@ -260,6 +264,7 @@ pub(super) struct BlockStreamBuilder, state: TState, + metrics: &'static Metrics, } impl BlockStreamBuilder { @@ -271,6 +276,7 @@ impl BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder BlockStreamBuilder Self { + fn new(metrics: &'static Metrics) -> Self { BlockStreamBuilder { block_time: NoBlockTime, client: NoClient, last_fetched_height: None, state: NoState, + metrics, } } } @@ -354,6 +368,7 @@ impl BlockStreamBuilder { client: WithClient(client), last_fetched_height, state: WithState(state), + metrics, } = self; let next = match last_fetched_height { None => { @@ -384,6 +399,7 @@ impl BlockStreamBuilder { block_time, paused: false, state, + metrics, } } } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index dd0b6aa5f4..9bead7e0c2 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -32,7 +32,10 @@ use tracing::{ warn, }; -use crate::IncludeRollup; +use crate::{ + metrics::Metrics, + IncludeRollup, +}; /// The maximum permitted payload size in bytes that relayer will send to Celestia. 
/// @@ -299,11 +302,11 @@ impl Input { } } -#[derive(Debug)] pub(super) struct NextSubmission { rollup_filter: IncludeRollup, input: Input, payload: Payload, + metrics: &'static Metrics, } #[derive(Debug, thiserror::Error)] @@ -324,11 +327,12 @@ pub(super) enum TryAddError { } impl NextSubmission { - pub(super) fn new(rollup_filter: IncludeRollup) -> Self { + pub(super) fn new(rollup_filter: IncludeRollup, metrics: &'static Metrics) -> Self { Self { rollup_filter, input: Input::new(), payload: Payload::new(), + metrics, } } @@ -344,8 +348,8 @@ impl NextSubmission { let payload_creation_start = std::time::Instant::now(); let payload_candidate = input_candidate.clone().try_into_payload()?; - metrics::histogram!(crate::metrics_init::CELESTIA_PAYLOAD_CREATION_LATENCY) - .record(payload_creation_start.elapsed()); + self.metrics + .record_celestia_payload_creation_latency(payload_creation_start.elapsed()); if payload_candidate.compressed_size <= MAX_PAYLOAD_SIZE_BYTES { self.input = input_candidate; @@ -489,6 +493,7 @@ mod tests { NextSubmission, }; use crate::{ + metrics::Metrics, relayer::write::conversion::{ TryAddError, MAX_PAYLOAD_SIZE_BYTES, @@ -500,6 +505,10 @@ mod tests { IncludeRollup::parse("").unwrap() } + fn metrics() -> &'static Metrics { + Box::leak(Box::new(Metrics::new())) + } + fn block(height: u32) -> SequencerBlock { ConfigureSequencerBlock { chain_id: Some("sequencer-0".to_string()), @@ -531,7 +540,7 @@ mod tests { #[tokio::test] async fn add_sequencer_block_to_empty_next_submission() { - let mut next_submission = NextSubmission::new(include_all_rollups()); + let mut next_submission = NextSubmission::new(include_all_rollups(), metrics()); next_submission.try_add(block(1)).unwrap(); let submission = next_submission.take().await.unwrap(); assert_eq!(1, submission.num_blocks()); @@ -540,7 +549,7 @@ mod tests { #[test] fn adding_three_sequencer_blocks_with_same_ids_doesnt_change_number_of_blobs() { - let mut next_submission = 
NextSubmission::new(include_all_rollups()); + let mut next_submission = NextSubmission::new(include_all_rollups(), metrics()); next_submission.try_add(block(1)).unwrap(); next_submission.try_add(block(2)).unwrap(); next_submission.try_add(block(3)).unwrap(); @@ -554,7 +563,7 @@ mod tests { // this test makes use of the fact that random data is essentially incompressible so // that size(uncompressed_payload) ~= size(compressed_payload). let mut rng = ChaChaRng::seed_from_u64(0); - let mut next_submission = NextSubmission::new(include_all_rollups()); + let mut next_submission = NextSubmission::new(include_all_rollups(), metrics()); // adding 9 blocks with 100KB random data each, which gives a (compressed) payload slightly // above 900KB. let num_bytes = 100_000usize; @@ -576,7 +585,7 @@ mod tests { // this test makes use of the fact that random data is essentially incompressible so // that size(uncompressed_payload) ~= size(compressed_payload). let mut rng = ChaChaRng::seed_from_u64(0); - let mut next_submission = NextSubmission::new(include_all_rollups()); + let mut next_submission = NextSubmission::new(include_all_rollups(), metrics()); // using the upper limit defined in the constant and add 1KB of extra bytes to ensure // the block is too large diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index 28b23d4acc..01ba811f49 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -58,7 +58,7 @@ use super::{ TrySubmitError, }; use crate::{ - metrics_init, + metrics::Metrics, IncludeRollup, }; @@ -120,6 +120,8 @@ pub(super) struct BlobSubmitter { /// A block that could not be added to `next_submission` because it would overflow its /// hardcoded limit. 
pending_block: Option, + + metrics: &'static Metrics, } impl BlobSubmitter { @@ -129,6 +131,7 @@ impl BlobSubmitter { state: Arc, submission_state: SubmissionState, shutdown_token: CancellationToken, + metrics: &'static Metrics, ) -> (Self, BlobSubmitterHandle) { // XXX: The channel size here is just a number. It should probably be based on some // heuristic about the number of expected blobs in a block. @@ -136,11 +139,12 @@ impl BlobSubmitter { let submitter = Self { client_builder, blocks: rx, - next_submission: NextSubmission::new(rollup_filter), + next_submission: NextSubmission::new(rollup_filter, metrics), state, submission_state, shutdown_token, pending_block: None, + metrics, }; let handle = BlobSubmitterHandle { tx, @@ -199,6 +203,7 @@ impl BlobSubmitter { submission, self.state.clone(), self.submission_state.clone(), + self.metrics, ).boxed().fuse(); if let Some(block) = self.pending_block.take() { if let Err(error) = self.add_sequencer_block_to_next_submission(block) { @@ -272,6 +277,7 @@ async fn submit_blobs( data: conversion::Submission, state: Arc, submission_state: SubmissionState, + metrics: &'static Metrics, ) -> eyre::Result { info!( blocks = %telemetry::display::json(&data.input_metadata()), @@ -283,27 +289,11 @@ async fn submit_blobs( let start = std::time::Instant::now(); - // allow: gauges require f64, it's okay if the metrics get messed up by overflow or precision - // loss - #[allow(clippy::cast_precision_loss)] - let compressed_size = data.compressed_size() as f64; - metrics::histogram!(metrics_init::BYTES_PER_CELESTIA_TX).record(compressed_size); - - metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(data.compression_ratio()); - - metrics::counter!(crate::metrics_init::CELESTIA_SUBMISSION_COUNT).increment(1); - // XXX: The number of sequencer blocks per celestia tx is equal to the number of heights passed - // into this function. This comes from the way that `QueuedBlocks::take` is implemented. 
- // - // allow: the number of blocks should always be low enough to not cause precision loss - #[allow(clippy::cast_precision_loss)] - let blocks_per_celestia_tx = data.num_blocks() as f64; - metrics::histogram!(crate::metrics_init::BLOCKS_PER_CELESTIA_TX).record(blocks_per_celestia_tx); - - // allow: the number of blobs should always be low enough to not cause precision loss - #[allow(clippy::cast_precision_loss)] - let blobs_per_celestia_tx = data.num_blobs() as f64; - metrics::histogram!(crate::metrics_init::BLOBS_PER_CELESTIA_TX).record(blobs_per_celestia_tx); + metrics.record_bytes_per_celestia_tx(data.compressed_size()); + metrics.set_compression_ratio_for_astria_block(data.compression_ratio()); + metrics.increment_celestia_submission_count(); + metrics.record_blocks_per_celestia_tx(data.num_blocks()); + metrics.record_blobs_per_celestia_tx(data.num_blobs()); let largest_sequencer_height = data.greatest_sequencer_height(); let blobs = data.into_blobs(); @@ -320,7 +310,7 @@ async fn submit_blobs( Ok(state) => state, }; - let celestia_height = match submit_with_retry(client, blobs, state.clone()).await { + let celestia_height = match submit_with_retry(client, blobs, state.clone(), metrics).await { Err(error) => { let message = "failed submitting blobs to Celestia"; error!(%error, message); @@ -328,10 +318,9 @@ async fn submit_blobs( } Ok(height) => height, }; - metrics::counter!(crate::metrics_init::SEQUENCER_SUBMISSION_HEIGHT) - .absolute(largest_sequencer_height.value()); - metrics::counter!(crate::metrics_init::CELESTIA_SUBMISSION_HEIGHT).absolute(celestia_height); - metrics::histogram!(crate::metrics_init::CELESTIA_SUBMISSION_LATENCY).record(start.elapsed()); + metrics.absolute_set_sequencer_submission_height(largest_sequencer_height.value()); + metrics.absolute_set_celestia_submission_height(celestia_height); + metrics.record_celestia_submission_latency(start.elapsed()); info!(%celestia_height, "successfully submitted blobs to Celestia"); @@ -399,6 +388,7 
@@ async fn submit_with_retry( client: CelestiaClient, blobs: Vec, state: Arc, + metrics: &'static Metrics, ) -> eyre::Result { // Moving the span into `on_retry`, because tryhard spawns these in a tokio // task, losing the span. @@ -414,8 +404,7 @@ async fn submit_with_retry( .max_delay(Duration::from_secs(12)) .on_retry( |attempt: u32, next_delay: Option, error: &TrySubmitError| { - metrics::counter!(crate::metrics_init::CELESTIA_SUBMISSION_FAILURE_COUNT) - .increment(1); + metrics.increment_celestia_submission_failure_count(); let state = Arc::clone(&state); state.set_celestia_connected(false); diff --git a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs index 16d4f2da83..5250297317 100644 --- a/crates/astria-sequencer-relayer/src/sequencer_relayer.rs +++ b/crates/astria-sequencer-relayer/src/sequencer_relayer.rs @@ -1,5 +1,6 @@ use std::{ net::SocketAddr, + sync::OnceLock, time::Duration, }; @@ -25,6 +26,7 @@ use tracing::{ use crate::{ api, config::Config, + metrics::Metrics, relayer::{ self, Relayer, @@ -44,6 +46,9 @@ impl SequencerRelayer { /// /// Returns an error if constructing the inner relayer type failed. 
pub fn new(cfg: Config) -> eyre::Result<(Self, ShutdownHandle)> { + static METRICS: OnceLock = OnceLock::new(); + let metrics = METRICS.get_or_init(Metrics::new); + let shutdown_handle = ShutdownHandle::new(); let rollup_filter = cfg.only_include_rollups()?; let Config { @@ -72,6 +77,7 @@ impl SequencerRelayer { rollup_filter, pre_submit_path, post_submit_path, + metrics, } .build() .wrap_err("failed to create relayer")?; diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 996a786ea4..e8aef971e7 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -89,7 +89,7 @@ use crate::{ genesis::GenesisState, ibc::component::IbcComponent, mempool::Mempool, - metrics_init, + metrics::Metrics, proposal::{ block_size_constraints::BlockSizeConstraints, commitment::{ @@ -162,11 +162,17 @@ pub(crate) struct App { // allow clippy because we need be specific as to what hash this is. #[allow(clippy::struct_field_names)] app_hash: AppHash, + + metrics: &'static Metrics, } impl App { - pub(crate) async fn new(snapshot: Snapshot, mempool: Mempool) -> anyhow::Result { - tracing::debug!("initializing App instance"); + pub(crate) async fn new( + snapshot: Snapshot, + mempool: Mempool, + metrics: &'static Metrics, + ) -> anyhow::Result { + debug!("initializing App instance"); let app_hash: AppHash = snapshot .root_hash() @@ -189,6 +195,7 @@ impl App { execution_results: None, write_batch: None, app_hash, + metrics, }) } @@ -299,17 +306,15 @@ impl App { .execute_transactions_prepare_proposal(&mut block_size_constraints) .await .context("failed to execute transactions")?; - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(metrics_init::PROPOSAL_TRANSACTIONS) - .record(signed_txs_included.len() as f64); + self.metrics + .record_proposal_transactions(signed_txs_included.len()); let deposits = self .state .get_block_deposits() .await .context("failed to get block deposits in 
prepare_proposal")?; - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(metrics_init::PROPOSAL_DEPOSITS).record(deposits.len() as f64); + self.metrics.record_proposal_deposits(deposits.len()); // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs // included in the block @@ -341,7 +346,7 @@ impl App { self.executed_proposal_hash = process_proposal.hash; return Ok(()); } - metrics::counter!(metrics_init::PROCESS_PROPOSAL_SKIPPED_PROPOSAL).increment(1); + self.metrics.increment_process_proposal_skipped_proposal(); debug!( "our validator address was set but we're not the proposer, so our previous \ proposal was skipped, executing block" @@ -411,16 +416,14 @@ impl App { execution_results.len() == expected_txs_len, "transactions to be included do not match expected", ); - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(metrics_init::PROPOSAL_TRANSACTIONS).record(signed_txs.len() as f64); + self.metrics.record_proposal_transactions(signed_txs.len()); let deposits = self .state .get_block_deposits() .await .context("failed to get block deposits in process_proposal")?; - #[allow(clippy::cast_precision_loss)] - metrics::histogram!(metrics_init::PROPOSAL_DEPOSITS).record(deposits.len() as f64); + self.metrics.record_proposal_deposits(deposits.len()); let GeneratedCommitments { rollup_datas_root: expected_rollup_datas_root, @@ -483,10 +486,8 @@ impl App { // don't include tx if it would make the cometBFT block too large if !block_size_constraints.cometbft_has_space(tx_len) { - metrics::counter!( - metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE - ) - .increment(1); + self.metrics + .increment_prepare_proposal_excluded_transactions_cometbft_space(); debug!( transaction_hash = %tx_hash_base64, block_size_constraints = %json(&block_size_constraints), @@ -537,10 +538,8 @@ impl App { included_signed_txs.push((*tx).clone()); } Err(e) => { - metrics::counter!( - 
metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE - ) - .increment(1); + self.metrics + .increment_prepare_proposal_excluded_transactions_decode_failure(); debug!( transaction_hash = %tx_hash_base64, error = AsRef::::as_ref(&e), @@ -613,10 +612,8 @@ impl App { .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { - metrics::counter!( - metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE - ) - .increment(1); + self.metrics + .increment_prepare_proposal_excluded_transactions_sequencer_space(); debug!( transaction_hash = %telemetry::display::base64(&tx_hash), block_size_constraints = %json(&block_size_constraints), @@ -642,10 +639,8 @@ impl App { .context("error growing cometBFT block size")?; } Err(e) => { - metrics::counter!( - metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION - ) - .increment(1); + self.metrics + .increment_prepare_proposal_excluded_transactions_failed_execution(); debug!( transaction_hash = %telemetry::display::base64(&tx_hash), error = AsRef::::as_ref(&e), @@ -657,8 +652,8 @@ impl App { } if excluded_tx_count > 0.0 { - metrics::gauge!(metrics_init::PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS) - .set(excluded_tx_count); + self.metrics + .set_prepare_proposal_excluded_transactions(excluded_tx_count); info!( excluded_tx_count = excluded_tx_count, included_tx_count = execution_results.len(), diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index be58fc6664..15072f61d5 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -27,6 +27,7 @@ use crate::{ GenesisState, }, mempool::Mempool, + metrics::Metrics, }; // attempts to decode the given hex string into an address. 
@@ -103,7 +104,8 @@ pub(crate) async fn initialize_app_with_storage( .expect("failed to create temp storage backing chain state"); let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); - let mut app = App::new(snapshot, mempool).await.unwrap(); + let metrics = Box::leak(Box::new(Metrics::new())); + let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(|| GenesisState { accounts: default_genesis_accounts(), diff --git a/crates/astria-sequencer/src/lib.rs b/crates/astria-sequencer/src/lib.rs index ebbe0bf43c..0425796406 100644 --- a/crates/astria-sequencer/src/lib.rs +++ b/crates/astria-sequencer/src/lib.rs @@ -12,7 +12,7 @@ pub(crate) mod genesis; pub(crate) mod grpc; pub(crate) mod ibc; mod mempool; -pub mod metrics_init; +pub(crate) mod metrics; pub(crate) mod proposal; pub(crate) mod sequence; mod sequencer; diff --git a/crates/astria-sequencer/src/main.rs b/crates/astria-sequencer/src/main.rs index 1adb08e16a..dc6dd1266f 100644 --- a/crates/astria-sequencer/src/main.rs +++ b/crates/astria-sequencer/src/main.rs @@ -2,7 +2,6 @@ use std::process::ExitCode; use anyhow::Context as _; use astria_sequencer::{ - metrics_init, Config, Sequencer, BUILD_INFO, @@ -36,8 +35,7 @@ async fn main() -> ExitCode { if !cfg.no_metrics { telemetry_conf = telemetry_conf .metrics_addr(&cfg.metrics_http_listener_addr) - .service_name(env!("CARGO_PKG_NAME")) - .register_metrics(metrics_init::register); + .service_name(env!("CARGO_PKG_NAME")); } let _telemetry_guard = match telemetry_conf diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs new file mode 100644 index 0000000000..b170b8dcd3 --- /dev/null +++ b/crates/astria-sequencer/src/metrics.rs @@ -0,0 +1,281 @@ +use metrics::{ + counter, + describe_counter, + describe_gauge, + describe_histogram, + gauge, + histogram, + Counter, + Gauge, + Histogram, + Unit, +}; +use telemetry::metric_names; + +pub(crate) struct 
Metrics { + prepare_proposal_excluded_transactions_decode_failure: Counter, + prepare_proposal_excluded_transactions_cometbft_space: Counter, + prepare_proposal_excluded_transactions_sequencer_space: Counter, + prepare_proposal_excluded_transactions_failed_execution: Counter, + prepare_proposal_excluded_transactions: Gauge, + proposal_deposits: Histogram, + proposal_transactions: Histogram, + process_proposal_skipped_proposal: Counter, + check_tx_removed_too_large: Counter, + check_tx_removed_failed_stateless: Counter, + check_tx_removed_stale_nonce: Counter, + check_tx_removed_account_balance: Counter, +} + +impl Metrics { + #[must_use] + pub(crate) fn new() -> Self { + describe_counter!( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, + Unit::Count, + "The number of transactions that have been excluded from blocks due to failing to \ + decode" + ); + let prepare_proposal_excluded_transactions_decode_failure = + counter!(PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE); + + describe_counter!( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, + Unit::Count, + "The number of transactions that have been excluded from blocks due to running out of \ + space in the cometbft block" + ); + let prepare_proposal_excluded_transactions_cometbft_space = + counter!(PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE); + + describe_counter!( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, + Unit::Count, + "The number of transactions that have been excluded from blocks due to running out of \ + space in the sequencer block" + ); + let prepare_proposal_excluded_transactions_sequencer_space = + counter!(PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE); + + describe_counter!( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, + Unit::Count, + "The number of transactions that have been excluded from blocks due to failing to \ + execute" + ); + let prepare_proposal_excluded_transactions_failed_execution = + 
counter!(PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION); + + describe_gauge!( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, + Unit::Count, + "The number of excluded transactions in a proposal being prepared" + ); + let prepare_proposal_excluded_transactions = gauge!(PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS); + + describe_histogram!( + PROPOSAL_DEPOSITS, + Unit::Count, + "The number of deposits in a proposal" + ); + let proposal_deposits = histogram!(PROPOSAL_DEPOSITS); + + describe_histogram!( + PROPOSAL_TRANSACTIONS, + Unit::Count, + "The number of transactions in a proposal" + ); + let proposal_transactions = histogram!(PROPOSAL_TRANSACTIONS); + + describe_counter!( + PROCESS_PROPOSAL_SKIPPED_PROPOSAL, + Unit::Count, + "The number of times our submitted prepared proposal was skipped in process proposal" + ); + let process_proposal_skipped_proposal = counter!(PROCESS_PROPOSAL_SKIPPED_PROPOSAL); + + describe_counter!( + CHECK_TX_REMOVED_TOO_LARGE, + Unit::Count, + "The number of transactions that have been removed from the mempool due to being too \ + large" + ); + let check_tx_removed_too_large = counter!(CHECK_TX_REMOVED_TOO_LARGE); + + describe_counter!( + CHECK_TX_REMOVED_FAILED_STATELESS, + Unit::Count, + "The number of transactions that have been removed from the mempool due to failing \ + the stateless check" + ); + let check_tx_removed_failed_stateless = counter!(CHECK_TX_REMOVED_FAILED_STATELESS); + + describe_counter!( + CHECK_TX_REMOVED_STALE_NONCE, + Unit::Count, + "The number of transactions that have been removed from the mempool due to having a \ + stale nonce" + ); + let check_tx_removed_stale_nonce = counter!(CHECK_TX_REMOVED_STALE_NONCE); + + describe_counter!( + CHECK_TX_REMOVED_ACCOUNT_BALANCE, + Unit::Count, + "The number of transactions that have been removed from the mempool due to having not \ + enough account balance" + ); + let check_tx_removed_account_balance = counter!(CHECK_TX_REMOVED_ACCOUNT_BALANCE); + + Self { + 
prepare_proposal_excluded_transactions_decode_failure, + prepare_proposal_excluded_transactions_cometbft_space, + prepare_proposal_excluded_transactions_sequencer_space, + prepare_proposal_excluded_transactions_failed_execution, + prepare_proposal_excluded_transactions, + proposal_deposits, + proposal_transactions, + process_proposal_skipped_proposal, + check_tx_removed_too_large, + check_tx_removed_failed_stateless, + check_tx_removed_stale_nonce, + check_tx_removed_account_balance, + } + } + + pub(crate) fn increment_prepare_proposal_excluded_transactions_decode_failure(&self) { + self.prepare_proposal_excluded_transactions_decode_failure + .increment(1); + } + + pub(crate) fn increment_prepare_proposal_excluded_transactions_cometbft_space(&self) { + self.prepare_proposal_excluded_transactions_cometbft_space + .increment(1); + } + + pub(crate) fn increment_prepare_proposal_excluded_transactions_sequencer_space(&self) { + self.prepare_proposal_excluded_transactions_sequencer_space + .increment(1); + } + + pub(crate) fn increment_prepare_proposal_excluded_transactions_failed_execution(&self) { + self.prepare_proposal_excluded_transactions_failed_execution + .increment(1); + } + + pub(crate) fn set_prepare_proposal_excluded_transactions(&self, count: f64) { + self.prepare_proposal_excluded_transactions.set(count); + } + + pub(crate) fn record_proposal_deposits(&self, count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. + #[allow(clippy::cast_precision_loss)] + self.proposal_deposits.record(count as f64); + } + + pub(crate) fn record_proposal_transactions(&self, count: usize) { + // allow: precision loss is unlikely (values too small) but also unimportant in histograms. 
+ #[allow(clippy::cast_precision_loss)] + self.proposal_transactions.record(count as f64); + } + + pub(crate) fn increment_process_proposal_skipped_proposal(&self) { + self.process_proposal_skipped_proposal.increment(1); + } + + pub(crate) fn increment_check_tx_removed_too_large(&self) { + self.check_tx_removed_too_large.increment(1); + } + + pub(crate) fn increment_check_tx_removed_failed_stateless(&self) { + self.check_tx_removed_failed_stateless.increment(1); + } + + pub(crate) fn increment_check_tx_removed_stale_nonce(&self) { + self.check_tx_removed_stale_nonce.increment(1); + } + + pub(crate) fn increment_check_tx_removed_account_balance(&self) { + self.check_tx_removed_account_balance.increment(1); + } +} + +metric_names!(pub const METRICS_NAMES: + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, + PROPOSAL_DEPOSITS, + PROPOSAL_TRANSACTIONS, + PROCESS_PROPOSAL_SKIPPED_PROPOSAL, + CHECK_TX_REMOVED_TOO_LARGE, + CHECK_TX_REMOVED_FAILED_STATELESS, + CHECK_TX_REMOVED_STALE_NONCE, + CHECK_TX_REMOVED_ACCOUNT_BALANCE +); + +#[cfg(test)] +mod tests { + use super::{ + CHECK_TX_REMOVED_ACCOUNT_BALANCE, + CHECK_TX_REMOVED_FAILED_STATELESS, + CHECK_TX_REMOVED_STALE_NONCE, + CHECK_TX_REMOVED_TOO_LARGE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, + PROCESS_PROPOSAL_SKIPPED_PROPOSAL, + PROPOSAL_DEPOSITS, + PROPOSAL_TRANSACTIONS, + }; + + #[track_caller] + fn assert_const(actual: &'static str, suffix: &str) { + // XXX: hard-code this so the crate name isn't accidentally changed. 
+ const CRATE_NAME: &str = "astria_sequencer"; + let expected = format!("{CRATE_NAME}_{suffix}"); + assert_eq!(expected, actual); + } + + #[test] + fn metrics_are_as_expected() { + assert_const( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, + "prepare_proposal_excluded_transactions_decode_failure", + ); + assert_const( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, + "prepare_proposal_excluded_transactions_cometbft_space", + ); + assert_const( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, + "prepare_proposal_excluded_transactions_sequencer_space", + ); + assert_const( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, + "prepare_proposal_excluded_transactions_failed_execution", + ); + assert_const( + PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, + "prepare_proposal_excluded_transactions", + ); + assert_const(PROPOSAL_DEPOSITS, "proposal_deposits"); + assert_const(PROPOSAL_TRANSACTIONS, "proposal_transactions"); + assert_const( + PROCESS_PROPOSAL_SKIPPED_PROPOSAL, + "process_proposal_skipped_proposal", + ); + assert_const(CHECK_TX_REMOVED_TOO_LARGE, "check_tx_removed_too_large"); + assert_const( + CHECK_TX_REMOVED_FAILED_STATELESS, + "check_tx_removed_failed_stateless", + ); + assert_const(CHECK_TX_REMOVED_STALE_NONCE, "check_tx_removed_stale_nonce"); + assert_const( + CHECK_TX_REMOVED_ACCOUNT_BALANCE, + "check_tx_removed_account_balance", + ); + } +} diff --git a/crates/astria-sequencer/src/metrics_init.rs b/crates/astria-sequencer/src/metrics_init.rs deleted file mode 100644 index 6684a95acb..0000000000 --- a/crates/astria-sequencer/src/metrics_init.rs +++ /dev/null @@ -1,172 +0,0 @@ -//! Crate-specific metrics functionality. -//! -//! Registers metrics & lists constants to be used as metric names throughout crate. - -use metrics::{ - describe_counter, - describe_gauge, - describe_histogram, - Unit, -}; -use telemetry::metric_names; - -/// Registers all metrics used by this crate. 
-pub fn register() { - describe_counter!( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, - Unit::Count, - "The number of transactions that have been excluded from blocks due to failing to decode" - ); - - describe_counter!( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, - Unit::Count, - "The number of transactions that have been excluded from blocks due to running out of \ - space in the cometbft block" - ); - - describe_counter!( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, - Unit::Count, - "The number of transactions that have been excluded from blocks due to running out of \ - space in the sequencer block" - ); - - describe_counter!( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, - Unit::Count, - "The number of transactions that have been excluded from blocks due to failing to execute" - ); - - describe_gauge!( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, - Unit::Count, - "The number of excluded transactions in a proposal being prepared" - ); - - describe_counter!( - PROCESS_PROPOSAL_SKIPPED_PROPOSAL, - Unit::Count, - "The number of times our submitted prepared proposal was skipped in process proposal" - ); - - describe_counter!( - CHECK_TX_REMOVED_TOO_LARGE, - Unit::Count, - "The number of transactions that have been removed from the mempool due to being too large" - ); - - describe_counter!( - CHECK_TX_REMOVED_FAILED_STATELESS, - Unit::Count, - "The number of transactions that have been removed from the mempool due to failing the \ - stateless check" - ); - - describe_counter!( - CHECK_TX_REMOVED_STALE_NONCE, - Unit::Count, - "The number of transactions that have been removed from the mempool due to having a stale \ - nonce" - ); - - describe_counter!( - CHECK_TX_REMOVED_ACCOUNT_BALANCE, - Unit::Count, - "The number of transactions that have been removed from the mempool due to having not \ - enough account balance" - ); - - describe_histogram!( - PROPOSAL_TRANSACTIONS, - Unit::Count, - "The number of transactions in a 
proposal" - ); - - describe_histogram!( - PROPOSAL_DEPOSITS, - Unit::Count, - "The number of deposits in a proposal" - ); -} - -metric_names!(pub const METRICS_NAMES: - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, - PROPOSAL_DEPOSITS, - PROPOSAL_TRANSACTIONS, - PROCESS_PROPOSAL_SKIPPED_PROPOSAL, - CHECK_TX_REMOVED_TOO_LARGE, - CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, - CHECK_TX_REMOVED_ACCOUNT_BALANCE -); - -#[cfg(test)] -mod tests { - use super::{ - CHECK_TX_REMOVED_ACCOUNT_BALANCE, - CHECK_TX_REMOVED_FAILED_STATELESS, - CHECK_TX_REMOVED_STALE_NONCE, - CHECK_TX_REMOVED_TOO_LARGE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, - PROCESS_PROPOSAL_SKIPPED_PROPOSAL, - PROPOSAL_DEPOSITS, - PROPOSAL_TRANSACTIONS, - }; - - #[track_caller] - fn assert_const(actual: &'static str, suffix: &str) { - // XXX: hard-code this so the crate name isn't accidentally changed. 
- const CRATE_NAME: &str = "astria_sequencer"; - let expected = format!("{CRATE_NAME}_{suffix}"); - assert_eq!(expected, actual); - } - - #[test] - fn metrics_are_as_expected() { - assert_const( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_DECODE_FAILURE, - "prepare_proposal_excluded_transactions_decode_failure", - ); - assert_const( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, - "prepare_proposal_excluded_transactions_cometbft_space", - ); - assert_const( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_SEQUENCER_SPACE, - "prepare_proposal_excluded_transactions_sequencer_space", - ); - assert_const( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, - "prepare_proposal_excluded_transactions_failed_execution", - ); - assert_const( - PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, - "prepare_proposal_excluded_transactions", - ); - assert_const(PROPOSAL_DEPOSITS, "proposal_deposits"); - assert_const(PROPOSAL_TRANSACTIONS, "proposal_transactions"); - assert_const( - PROCESS_PROPOSAL_SKIPPED_PROPOSAL, - "process_proposal_skipped_proposal", - ); - assert_const(CHECK_TX_REMOVED_TOO_LARGE, "check_tx_removed_too_large"); - assert_const( - CHECK_TX_REMOVED_FAILED_STATELESS, - "check_tx_removed_failed_stateless", - ); - assert_const(CHECK_TX_REMOVED_STALE_NONCE, "check_tx_removed_stale_nonce"); - assert_const( - CHECK_TX_REMOVED_ACCOUNT_BALANCE, - "check_tx_removed_account_balance", - ); - } -} diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index ed60debb4e..9ca0ffd5a6 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -1,3 +1,5 @@ +use std::sync::OnceLock; + use anyhow::{ anyhow, Context as _, @@ -34,6 +36,7 @@ use crate::{ grpc::sequencer::SequencerServer, ibc::host_interface::AstriaHost, mempool::Mempool, + metrics::Metrics, service, state_ext::StateReadExt as _, }; @@ -43,6 +46,9 @@ pub struct Sequencer; impl Sequencer { #[instrument(skip_all)] pub async fn 
run_until_stopped(config: Config) -> Result<()> { + static METRICS: OnceLock = OnceLock::new(); + let metrics = METRICS.get_or_init(Metrics::new); + if config .db_filepath .try_exists() @@ -88,7 +94,7 @@ impl Sequencer { } let mempool = Mempool::new(); - let app = App::new(snapshot, mempool.clone()) + let app = App::new(snapshot, mempool.clone(), metrics) .await .context("failed to initialize app")?; @@ -100,7 +106,7 @@ impl Sequencer { let storage = storage.clone(); async move { service::Consensus::new(storage, app, queue).run().await } })); - let mempool_service = service::Mempool::new(storage.clone(), mempool.clone()); + let mempool_service = service::Mempool::new(storage.clone(), mempool.clone(), metrics); let info_service = service::Info::new(storage.clone()).context("failed initializing info service")?; let snapshot_service = service::Snapshot; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 6e97e14e68..d099c04176 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -250,6 +250,7 @@ mod test { app::test_utils::default_fees, asset::get_native_asset, mempool::Mempool, + metrics::Metrics, proposal::commitment::generate_rollup_datas_commitment, }; @@ -492,7 +493,8 @@ mod test { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); - let mut app = App::new(snapshot, mempool.clone()).await.unwrap(); + let metrics = Box::leak(Box::new(Metrics::new())); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await .unwrap(); diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index 4cc1b4b029..cdb7ce3999 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ 
b/crates/astria-sequencer/src/service/mempool.rs @@ -32,7 +32,7 @@ use tracing::Instrument as _; use crate::{ accounts::state_ext::StateReadExt, mempool::Mempool as AppMempool, - metrics_init, + metrics::Metrics, transaction, }; @@ -45,14 +45,16 @@ const MAX_TX_SIZE: usize = 256_000; // 256 KB #[derive(Clone)] pub(crate) struct Mempool { storage: Storage, - mempool: AppMempool, + inner: AppMempool, + metrics: &'static Metrics, } impl Mempool { - pub(crate) fn new(storage: Storage, mempool: AppMempool) -> Self { + pub(crate) fn new(storage: Storage, mempool: AppMempool, metrics: &'static Metrics) -> Self { Self { storage, - mempool, + inner: mempool, + metrics, } } } @@ -70,11 +72,12 @@ impl Service for Mempool { use penumbra_tower_trace::v038::RequestExt as _; let span = req.create_span(); let storage = self.storage.clone(); - let mut mempool = self.mempool.clone(); + let mut mempool = self.inner.clone(); + let metrics = self.metrics; async move { let rsp = match req { MempoolRequest::CheckTx(req) => MempoolResponse::CheckTx( - handle_check_tx(req, storage.latest_snapshot(), &mut mempool).await, + handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await, ), }; Ok(rsp) @@ -95,6 +98,7 @@ async fn handle_check_tx( req: request::CheckTx, state: S, mempool: &mut AppMempool, + metrics: &'static Metrics, ) -> response::CheckTx { use sha2::Digest as _; @@ -105,7 +109,7 @@ async fn handle_check_tx( } = req; if tx.len() > MAX_TX_SIZE { mempool.remove(tx_hash).await; - metrics::counter!(metrics_init::CHECK_TX_REMOVED_TOO_LARGE).increment(1); + metrics.increment_check_tx_removed_too_large(); return response::CheckTx { code: AbciErrorCode::TRANSACTION_TOO_LARGE.into(), log: format!( @@ -146,7 +150,7 @@ async fn handle_check_tx( if let Err(e) = transaction::check_stateless(&signed_tx).await { mempool.remove(tx_hash).await; - metrics::counter!(metrics_init::CHECK_TX_REMOVED_FAILED_STATELESS).increment(1); + 
metrics.increment_check_tx_removed_failed_stateless(); return response::CheckTx { code: AbciErrorCode::INVALID_PARAMETER.into(), info: "transaction failed stateless check".into(), @@ -157,7 +161,7 @@ async fn handle_check_tx( if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await { mempool.remove(tx_hash).await; - metrics::counter!(metrics_init::CHECK_TX_REMOVED_STALE_NONCE).increment(1); + metrics.increment_check_tx_removed_stale_nonce(); return response::CheckTx { code: AbciErrorCode::INVALID_NONCE.into(), info: "failed verifying transaction nonce".into(), @@ -178,7 +182,7 @@ async fn handle_check_tx( if let Err(e) = transaction::check_balance_mempool(&signed_tx, &state).await { mempool.remove(tx_hash).await; - metrics::counter!(metrics_init::CHECK_TX_REMOVED_ACCOUNT_BALANCE).increment(1); + metrics.increment_check_tx_removed_account_balance(); return response::CheckTx { code: AbciErrorCode::INSUFFICIENT_FUNDS.into(), info: "failed verifying account balance".into(), diff --git a/crates/astria-telemetry/src/lib.rs b/crates/astria-telemetry/src/lib.rs index 6226e71228..d63680016d 100644 --- a/crates/astria-telemetry/src/lib.rs +++ b/crates/astria-telemetry/src/lib.rs @@ -70,17 +70,9 @@ impl Error { Self(ErrorKind::MetricsAddr(source)) } - fn bucket_error(source: BuildError) -> Self { - Self(ErrorKind::BucketError(source)) - } - fn exporter_install(source: BuildError) -> Self { Self(ErrorKind::ExporterInstall(source)) } - - fn no_metric_register_func() -> Self { - Self(ErrorKind::NoMetricRegisterFunc) - } } #[derive(Debug, thiserror::Error)] @@ -93,15 +85,8 @@ enum ErrorKind { InitSubscriber(#[source] TryInitError), #[error("failed to parse metrics address")] MetricsAddr(#[source] AddrParseError), - #[error("failed to configure prometheus buckets")] - BucketError(#[source] BuildError), #[error("failed installing prometheus metrics exporter")] ExporterInstall(#[source] BuildError), - #[error( - "telemetry was configured to run with metrics, but no 
function/closure to register \ - metrics was provided" - )] - NoMetricRegisterFunc, } #[must_use = "the otel config must be initialized to be useful"] @@ -147,8 +132,6 @@ pub struct Config { stdout_writer: BoxedMakeWriter, metrics_addr: Option, service_name: String, - metric_buckets: Option>, - register_metrics: Option>, } impl Config { @@ -162,8 +145,6 @@ impl Config { stdout_writer: BoxedMakeWriter::new(std::io::stdout), metrics_addr: None, service_name: String::new(), - metric_buckets: None, - register_metrics: None, } } } @@ -243,22 +224,6 @@ impl Config { } } - #[must_use = "telemetry must be initialized to be useful"] - pub fn metric_buckets(self, metric_buckets: Vec) -> Self { - Self { - metric_buckets: Some(metric_buckets), - ..self - } - } - - #[must_use = "telemetry must be initialized to be useful"] - pub fn register_metrics(self, f: F) -> Self { - Self { - register_metrics: Some(Box::new(f)), - ..self - } - } - /// Initialize telemetry, consuming the config. /// /// # Errors @@ -273,8 +238,6 @@ impl Config { stdout_writer, metrics_addr, service_name, - metric_buckets, - register_metrics, } = self; let env_filter = { @@ -339,17 +302,6 @@ impl Config { metrics_builder = metrics_builder.add_global_label("service", service_name); } - if let Some(buckets) = metric_buckets { - metrics_builder = metrics_builder - .set_buckets(&buckets) - .map_err(Error::bucket_error)?; - } - - let Some(register_metrics) = register_metrics else { - return Err(Error::no_metric_register_func()); - }; - register_metrics(); - metrics_builder.install().map_err(Error::exporter_install)?; }