diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 480a29d835..624f10437f 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -19,8 +19,12 @@ pub struct ChainProcessor { // The chainId that this processor is associated with chain_id: ChainId, + // The sender for chain events, used to communicate with the event loop event_tx: Option>, + // Whether metrics are enabled for the processor + metrics_enabled: Option, + // The managed node that this processor will handle managed_node: Arc

, @@ -47,10 +51,10 @@ where cancel_token: CancellationToken, ) -> Self { // todo: validate chain_id against managed_node - Self { chain_id, event_tx: None, + metrics_enabled: None, managed_node, state_manager, cancel_token, @@ -58,6 +62,13 @@ where } } + /// Enables metrics for this chain processor. + pub fn with_metrics(mut self) -> Self { + self.metrics_enabled = Some(true); + super::Metrics::init(self.chain_id); + self + } + /// Returns the [`ChainId`] associated with this processor. pub const fn chain_id(&self) -> ChainId { self.chain_id @@ -81,13 +92,17 @@ where self.event_tx = Some(event_tx.clone()); self.managed_node.start_subscription(event_tx.clone()).await?; - let task = ChainProcessorTask::new( + let mut task = ChainProcessorTask::new( self.chain_id, self.managed_node.clone(), self.state_manager.clone(), self.cancel_token.clone(), event_rx, ); + if self.metrics_enabled.unwrap_or(false) { + task = task.with_metrics(); + } + let handle = tokio::spawn(async move { task.run().await; }); diff --git a/crates/supervisor/core/src/chain_processor/error.rs b/crates/supervisor/core/src/chain_processor/error.rs index 403560250d..e5d4714af4 100644 --- a/crates/supervisor/core/src/chain_processor/error.rs +++ b/crates/supervisor/core/src/chain_processor/error.rs @@ -1,4 +1,5 @@ -use crate::syncnode::ManagedNodeError; +use crate::{logindexer::LogIndexerError, syncnode::ManagedNodeError}; +use kona_supervisor_storage::StorageError; use thiserror::Error; /// Errors that may occur while processing chains in the supervisor core. @@ -7,4 +8,12 @@ pub enum ChainProcessorError { /// Represents an error that occurred while interacting with the managed node. #[error(transparent)] ManagedNode(#[from] ManagedNodeError), + + /// Represents an error that occurred while interacting with the storage layer. + #[error(transparent)] + StorageError(#[from] StorageError), + + /// Represents an error that occurred while indexing logs. 
+ #[error(transparent)] + LogIndexerError(#[from] LogIndexerError), } diff --git a/crates/supervisor/core/src/chain_processor/metrics.rs b/crates/supervisor/core/src/chain_processor/metrics.rs new file mode 100644 index 0000000000..6116ba0b9e --- /dev/null +++ b/crates/supervisor/core/src/chain_processor/metrics.rs @@ -0,0 +1,75 @@ +use alloy_primitives::ChainId; + +#[derive(Debug)] +pub(crate) struct Metrics; + +impl Metrics { + // --- Metric Names --- + /// Identifier for block processing success. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_SUCCESS_TOTAL: &'static str = + "supervisor_block_processing_success_total"; + + /// Identifier for block processing errors. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_ERROR_TOTAL: &'static str = + "supervisor_block_processing_error_total"; + + /// Identifier for block processing latency. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_LATENCY_SECONDS: &'static str = + "supervisor_block_processing_latency_seconds"; + + const TYPES: [&'static str; 5] = + ["local_unsafe", "cross_unsafe", "local_safe", "cross_safe", "finalized"]; + + pub(crate) fn init(chain_id: ChainId) { + Self::describe(); + Self::zero(chain_id); + } + + fn describe() { + metrics::describe_counter!( + Self::BLOCK_PROCESSING_SUCCESS_TOTAL, + metrics::Unit::Count, + "Total number of successfully processed blocks in the supervisor", + ); + + metrics::describe_counter!( + Self::BLOCK_PROCESSING_ERROR_TOTAL, + metrics::Unit::Count, + "Total number of errors encountered while processing blocks in the supervisor", + ); + + metrics::describe_histogram!( + Self::BLOCK_PROCESSING_LATENCY_SECONDS, + metrics::Unit::Seconds, + "Latency for processing in the supervisor", + ); + } + + fn zero(chain_id: ChainId) { + for &type_name in Self::TYPES.iter() { + metrics::counter!( + Self::BLOCK_PROCESSING_SUCCESS_TOTAL, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .increment(0); + + 
metrics::counter!( + Self::BLOCK_PROCESSING_ERROR_TOTAL, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .increment(0); + + metrics::histogram!( + Self::BLOCK_PROCESSING_LATENCY_SECONDS, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .record(0.0); + } + } +} diff --git a/crates/supervisor/core/src/chain_processor/mod.rs b/crates/supervisor/core/src/chain_processor/mod.rs index d7cee8a67c..a9c7bedbfe 100644 --- a/crates/supervisor/core/src/chain_processor/mod.rs +++ b/crates/supervisor/core/src/chain_processor/mod.rs @@ -10,3 +10,6 @@ pub use error::ChainProcessorError; mod task; pub use task::ChainProcessorTask; + +mod metrics; +pub(crate) use metrics::Metrics; diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 53ae75d787..ae9dfff590 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -1,4 +1,5 @@ -use crate::{LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider}; +use super::Metrics; +use crate::{ChainProcessorError, LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; @@ -13,6 +14,7 @@ use tracing::{debug, error, info}; #[derive(Debug)] pub struct ChainProcessorTask { chain_id: ChainId, + metrics_enabled: Option, managed_node: Arc

, @@ -42,6 +44,7 @@ where let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); Self { chain_id, + metrics_enabled: None, cancel_token, managed_node, event_rx, @@ -50,6 +53,73 @@ where } } + /// Enables metrics for this chain processor task. + pub const fn with_metrics(mut self) -> Self { + self.metrics_enabled = Some(true); + self + } + + /// Observes an async call, recording metrics and latency for block processing. + /// The latency is calculated as the difference between the current system time and the block's + /// timestamp. + async fn observe_block_processing( + &self, + event_type: &'static str, + f: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + let result = f().await; + + if !self.metrics_enabled.unwrap_or(false) { + return result; + } + + match &result { + Ok(block) => { + metrics::counter!( + Metrics::BLOCK_PROCESSING_SUCCESS_TOTAL, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .increment(1); + + // Calculate elapsed time for block processing + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(duration) => duration.as_secs_f64(), + Err(e) => { + error!( + target: "chain_processor", + chain_id = self.chain_id, + "SystemTime error when recording block processing latency: {e}" + ); + return result; + } + }; + + let latency = now - block.timestamp as f64; + metrics::histogram!( + Metrics::BLOCK_PROCESSING_LATENCY_SECONDS, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .record(latency); + } + Err(_err) => { + metrics::counter!( + Metrics::BLOCK_PROCESSING_ERROR_TOTAL, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .increment(1); + } + } + + result + } + /// Runs the chain processor task, which listens for events and processes them. /// This method will run indefinitely until the cancellation token is triggered. 
pub async fn run(mut self) { @@ -74,27 +144,90 @@ where async fn handle_event(&self, event: ChainEvent) { match event { - ChainEvent::UnsafeBlock { block } => self.handle_unsafe_event(block).await, + ChainEvent::UnsafeBlock { block } => { + let _ = self + .observe_block_processing("local_unsafe", || async { + self.handle_unsafe_event(block).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + %err, + "Failed to process unsafe block" + ); + }) + }) + .await; + } ChainEvent::DerivedBlock { derived_ref_pair } => { - self.handle_safe_event(derived_ref_pair).await + let _ = self + .observe_block_processing("local_safe", || async { + self.handle_safe_event(derived_ref_pair).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + %err, + "Failed to process local safe derived block pair" + ); + }) + }) + .await; } ChainEvent::DerivationOriginUpdate { origin } => { - self.handle_derivation_origin_update(origin) + let _ = self.handle_derivation_origin_update(origin).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = origin.number, + %err, + "Failed to update derivation origin" + ); + }); } ChainEvent::BlockReplaced { replacement } => { - self.handle_block_replacement(replacement).await + let _ = self.handle_block_replacement(replacement).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + %err, + "Failed to handle block replacement" + ); + }); } ChainEvent::FinalizedSourceUpdate { finalized_source_block } => { - self.handle_finalized_l1_update(finalized_source_block).await + let _ = self + .observe_block_processing("finalized", || async { + self.handle_finalized_l1_update(finalized_source_block).await.inspect_err( + |err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = 
finalized_source_block.number, + %err, + "Failed to process finalized source update" + ); + }, + ) + }) + .await; } } } - async fn handle_block_replacement(&self, _replacement: BlockReplacement) { + #[allow(clippy::missing_const_for_fn)] + fn handle_block_replacement( + &self, + _replacement: BlockReplacement, + ) -> Result<(), ChainProcessorError> { // Logic to handle block replacement + Ok(()) } - async fn handle_finalized_l1_update(&self, finalized_source_block: BlockInfo) { + async fn handle_finalized_l1_update( + &self, + finalized_source_block: BlockInfo, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, @@ -102,86 +235,52 @@ where "Processing finalized L1 update" ); let finalized_derived_block = - match self.state_manager.update_finalized_using_source(finalized_source_block) { - Ok(finalized_l2) => finalized_l2, - Err(err) => { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = finalized_source_block.number, - %err, - "Failed to update finalized L1 block" - ); - return; - } - }; - - if let Err(err) = self.managed_node.update_finalized(finalized_derived_block.id()).await { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = finalized_source_block.number, - %err, - "Failed to update finalized L2 block on managed node" - ); - } + self.state_manager.update_finalized_using_source(finalized_source_block)?; + self.managed_node.update_finalized(finalized_derived_block.id()).await?; + Ok(finalized_derived_block) } - fn handle_derivation_origin_update(&self, origin: BlockInfo) { + fn handle_derivation_origin_update( + &self, + origin: BlockInfo, + ) -> Result<(), ChainProcessorError> { debug!( target: "chain_processor", chain_id = self.chain_id, block_number = origin.number, "Processing derivation origin update" ); - if let Err(err) = self.state_manager.update_current_l1(origin) { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = origin.number, 
- %err, - "Failed to update current L1 block" - ); - } + self.state_manager.update_current_l1(origin)?; + Ok(()) } - async fn handle_safe_event(&self, derived_ref_pair: DerivedRefPair) { + async fn handle_safe_event( + &self, + derived_ref_pair: DerivedRefPair, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, block_number = derived_ref_pair.derived.number, "Processing local safe derived block pair" ); - if let Err(err) = self.state_manager.save_derived_block_pair(derived_ref_pair) { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = derived_ref_pair.derived.number, - %err, - "Failed to process safe block" - ); - // TODO: take next action based on the error - } + self.state_manager.save_derived_block_pair(derived_ref_pair)?; + Ok(derived_ref_pair.derived) } - async fn handle_unsafe_event(&self, block_info: BlockInfo) { + async fn handle_unsafe_event( + &self, + block: BlockInfo, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, - block_number = block_info.number, + block_number = block.number, "Processing unsafe block" ); - if let Err(err) = self.log_indexer.process_and_store_logs(&block_info).await { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = block_info.number, - %err, - "Failed to process unsafe block" - ); - // TODO: take next action based on the error - } + self.log_indexer.process_and_store_logs(&block).await?; + Ok(block) } } diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index f5a86b2c13..0d2c2f31db 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -79,7 +79,7 @@ where } /// Error type for the [`LogIndexer`]. -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum LogIndexerError { /// Failed to write processed logs for a block to the state manager. 
#[error(transparent)] diff --git a/crates/supervisor/core/src/supervisor.rs b/crates/supervisor/core/src/supervisor.rs index 5ae78ef235..62e320fc9e 100644 --- a/crates/supervisor/core/src/supervisor.rs +++ b/crates/supervisor/core/src/supervisor.rs @@ -162,6 +162,9 @@ impl Supervisor { let mut processor = ChainProcessor::new(*chain_id, managed_node.clone(), db, self.cancel_token.clone()); + // todo: enable metrics only if configured + processor = processor.with_metrics(); + // Start the chain processors. // Each chain processor will start its own managed nodes and begin processing messages. processor.start().await?;