From 14c92fe7c24b69c5c1f59e3792fa3f6864872976 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 25 Jun 2025 22:33:32 +0530 Subject: [PATCH 1/5] feat(supervisor/core): block processing metrics --- .../core/src/chain_processor/chain.rs | 19 +- .../core/src/chain_processor/error.rs | 11 +- .../core/src/chain_processor/metrics.rs | 75 ++++++ .../core/src/chain_processor/mod.rs | 3 + .../core/src/chain_processor/task.rs | 224 +++++++++++++----- .../supervisor/core/src/logindexer/indexer.rs | 2 +- crates/supervisor/core/src/supervisor.rs | 3 + tests/supervisor/sync_test.go | 44 ++-- 8 files changed, 291 insertions(+), 90 deletions(-) create mode 100644 crates/supervisor/core/src/chain_processor/metrics.rs diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 480a29d835..624f10437f 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -19,8 +19,12 @@ pub struct ChainProcessor { // The chainId that this processor is associated with chain_id: ChainId, + // The sender for chain events, used to communicate with the event loop event_tx: Option>, + // Whether metrics are enabled for the processor + metrics_enabled: Option, + // The managed node that this processor will handle managed_node: Arc

, @@ -47,10 +51,10 @@ where cancel_token: CancellationToken, ) -> Self { // todo: validate chain_id against managed_node - Self { chain_id, event_tx: None, + metrics_enabled: None, managed_node, state_manager, cancel_token, @@ -58,6 +62,13 @@ where } } + /// Enables metrics on the database environment. + pub fn with_metrics(mut self) -> Self { + self.metrics_enabled = Some(true); + super::Metrics::init(self.chain_id); + self + } + /// Returns the [`ChainId`] associated with this processor. pub const fn chain_id(&self) -> ChainId { self.chain_id @@ -81,13 +92,17 @@ where self.event_tx = Some(event_tx.clone()); self.managed_node.start_subscription(event_tx.clone()).await?; - let task = ChainProcessorTask::new( + let mut task = ChainProcessorTask::new( self.chain_id, self.managed_node.clone(), self.state_manager.clone(), self.cancel_token.clone(), event_rx, ); + if self.metrics_enabled.unwrap_or(false) { + task = task.with_metrics(); + } + let handle = tokio::spawn(async move { task.run().await; }); diff --git a/crates/supervisor/core/src/chain_processor/error.rs b/crates/supervisor/core/src/chain_processor/error.rs index 403560250d..e5d4714af4 100644 --- a/crates/supervisor/core/src/chain_processor/error.rs +++ b/crates/supervisor/core/src/chain_processor/error.rs @@ -1,4 +1,5 @@ -use crate::syncnode::ManagedNodeError; +use crate::{logindexer::LogIndexerError, syncnode::ManagedNodeError}; +use kona_supervisor_storage::StorageError; use thiserror::Error; /// Errors that may occur while processing chains in the supervisor core. @@ -7,4 +8,12 @@ pub enum ChainProcessorError { /// Represents an error that occurred while interacting with the managed node. #[error(transparent)] ManagedNode(#[from] ManagedNodeError), + + /// Represents an error that occured while interacting with the storage layer. + #[error(transparent)] + StorageError(#[from] StorageError), + + /// Represents an error that occured while indexing logs. + #[error(transparent)] + LogIndexerError(#[from] LogIndexerError), } diff --git a/crates/supervisor/core/src/chain_processor/metrics.rs b/crates/supervisor/core/src/chain_processor/metrics.rs new file mode 100644 index 0000000000..6116ba0b9e --- /dev/null +++ b/crates/supervisor/core/src/chain_processor/metrics.rs @@ -0,0 +1,75 @@ +use alloy_primitives::ChainId; + +#[derive(Debug)] +pub(crate) struct Metrics; + +impl Metrics { + // --- Metric Names --- + /// Identifier for block processing success. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_SUCCESS_TOTAL: &'static str = + "supervisor_block_processing_success_total"; + + /// Identifier for block processing errors. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_ERROR_TOTAL: &'static str = + "supervisor_block_processing_error_total"; + + /// Identifier for block processing latency. + /// Labels: `chain_id`, `type` + pub(crate) const BLOCK_PROCESSING_LATENCY_SECONDS: &'static str = + "supervisor_block_processing_latency_seconds"; + + const TYPES: [&'static str; 5] = + ["local_unsafe", "cross_unsafe", "local_safe", "cross_safe", "finalized"]; + + pub(crate) fn init(chain_id: ChainId) { + Self::describe(); + Self::zero(chain_id); + } + + fn describe() { + metrics::describe_counter!( + Self::BLOCK_PROCESSING_SUCCESS_TOTAL, + metrics::Unit::Count, + "Total number of successfully processed blocks in the supervisor", + ); + + metrics::describe_counter!( + Self::BLOCK_PROCESSING_ERROR_TOTAL, + metrics::Unit::Count, + "Total number of errors encountered while processing blocks in the supervisor", + ); + + metrics::describe_histogram!( + Self::BLOCK_PROCESSING_LATENCY_SECONDS, + metrics::Unit::Seconds, + "Latency for processing in the supervisor", + ); + } + + fn zero(chain_id: ChainId) { + for &type_name in Self::TYPES.iter() { + metrics::counter!( + Self::BLOCK_PROCESSING_SUCCESS_TOTAL, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .increment(0); + + metrics::counter!( + Self::BLOCK_PROCESSING_ERROR_TOTAL, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .increment(0); + + metrics::histogram!( + Self::BLOCK_PROCESSING_LATENCY_SECONDS, + "type" => type_name, + "chain_id" => chain_id.to_string() + ) + .record(0.0); + } + } +} diff --git a/crates/supervisor/core/src/chain_processor/mod.rs b/crates/supervisor/core/src/chain_processor/mod.rs index d7cee8a67c..a9c7bedbfe 100644 --- a/crates/supervisor/core/src/chain_processor/mod.rs +++ b/crates/supervisor/core/src/chain_processor/mod.rs @@ -10,3 +10,6 @@ pub use error::ChainProcessorError; mod task; pub use task::ChainProcessorTask; + +mod metrics; +pub(crate) use metrics::Metrics; diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 53ae75d787..e191b34a64 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -1,4 +1,5 @@ -use crate::{LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider}; +use super::Metrics; +use crate::{ChainProcessorError, LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; @@ -13,6 +14,7 @@ use tracing::{debug, error, info}; #[derive(Debug)] pub struct ChainProcessorTask { chain_id: ChainId, + metrics_enabled: Option, managed_node: Arc

, @@ -42,6 +44,7 @@ where let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); Self { chain_id, + metrics_enabled: None, cancel_token, managed_node, event_rx, @@ -50,6 +53,71 @@ where } } + /// Enables metrics on the database environment. + pub fn with_metrics(mut self) -> Self { + self.metrics_enabled = Some(true); + self + } + + /// Observes an async call, recording metrics and latency for block processing. + async fn observe_block_processing( + &self, + event_type: &'static str, + f: F, + ) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + let result = f().await; + + if !self.metrics_enabled.unwrap_or(false) { + return result; + } + + match &result { + Ok(block) => { + metrics::counter!( + Metrics::BLOCK_PROCESSING_SUCCESS_TOTAL, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .increment(1); + + // Calculate elapsed time for block processing + let now = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) { + Ok(duration) => duration.as_secs_f64(), + Err(e) => { + error!( + target: "chain_processor", + chain_id = self.chain_id, + "SystemTime error when recording block processing latency: {e}" + ); + return result; + } + }; + + let latency = now - block.timestamp as f64; + metrics::histogram!( + Metrics::BLOCK_PROCESSING_LATENCY_SECONDS, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .record(latency); + } + Err(_err) => { + metrics::counter!( + Metrics::BLOCK_PROCESSING_ERROR_TOTAL, + "type" => event_type, + "chain_id" => self.chain_id.to_string() + ) + .increment(1); + } + } + + result + } + /// Runs the chain processor task, which listens for events and processes them. /// This method will run indefinitely until the cancellation token is triggered. pub async fn run(mut self) { @@ -74,27 +142,89 @@ where async fn handle_event(&self, event: ChainEvent) { match event { - ChainEvent::UnsafeBlock { block } => self.handle_unsafe_event(block).await, + ChainEvent::UnsafeBlock { block } => { + let _ = self + .observe_block_processing("local_unsafe", || async { + self.handle_unsafe_event(block).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + %err, + "Failed to process unsafe block" + ); + }) + }) + .await; + } ChainEvent::DerivedBlock { derived_ref_pair } => { - self.handle_safe_event(derived_ref_pair).await + let _ = self + .observe_block_processing("local_safe", || async { + self.handle_safe_event(derived_ref_pair).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + %err, + "Failed to process local safe derived block pair" + ); + }) + }) + .await; } ChainEvent::DerivationOriginUpdate { origin } => { - self.handle_derivation_origin_update(origin) + let _ = self.handle_derivation_origin_update(origin).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = origin.number, + %err, + "Failed to update derivation origin" + ); + }); } ChainEvent::BlockReplaced { replacement } => { - self.handle_block_replacement(replacement).await + let _ = self.handle_block_replacement(replacement).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + %err, + "Failed to handle block replacement" + ); + }); } ChainEvent::FinalizedSourceUpdate { finalized_source_block } => { - self.handle_finalized_l1_update(finalized_source_block).await + let _ = self + .observe_block_processing("finalized", || async { + self.handle_finalized_l1_update(finalized_source_block).await.inspect_err( + |err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = finalized_source_block.number, + %err, + "Failed to process finalized source update" + ); + }, + ) + }) + .await; } } } - async fn handle_block_replacement(&self, _replacement: BlockReplacement) { + fn handle_block_replacement( + &self, + _replacement: BlockReplacement, + ) -> Result<(), ChainProcessorError> { // Logic to handle block replacement + Ok(()) } - async fn handle_finalized_l1_update(&self, finalized_source_block: BlockInfo) { + async fn handle_finalized_l1_update( + &self, + finalized_source_block: BlockInfo, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, @@ -102,86 +232,52 @@ where "Processing finalized L1 update" ); let finalized_derived_block = - match self.state_manager.update_finalized_using_source(finalized_source_block) { - Ok(finalized_l2) => finalized_l2, - Err(err) => { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = finalized_source_block.number, - %err, - "Failed to update finalized L1 block" - ); - return; - } - }; - - if let Err(err) = self.managed_node.update_finalized(finalized_derived_block.id()).await { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = finalized_source_block.number, - %err, - "Failed to update finalized L2 block on managed node" - ); - } + self.state_manager.update_finalized_using_source(finalized_source_block)?; + self.managed_node.update_finalized(finalized_derived_block.id()).await?; + Ok(finalized_derived_block) } - fn handle_derivation_origin_update(&self, origin: BlockInfo) { + fn handle_derivation_origin_update( + &self, + origin: BlockInfo, + ) -> Result<(), ChainProcessorError> { debug!( target: "chain_processor", chain_id = self.chain_id, block_number = origin.number, "Processing derivation origin update" ); - if let Err(err) = self.state_manager.update_current_l1(origin) { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = origin.number, - %err, - "Failed to update current L1 block" - ); - } + self.state_manager.update_current_l1(origin)?; + Ok(()) } - async fn handle_safe_event(&self, derived_ref_pair: DerivedRefPair) { + async fn handle_safe_event( + &self, + derived_ref_pair: DerivedRefPair, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, block_number = derived_ref_pair.derived.number, "Processing local safe derived block pair" ); - if let Err(err) = self.state_manager.save_derived_block_pair(derived_ref_pair) { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = derived_ref_pair.derived.number, - %err, - "Failed to process safe block" - ); - // TODO: take next action based on the error - } + self.state_manager.save_derived_block_pair(derived_ref_pair)?; + Ok(derived_ref_pair.derived) } - async fn handle_unsafe_event(&self, block_info: BlockInfo) { + async fn handle_unsafe_event( + &self, + block: BlockInfo, + ) -> Result { debug!( target: "chain_processor", chain_id = self.chain_id, - block_number = block_info.number, + block_number = block.number, "Processing unsafe block" ); - if let Err(err) = self.log_indexer.process_and_store_logs(&block_info).await { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = block_info.number, - %err, - "Failed to process unsafe block" - ); - // TODO: take next action based on the error - } + self.log_indexer.process_and_store_logs(&block).await?; + Ok(block) } } diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index f5a86b2c13..0d2c2f31db 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -79,7 +79,7 @@ where } /// Error type for the [`LogIndexer`]. -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq, Eq)] pub enum LogIndexerError { /// Failed to write processed logs for a block to the state manager. #[error(transparent)] diff --git a/crates/supervisor/core/src/supervisor.rs b/crates/supervisor/core/src/supervisor.rs index 5ae78ef235..62e320fc9e 100644 --- a/crates/supervisor/core/src/supervisor.rs +++ b/crates/supervisor/core/src/supervisor.rs @@ -162,6 +162,9 @@ impl Supervisor { let mut processor = ChainProcessor::new(*chain_id, managed_node.clone(), db, self.cancel_token.clone()); + // todo: enable metrics only if configured + processor = processor.with_metrics(); + // Start the chain processors. // Each chain processor will start its own managed nodes and begin processing messages. processor.start().await?; diff --git a/tests/supervisor/sync_test.go b/tests/supervisor/sync_test.go index 3d472350e7..4379f583fa 100644 --- a/tests/supervisor/sync_test.go +++ b/tests/supervisor/sync_test.go @@ -60,7 +60,7 @@ func TestLocalUnsafeHeadAdvancing(gt *testing.T) { } func TestCrossUnsafeHeadAdvancing(gt *testing.T) { - gt.Skip("Feature not implemented yet") + // gt.Skip("Feature not implemented yet") t := devtest.ParallelT(gt) out := presets.NewSimpleInterop(t) @@ -79,17 +79,17 @@ func TestCrossUnsafeHeadAdvancing(gt *testing.T) { latestSupervisorStatus.Chains[l2bChainID].LocalUnsafe.Number >= supervisorStatus.Chains[l2bChainID].LocalUnsafe.Number, nil }) - // Wait and check if the cross unsafe head has advanced on L2A - err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - status := out.L2CLA.SyncStatus() - return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossUnsafe.Number, nil - }) + // // Wait and check if the cross unsafe head has advanced on L2A + // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + // status := out.L2CLA.SyncStatus() + // return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossUnsafe.Number, nil + // }) - // Wait and check if the cross unsafe head has advanced on L2B - err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - status := out.L2CLB.SyncStatus() - return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossUnsafe.Number, nil - }) + // // Wait and check if the cross unsafe head has advanced on L2B + // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + // status := out.L2CLB.SyncStatus() + // return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossUnsafe.Number, nil + // }) t.Require().NoError(err) } @@ -128,7 +128,7 @@ func TestLocalSafeHeadAdvancing(gt *testing.T) { } func TestCrossSafeHeadAdvancing(gt *testing.T) { - gt.Skip("Feature not implemented yet") + // gt.Skip("Feature not implemented yet") t := devtest.ParallelT(gt) out := presets.NewSimpleInterop(t) @@ -147,17 +147,17 @@ func TestCrossSafeHeadAdvancing(gt *testing.T) { latestSupervisorStatus.Chains[l2bChainID].CrossSafe.Number >= supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil }) - // Wait and check if the cross safe head has advanced on L2A - err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - status := out.L2CLA.SyncStatus() - return status.SafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossSafe.Number, nil - }) + // // Wait and check if the cross safe head has advanced on L2A + // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + // status := out.L2CLA.SyncStatus() + // return status.SafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossSafe.Number, nil + // }) - // Wait and check if the cross safe head has advanced on L2B - err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - status := out.L2CLB.SyncStatus() - return status.SafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil - }) + // // Wait and check if the cross safe head has advanced on L2B + // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + // status := out.L2CLB.SyncStatus() + // return status.SafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil + // }) t.Require().NoError(err) } From 2789ab9c400a11961d97a0030c2a088d304e795f Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 26 Jun 2025 10:26:02 +0530 Subject: [PATCH 2/5] doc --- crates/supervisor/core/src/chain_processor/task.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e191b34a64..1ddcefad22 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -60,6 +60,7 @@ where } /// Observes an async call, recording metrics and latency for block processing. + /// The latecy is calculated as the difference between the current system time and the block's timestamp. async fn observe_block_processing( &self, event_type: &'static str, From 0c34a693ab195c73f3415bc45104c39a1b114443 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 26 Jun 2025 10:35:59 +0530 Subject: [PATCH 3/5] revert test --- tests/supervisor/sync_test.go | 44 +++++++++++++++++------------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tests/supervisor/sync_test.go b/tests/supervisor/sync_test.go index 4379f583fa..3d472350e7 100644 --- a/tests/supervisor/sync_test.go +++ b/tests/supervisor/sync_test.go @@ -60,7 +60,7 @@ func TestLocalUnsafeHeadAdvancing(gt *testing.T) { } func TestCrossUnsafeHeadAdvancing(gt *testing.T) { - // gt.Skip("Feature not implemented yet") + gt.Skip("Feature not implemented yet") t := devtest.ParallelT(gt) out := presets.NewSimpleInterop(t) @@ -79,17 +79,17 @@ func TestCrossUnsafeHeadAdvancing(gt *testing.T) { latestSupervisorStatus.Chains[l2bChainID].LocalUnsafe.Number >= supervisorStatus.Chains[l2bChainID].LocalUnsafe.Number, nil }) - // // Wait and check if the cross unsafe head has advanced on L2A - // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - // status := out.L2CLA.SyncStatus() - // return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossUnsafe.Number, nil - // }) + // Wait and check if the cross unsafe head has advanced on L2A + err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + status := out.L2CLA.SyncStatus() + return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossUnsafe.Number, nil + }) - // // Wait and check if the cross unsafe head has advanced on L2B - // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - // status := out.L2CLB.SyncStatus() - // return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossUnsafe.Number, nil - // }) + // Wait and check if the cross unsafe head has advanced on L2B + err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + status := out.L2CLB.SyncStatus() + return status.CrossUnsafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossUnsafe.Number, nil + }) t.Require().NoError(err) } @@ -128,7 +128,7 @@ func TestLocalSafeHeadAdvancing(gt *testing.T) { } func TestCrossSafeHeadAdvancing(gt *testing.T) { - // gt.Skip("Feature not implemented yet") + gt.Skip("Feature not implemented yet") t := devtest.ParallelT(gt) out := presets.NewSimpleInterop(t) @@ -147,17 +147,17 @@ func TestCrossSafeHeadAdvancing(gt *testing.T) { latestSupervisorStatus.Chains[l2bChainID].CrossSafe.Number >= supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil }) - // // Wait and check if the cross safe head has advanced on L2A - // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - // status := out.L2CLA.SyncStatus() - // return status.SafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossSafe.Number, nil - // }) + // Wait and check if the cross safe head has advanced on L2A + err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + status := out.L2CLA.SyncStatus() + return status.SafeL2.Number > supervisorStatus.Chains[l2aChainID].CrossSafe.Number, nil + }) - // // Wait and check if the cross safe head has advanced on L2B - // err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { - // status := out.L2CLB.SyncStatus() - // return status.SafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil - // }) + // Wait and check if the cross safe head has advanced on L2B + err = wait.For(t.Ctx(), 2*time.Second, func() (bool, error) { + status := out.L2CLB.SyncStatus() + return status.SafeL2.Number > supervisorStatus.Chains[l2bChainID].CrossSafe.Number, nil + }) t.Require().NoError(err) } From f54865436cce599080fcdce67734c15643146641 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 26 Jun 2025 11:24:13 +0530 Subject: [PATCH 4/5] lintfix --- crates/supervisor/core/src/chain_processor/task.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 1ddcefad22..8335722722 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -60,7 +60,8 @@ where } /// Observes an async call, recording metrics and latency for block processing. - /// The latecy is calculated as the difference between the current system time and the block's timestamp. + /// The latecy is calculated as the difference between the current system time and the block's + /// timestamp. async fn observe_block_processing( &self, event_type: &'static str, From 685006c85630110645e62c16b391c267d9baac94 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 26 Jun 2025 11:38:51 +0530 Subject: [PATCH 5/5] lintfix --- crates/supervisor/core/src/chain_processor/task.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 8335722722..ae9dfff590 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -54,7 +54,7 @@ where } /// Enables metrics on the database environment. - pub fn with_metrics(mut self) -> Self { + pub const fn with_metrics(mut self) -> Self { self.metrics_enabled = Some(true); self } @@ -215,6 +215,7 @@ where } } + #[allow(clippy::missing_const_for_fn)] fn handle_block_replacement( &self, _replacement: BlockReplacement,