Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions crates/supervisor/core/src/chain_processor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
// The chainId that this processor is associated with
chain_id: ChainId,

// The sender for chain events, used to communicate with the event loop
event_tx: Option<mpsc::Sender<ChainEvent>>,

// Whether metrics are enabled for the processor
metrics_enabled: Option<bool>,

// The managed node that this processor will handle
managed_node: Arc<P>,

Expand All @@ -47,17 +51,24 @@
cancel_token: CancellationToken,
) -> Self {
// todo: validate chain_id against managed_node

Self {
chain_id,
event_tx: None,
metrics_enabled: None,
managed_node,
state_manager,
cancel_token,
task_handle: Mutex::new(None),
}
}

/// Enables metrics on the database environment.
pub fn with_metrics(mut self) -> Self {
self.metrics_enabled = Some(true);
super::Metrics::init(self.chain_id);
self
}

Check warning on line 70 in crates/supervisor/core/src/chain_processor/chain.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/chain.rs#L66-L70

Added lines #L66 - L70 were not covered by tests

/// Returns the [`ChainId`] associated with this processor.
pub const fn chain_id(&self) -> ChainId {
self.chain_id
Expand All @@ -81,13 +92,17 @@
self.event_tx = Some(event_tx.clone());
self.managed_node.start_subscription(event_tx.clone()).await?;

let task = ChainProcessorTask::new(
let mut task = ChainProcessorTask::new(
self.chain_id,
self.managed_node.clone(),
self.state_manager.clone(),
self.cancel_token.clone(),
event_rx,
);
if self.metrics_enabled.unwrap_or(false) {
task = task.with_metrics();

Check warning on line 103 in crates/supervisor/core/src/chain_processor/chain.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/chain.rs#L103

Added line #L103 was not covered by tests
}

let handle = tokio::spawn(async move {
task.run().await;
});
Expand Down
11 changes: 10 additions & 1 deletion crates/supervisor/core/src/chain_processor/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::syncnode::ManagedNodeError;
use crate::{logindexer::LogIndexerError, syncnode::ManagedNodeError};
use kona_supervisor_storage::StorageError;
use thiserror::Error;

/// Errors that may occur while processing chains in the supervisor core.
Expand All @@ -7,4 +8,12 @@ pub enum ChainProcessorError {
/// Represents an error that occurred while interacting with the managed node.
#[error(transparent)]
ManagedNode(#[from] ManagedNodeError),

/// Represents an error that occured while interacting with the storage layer.
#[error(transparent)]
StorageError(#[from] StorageError),

/// Represents an error that occured while indexing logs.
#[error(transparent)]
LogIndexerError(#[from] LogIndexerError),
}
75 changes: 75 additions & 0 deletions crates/supervisor/core/src/chain_processor/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use alloy_primitives::ChainId;

#[derive(Debug)]
pub(crate) struct Metrics;

impl Metrics {
// --- Metric Names ---
/// Identifier for block processing success.
/// Labels: `chain_id`, `type`
pub(crate) const BLOCK_PROCESSING_SUCCESS_TOTAL: &'static str =
"supervisor_block_processing_success_total";

/// Identifier for block processing errors.
/// Labels: `chain_id`, `type`
pub(crate) const BLOCK_PROCESSING_ERROR_TOTAL: &'static str =
"supervisor_block_processing_error_total";

/// Identifier for block processing latency.
/// Labels: `chain_id`, `type`
pub(crate) const BLOCK_PROCESSING_LATENCY_SECONDS: &'static str =
"supervisor_block_processing_latency_seconds";

const TYPES: [&'static str; 5] =
["local_unsafe", "cross_unsafe", "local_safe", "cross_safe", "finalized"];

pub(crate) fn init(chain_id: ChainId) {
Self::describe();
Self::zero(chain_id);
}

Check warning on line 29 in crates/supervisor/core/src/chain_processor/metrics.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/metrics.rs#L26-L29

Added lines #L26 - L29 were not covered by tests

fn describe() {
metrics::describe_counter!(
Self::BLOCK_PROCESSING_SUCCESS_TOTAL,
metrics::Unit::Count,
"Total number of successfully processed blocks in the supervisor",
);

metrics::describe_counter!(
Self::BLOCK_PROCESSING_ERROR_TOTAL,
metrics::Unit::Count,
"Total number of errors encountered while processing blocks in the supervisor",
);

metrics::describe_histogram!(
Self::BLOCK_PROCESSING_LATENCY_SECONDS,
metrics::Unit::Seconds,
"Latency for processing in the supervisor",
);
}

Check warning on line 49 in crates/supervisor/core/src/chain_processor/metrics.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/metrics.rs#L31-L49

Added lines #L31 - L49 were not covered by tests

fn zero(chain_id: ChainId) {
for &type_name in Self::TYPES.iter() {
metrics::counter!(
Self::BLOCK_PROCESSING_SUCCESS_TOTAL,
"type" => type_name,
"chain_id" => chain_id.to_string()
)
.increment(0);

metrics::counter!(
Self::BLOCK_PROCESSING_ERROR_TOTAL,
"type" => type_name,
"chain_id" => chain_id.to_string()
)
.increment(0);

metrics::histogram!(
Self::BLOCK_PROCESSING_LATENCY_SECONDS,
"type" => type_name,
"chain_id" => chain_id.to_string()
)
.record(0.0);
}
}

Check warning on line 74 in crates/supervisor/core/src/chain_processor/metrics.rs

View check run for this annotation

Codecov / codecov/patch

crates/supervisor/core/src/chain_processor/metrics.rs#L51-L74

Added lines #L51 - L74 were not covered by tests
}
3 changes: 3 additions & 0 deletions crates/supervisor/core/src/chain_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ pub use error::ChainProcessorError;

mod task;
pub use task::ChainProcessorTask;

mod metrics;
pub(crate) use metrics::Metrics;
Loading
Loading