Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
25 changes: 15 additions & 10 deletions crates/supervisor/core/src/reorg/handler.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use super::metrics::Metrics;
use crate::{reorg::task::ReorgTask, syncnode::ManagedNodeController};
use alloy_primitives::ChainId;
use alloy_rpc_client::RpcClient;
use derive_more::Constructor;
use futures::future;
use kona_protocol::BlockInfo;
use kona_supervisor_storage::{DbReader, StorageRewinder};
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Instant};
use thiserror::Error;
use tracing::{info, warn};

Expand Down Expand Up @@ -56,19 +57,23 @@ where
managed_node.clone(),
);

let handle = tokio::spawn(async move { reorg_task.process_chain_reorg().await });
let chain_id = *chain_id;

let handle = tokio::spawn(async move {
let reorg_task = reorg_task.with_metrics();
let start_time = Instant::now();
let result = reorg_task.process_chain_reorg().await;
Metrics::record_l1_reorg_processing(chain_id, start_time, &result);
result
});
handles.push(handle);
}

let results = future::join_all(handles).await;
let failed_chains = results.into_iter().filter(|result| result.is_err()).count();

if failed_chains > 0 {
warn!(
target: "supervisor::reorg_handler",
no_of_failed_chains = %failed_chains,
"Reorg processing completed with failed chains"
);
for result in results {
if let Err(err) = result {
warn!(target: "supervisor::reorg_handler", %err, "Reorg task failed");
}
}

Ok(())
Expand Down
122 changes: 122 additions & 0 deletions crates/supervisor/core/src/reorg/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::SupervisorError;
use alloy_primitives::ChainId;
use std::time::Instant;

#[derive(Debug, Clone)]
pub(crate) struct ReorgDepth {
pub(crate) l1_depth: u64,
pub(crate) l2_depth: u64,
}

/// Metrics for reorg operations
#[derive(Debug, Clone)]
pub(crate) struct Metrics;

impl Metrics {
Comment thread
itschaindev marked this conversation as resolved.
pub(crate) const SUPERVISOR_REORG_SUCCESS: &'static str = "kona_supervisor_reorg_success";
pub(crate) const SUPERVISOR_REORG_ERROR: &'static str = "kona_supervisor_reorg_error";
pub(crate) const SUPERVISOR_REORG_DURATION_SECONDS: &'static str =
"kona_supervisor_reorg_duration_seconds";
pub(crate) const SUPERVISOR_REORG_L1_DEPTH: &'static str = "kona_supervisor_reorg_l1_depth";
pub(crate) const SUPERVISOR_REORG_L2_DEPTH: &'static str = "kona_supervisor_reorg_l2_depth";

pub(crate) fn init(chain_id: ChainId) {
Self::describe();
Self::zero(chain_id);
}

fn describe() {
metrics::describe_counter!(
Self::SUPERVISOR_REORG_SUCCESS,
metrics::Unit::Count,
"Total number of successfully processed L1 reorgs in the supervisor",
);

metrics::describe_counter!(
Self::SUPERVISOR_REORG_ERROR,
metrics::Unit::Count,
"Total number of errors encountered while processing L1 reorgs in the supervisor",
);

metrics::describe_histogram!(
Self::SUPERVISOR_REORG_L1_DEPTH,
metrics::Unit::Count,
"Depth of the L1 reorg in the supervisor",
);

metrics::describe_histogram!(
Self::SUPERVISOR_REORG_L2_DEPTH,
metrics::Unit::Count,
"Depth of the L2 reorg in the supervisor",
);

metrics::describe_histogram!(
Self::SUPERVISOR_REORG_DURATION_SECONDS,
metrics::Unit::Seconds,
"Latency for processing L1 reorgs in the supervisor",
);
}

fn zero(chain_id: ChainId) {
metrics::counter!(Self::SUPERVISOR_REORG_SUCCESS, "chain_id" => chain_id.to_string())
.increment(0);

metrics::counter!(Self::SUPERVISOR_REORG_ERROR, "chain_id" => chain_id.to_string())
.increment(0);

metrics::histogram!(Self::SUPERVISOR_REORG_L1_DEPTH, "chain_id" => chain_id.to_string())
.record(0);

metrics::histogram!(Self::SUPERVISOR_REORG_L2_DEPTH, "chain_id" => chain_id.to_string())
.record(0);

metrics::histogram!(Self::SUPERVISOR_REORG_DURATION_SECONDS, "chain_id" => chain_id.to_string())
.record(0.0);
}

/// Records metrics for a L1 reorg processing operation.
/// Takes the result of the processing and extracts the reorg depth if successful.
pub(crate) fn record_l1_reorg_processing(
chain_id: ChainId,
start_time: Instant,
result: &Result<ReorgDepth, SupervisorError>,
) {
match result {
Ok(reorg_depth) => {
metrics::counter!(
Self::SUPERVISOR_REORG_SUCCESS,
"chain_id" => chain_id.to_string(),
)
.increment(1);

metrics::histogram!(
Self::SUPERVISOR_REORG_L1_DEPTH,
"chain_id" => chain_id.to_string(),
)
.record(reorg_depth.l1_depth as f64);

metrics::histogram!(
Self::SUPERVISOR_REORG_L2_DEPTH,
"chain_id" => chain_id.to_string(),
)
.record(reorg_depth.l2_depth as f64);

// Calculate latency
let latency = start_time.elapsed().as_secs_f64();

metrics::histogram!(
Self::SUPERVISOR_REORG_DURATION_SECONDS,
"chain_id" => chain_id.to_string(),
)
.record(latency);
}
Err(_) => {
metrics::counter!(
Self::SUPERVISOR_REORG_ERROR,
"chain_id" => chain_id.to_string(),
)
.increment(1);
}
}
}
}
2 changes: 2 additions & 0 deletions crates/supervisor/core/src/reorg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ mod task;

mod handler;
pub use handler::{ReorgHandler, ReorgHandlerError};

mod metrics;
36 changes: 26 additions & 10 deletions crates/supervisor/core/src/reorg/task.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use super::metrics::{Metrics, ReorgDepth};
use crate::{SupervisorError, syncnode::ManagedNodeController};
use alloy_eips::BlockNumHash;
use alloy_primitives::{B256, ChainId};
use alloy_rpc_client::RpcClient;
use alloy_rpc_types_eth::Block;
use derive_more::Constructor;
use kona_interop::DerivedRefPair;
use kona_supervisor_storage::{DbReader, StorageRewinder};
use std::sync::Arc;
use tracing::{debug, info, trace, warn};
Expand All @@ -22,12 +24,22 @@ where
C: ManagedNodeController + Send + Sync + 'static,
DB: DbReader + StorageRewinder + Send + Sync + 'static,
{
/// Sets up metrics for the reorg task
pub(crate) fn with_metrics(self) -> Self {
Metrics::init(self.chain_id);
self
}

/// Processes reorg for a single chain
pub(crate) async fn process_chain_reorg(&self) -> Result<(), SupervisorError> {
/// Returns the L2 and L1 reorg depths
/// If the reorg is not needed, returns (0, 0)
pub(crate) async fn process_chain_reorg(&self) -> Result<ReorgDepth, SupervisorError> {
let latest_state = self.db.latest_derivation_state()?;

// Find last valid source block for this chain
let Some(rewind_target_source) = self.find_rewind_target().await? else {
let Some(rewind_target_source) = self.find_rewind_target(latest_state).await? else {
// No need to re-org for this chain
return Ok(());
return Ok(ReorgDepth { l1_depth: 0, l2_depth: 0 });
};

// Get the derived block at the target source block
Expand Down Expand Up @@ -63,19 +75,23 @@ where
SupervisorError::from(err)
})?;

Ok(())
Ok(ReorgDepth {
l1_depth: latest_state.source.number - rewind_target_source.number,
l2_depth: latest_state.derived.number - rewind_target_derived.number,
})
}

/// Finds the rewind target for a chain during a reorg
async fn find_rewind_target(&self) -> Result<Option<BlockNumHash>, SupervisorError> {
async fn find_rewind_target(
&self,
latest_state: DerivedRefPair,
) -> Result<Option<BlockNumHash>, SupervisorError> {
trace!(
target: "supervisor::reorg_handler",
chain_id = %self.chain_id,
"Finding rewind target..."
);

let latest_state = self.db.latest_derivation_state()?;

// Check if the latest source block is still canonical
if self.is_block_canonical(latest_state.source.number, latest_state.source.hash).await? {
debug!(
Expand Down Expand Up @@ -245,7 +261,7 @@ mod tests {

let managed_node = Arc::new(MockManagedNode::new());
let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client, managed_node);
let rewind_target = reorg_task.find_rewind_target().await;
let rewind_target = reorg_task.process_chain_reorg().await;

// Should succeed since the latest source block is still canonical
assert!(rewind_target.is_ok());
Expand Down Expand Up @@ -346,7 +362,7 @@ mod tests {
..Default::default()
};

mock_db.expect_latest_derivation_state().times(1).returning(move || Ok(latest_state));
mock_db.expect_latest_derivation_state().returning(move || Ok(latest_state));
mock_db
.expect_get_safety_head_ref()
.times(1)
Expand Down Expand Up @@ -380,7 +396,7 @@ mod tests {

let managed_node = Arc::new(MockManagedNode::new());
let reorg_task = ReorgTask::new(1, Arc::new(mock_db), rpc_client, managed_node);
let rewind_target = reorg_task.find_rewind_target().await;
let rewind_target = reorg_task.find_rewind_target(latest_state).await;

// Should succeed since the latest source block is still canonical
assert!(rewind_target.is_ok());
Expand Down
2 changes: 1 addition & 1 deletion crates/supervisor/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ impl Service {
self.database_factory.clone(),
chain_event_senders.clone(),
self.cancel_token.clone(),
ReorgHandler::new(l1_rpc, chain_dbs_map, self.managed_nodes.clone()),
ReorgHandler::new(l1_rpc.clone(), chain_dbs_map, self.managed_nodes.clone()),
);

self.join_set.spawn(async move {
Expand Down
Loading