From 343b0962e5b05586089cc0d285e7412a9677608a Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 23 Jul 2025 15:10:11 +0530 Subject: [PATCH 1/4] chore(supervisor/core): log improvements in managed node --- .../core/src/chain_processor/chain.rs | 2 +- .../supervisor/core/src/syncnode/resetter.rs | 32 ++++++++---- crates/supervisor/core/src/syncnode/task.rs | 50 ++++++++++--------- 3 files changed, 49 insertions(+), 35 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index a94f9e7a5c..f04ac8528d 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -94,7 +94,7 @@ where pub async fn start(&mut self) -> Result<(), ChainProcessorError> { let mut handle_guard = self.task_handle.lock().await; if handle_guard.is_some() { - warn!(target: "chain_processor", "ChainProcessor is already running"); + warn!(target: "chain_processor", chain_id = %self.chain_id, "ChainProcessor is already running"); return Ok(()) } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 213b4979f1..15b6ab8d36 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -26,9 +26,12 @@ where /// Resets the node using the latest super head. 
pub(crate) async fn reset(&self) -> Result<(), ManagedNodeError> { + // get the chain ID to log it, this is useful for debugging + // no performance impact as it is cached in the client + let chain_id = self.client.chain_id().await?; let _guard = self.reset_guard.lock().await; - info!(target: "resetter", "Resetting the node"); + info!(target: "resetter", %chain_id, "Resetting the node"); let local_safe = match self.get_latest_valid_local_safe().await { Ok(block) => block, @@ -38,7 +41,7 @@ where return Ok(()); } Err(err) => { - error!(target: "resetter", %err, "Failed to get latest valid derived block"); + error!(target: "resetter", %chain_id, %err, "Failed to get latest valid derived block"); return Err(ManagedNodeError::ResetFailed); } }; @@ -46,7 +49,7 @@ where let SuperHead { cross_unsafe, cross_safe, finalized, .. } = self .db_provider .get_super_head() - .inspect_err(|err| error!(target: "resetter", %err, "Failed to get super head"))?; + .inspect_err(|err| error!(target: "resetter", %chain_id, %err, "Failed to get super head"))?; // using the local safe block as the local unsafe as well let local_unsafe = local_safe; @@ -67,6 +70,7 @@ where } info!(target: "resetter", + %chain_id, %local_unsafe, %cross_unsafe, %local_safe, @@ -85,30 +89,36 @@ where ) .await .inspect_err(|err| { - error!(target: "resetter", %err, "Failed to reset managed node"); + error!(target: "resetter", %chain_id, %err, "Failed to reset managed node"); })?; Ok(()) } async fn reset_pre_interop(&self) -> Result<(), ManagedNodeError> { - info!(target: "resetter", "Resetting the node to pre-interop state"); + // get the chain ID to log it, this is useful for debugging + // no performance impact as it is cached in the client + let chain_id = self.client.chain_id().await?; + info!(target: "resetter", %chain_id, "Resetting the node to pre-interop state"); self.client.reset_pre_interop().await.inspect_err(|err| { - error!(target: "resetter", %err, "Failed to reset managed node to pre-interop 
state"); + error!(target: "resetter", %chain_id, %err, "Failed to reset managed node to pre-interop state"); })?; Ok(()) } async fn get_latest_valid_local_safe(&self) -> Result { + // get the chain ID to log it, this is useful for debugging + // no performance impact as it is cached in the client + let chain_id = self.client.chain_id().await?; let latest_derivation_state = self.db_provider.latest_derivation_state().inspect_err( - |err| error!(target: "resetter", %err, "Failed to get latest derivation state"), + |err| error!(target: "resetter", %chain_id, %err, "Failed to get latest derivation state"), )?; let mut local_safe = latest_derivation_state.derived; loop { let node_block = self.client.block_ref_by_number(local_safe.number).await.inspect_err( - |err| error!(target: "resetter", %err, "Failed to get block by number"), + |err| error!(target: "resetter", %chain_id, %err, "Failed to get block by number"), )?; // If the local safe block matches the node block, we can return the super @@ -122,7 +132,7 @@ where let source_block = self .db_provider .derived_to_source(local_safe.id()) - .inspect_err(|err| error!(target: "resetter", %err, "Failed to get source block for the local safe head ref"))?; + .inspect_err(|err| error!(target: "resetter", %chain_id, %err, "Failed to get source block for the local safe head ref"))?; // Get the previous source block id let prev_source_id = @@ -131,7 +141,7 @@ where // If the previous source block id is 0, we cannot reset further. This should not happen // in prod, added for safety during dev environment. 
if prev_source_id.number == 0 { - error!(target: "resetter", "Source block number is 0, cannot reset further"); + error!(target: "resetter", %chain_id, "Source block number is 0, cannot reset further"); return Err(ManagedNodeError::ResetFailed); } @@ -143,7 +153,7 @@ where .db_provider .latest_derived_block_at_source(prev_source_id) .inspect_err(|err| { - error!(target: "resetter", %err, "Failed to get latest derived block for the previous source block") + error!(target: "resetter", %chain_id, %err, "Failed to get latest derived block for the previous source block") })?; } } diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 2439a53283..438764af18 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -5,6 +5,7 @@ use alloy_network::Ethereum; use alloy_provider::{Provider, RootProvider}; use kona_interop::{DerivedRefPair, ManagedEvent}; use kona_protocol::BlockInfo; +use alloy_primitives::ChainId; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; use std::sync::Arc; use tokio::sync::mpsc; @@ -64,7 +65,7 @@ where incoming_event = subscription.next() => { match incoming_event { Some(Ok(subscription_event)) => { - self.handle_managed_event(subscription_event.data).await; + self.handle_managed_event(chain_id, subscription_event.data).await; } Some(Err(err)) => { error!( @@ -101,54 +102,54 @@ where /// /// Analyzes the event content and takes appropriate actions based on the /// event fields. 
- pub(super) async fn handle_managed_event(&self, incoming_event: Option) { + pub(super) async fn handle_managed_event(&self, chain_id: ChainId, incoming_event: Option) { match incoming_event { Some(event) => { - debug!(target: "managed_event_task", %event, "Handling ManagedEvent"); + debug!(target: "managed_event_task", %chain_id, %event, "Handling ManagedEvent"); // Process each field of the event if it's present if let Some(reset_id) = &event.reset { - info!(target: "managed_event_task", %reset_id, "Reset event received"); + info!(target: "managed_event_task", %chain_id, %reset_id, "Reset event received"); if let Err(err) = self.resetter.reset().await { - error!(target: "managed_event_task", %err, "Failed to reset node"); + error!(target: "managed_event_task", %chain_id, %err, "Failed to reset node"); } } if let Some(unsafe_block) = &event.unsafe_block { - info!(target: "managed_event_task", %unsafe_block, "Unsafe block event received"); + info!(target: "managed_event_task", %chain_id, %unsafe_block, "Unsafe block event received"); // todo: check any pre processing needed if let Err(err) = self.event_tx.send(ChainEvent::UnsafeBlock { block: *unsafe_block }).await { - warn!(target: "managed_event_task", %err, "Failed to send unsafe block event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send unsafe block event, channel closed or receiver dropped"); } } if let Some(derived_ref_pair) = &event.derivation_update { if event.derivation_origin_update.is_none() { - info!(target: "managed_event_task", %event, "Derivation update received without origin update"); + info!(target: "managed_event_task", %chain_id, %event, "Derivation update received without origin update"); if let Err(err) = self .event_tx .send(ChainEvent::DerivedBlock { derived_ref_pair: *derived_ref_pair }) .await { - warn!(target: "managed_event_task", %err, "Failed to derivation update event, channel closed or receiver dropped"); + warn!(target: 
"managed_event_task", %chain_id, %err, "Failed to send derivation update event, channel closed or receiver dropped"); } } } if let Some(derived_ref_pair) = &event.exhaust_l1 { - info!(target: "managed_event_task", ?derived_ref_pair, "L1 exhausted event received"); + info!(target: "managed_event_task", %chain_id, %derived_ref_pair, "L1 exhausted event received"); if let Err(err) = self.handle_exhaust_l1(derived_ref_pair).await { - error!(target: "managed_event_task", %err, "Failed to fetch next L1 block"); + error!(target: "managed_event_task", %chain_id, %err, "Failed to fetch next L1 block"); } } if let Some(replacement) = &event.replace_block { - info!(target: "managed_event_task", %replacement, "Block replacement received"); + info!(target: "managed_event_task", %chain_id, %replacement, "Block replacement received"); // todo: check any pre processing needed if let Err(err) = self @@ -156,19 +157,19 @@ where .send(ChainEvent::BlockReplaced { replacement: *replacement }) .await { - warn!(target: "managed_event_task", %err, "Failed to send block replacement event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send block replacement event, channel closed or receiver dropped"); } } if let Some(origin) = &event.derivation_origin_update { - info!(target: "managed_event_task", %origin, "Derivation origin update received"); + info!(target: "managed_event_task", %chain_id, %origin, "Derivation origin update received"); if let Err(err) = self .event_tx .send(ChainEvent::DerivationOriginUpdate { origin: *origin }) .await { - warn!(target: "managed_event_task", %err, "Failed to send derivation origin update event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send derivation origin update event, channel closed or receiver dropped"); } } @@ -180,12 +181,13 @@ where event.replace_block.is_none() && event.derivation_origin_update.is_none() { - debug!(target: 
"managed_event_task", "Received empty event with all fields None"); + debug!(target: "managed_event_task", %chain_id, "Received empty event with all fields None"); } } None => { warn!( target: "managed_event_task", + %chain_id, "Received None event, possibly an empty notification or an issue with deserialization." ); } @@ -198,13 +200,15 @@ where &self, derived_ref_pair: &DerivedRefPair, ) -> Result<(), ManagedEventTaskError> { + let chain_id = self.client.chain_id().await?; + let next_block_number = derived_ref_pair.source.number + 1; let next_block = self .l1_provider .get_block_by_number(BlockNumberOrTag::Number(next_block_number)) .await .map_err(|err| { - error!(target: "managed_event_task", %err, "Failed to fetch next L1 block"); + error!(target: "managed_event_task", %chain_id, %err, "Failed to fetch next L1 block"); ManagedEventTaskError::GetBlockByNumberFailed(next_block_number) })?; @@ -220,7 +224,7 @@ where if block.header.parent_hash != derived_ref_pair.source.hash { // this could happen due to a reorg. // this case should be handled by the reorg manager - error!(target: "managed_event_task", "L1 Block parent hash mismatch"); + error!(target: "managed_event_task", %chain_id, "L1 Block parent hash mismatch"); Err(ManagedEventTaskError::BlockHashMismatch { current: derived_ref_pair.source.hash, parent: block.header.parent_hash, @@ -235,11 +239,11 @@ where }; if let Err(err) = self.client.provide_l1(block_info).await { - error!(target: "managed_event_task", %err, "Error sending provide_l1 to managed node"); + error!(target: "managed_event_task", %chain_id, %err, "Error sending provide_l1 to managed node"); Err(ManagedEventTaskError::ManagedNodeAPICallFailed)? 
} - info!(target: "managed_event_task", "Sent next L1 block to managed node using provide_l1"); + info!(target: "managed_event_task", %chain_id, "Sent next L1 block to managed node using provide_l1"); Ok(()) } } @@ -339,7 +343,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { @@ -390,7 +394,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { @@ -438,7 +442,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { From 763f47e973b56acd30c0aedb60e45c9533ba0edf Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 23 Jul 2025 15:28:11 +0530 Subject: [PATCH 2/4] lintfix --- crates/supervisor/core/src/syncnode/resetter.rs | 8 ++++---- crates/supervisor/core/src/syncnode/task.rs | 15 +++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 15b6ab8d36..0fb3be80b1 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -46,10 +46,10 @@ where } }; - let SuperHead { cross_unsafe, cross_safe, finalized, .. 
} = self - .db_provider - .get_super_head() - .inspect_err(|err| error!(target: "resetter", %chain_id, %err, "Failed to get super head"))?; + let SuperHead { cross_unsafe, cross_safe, finalized, .. } = + self.db_provider.get_super_head().inspect_err( + |err| error!(target: "resetter", %chain_id, %err, "Failed to get super head"), + )?; // using the local safe block as the local unsafe as well let local_unsafe = local_safe; diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 438764af18..6b4a41e90e 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -2,10 +2,10 @@ use super::{ManagedEventTaskError, ManagedNodeClient, resetter::Resetter}; use crate::event::ChainEvent; use alloy_eips::BlockNumberOrTag; use alloy_network::Ethereum; +use alloy_primitives::ChainId; use alloy_provider::{Provider, RootProvider}; use kona_interop::{DerivedRefPair, ManagedEvent}; use kona_protocol::BlockInfo; -use alloy_primitives::ChainId; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, LogStorageReader}; use std::sync::Arc; use tokio::sync::mpsc; @@ -102,7 +102,11 @@ where /// /// Analyzes the event content and takes appropriate actions based on the /// event fields. 
- pub(super) async fn handle_managed_event(&self, chain_id: ChainId, incoming_event: Option) { + pub(super) async fn handle_managed_event( + &self, + chain_id: ChainId, + incoming_event: Option, + ) { match incoming_event { Some(event) => { debug!(target: "managed_event_task", %chain_id, %event, "Handling ManagedEvent"); @@ -143,7 +147,7 @@ where if let Some(derived_ref_pair) = &event.exhaust_l1 { info!(target: "managed_event_task", %chain_id, %derived_ref_pair, "L1 exhausted event received"); - if let Err(err) = self.handle_exhaust_l1(derived_ref_pair).await { + if let Err(err) = self.handle_exhaust_l1(chain_id, derived_ref_pair).await { error!(target: "managed_event_task", %chain_id, %err, "Failed to fetch next L1 block"); } } @@ -198,10 +202,9 @@ where /// node. async fn handle_exhaust_l1( &self, + chain_id: ChainId, derived_ref_pair: &DerivedRefPair, ) -> Result<(), ManagedEventTaskError> { - let chain_id = self.client.chain_id().await?; - let next_block_number = derived_ref_pair.source.number + 1; let next_block = self .l1_provider @@ -529,7 +532,7 @@ mod tests { // push the value that we expect on next call asserter.push(MockResponse::Success(serde_json::from_str(next_block).unwrap())); - let result = task.handle_exhaust_l1(&derived_ref_pair).await; + let result = task.handle_exhaust_l1(1, &derived_ref_pair).await; assert!(result.is_err(), "Expected error"); assert_eq!( From c17870ccb2073626c41483eb6b80451cfc730905 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 23 Jul 2025 15:45:22 +0530 Subject: [PATCH 3/4] testcase fixes --- .../core/src/chain_processor/task.rs | 4 +-- .../supervisor/core/src/logindexer/indexer.rs | 28 ++++++++++++------- .../supervisor/core/src/syncnode/resetter.rs | 25 +++++++++-------- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e8eb69f0a4..ef3aefdbaa 100644 --- 
a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -46,13 +46,13 @@ where /// Creates a new [`ChainProcessorTask`]. pub fn new( rollup_config: RollupConfig, - chain_id: u64, + chain_id: ChainId, managed_node: Arc

, state_manager: Arc, cancel_token: CancellationToken, event_rx: mpsc::Receiver, ) -> Self { - let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); + let log_indexer = LogIndexer::new(chain_id, managed_node.clone(), state_manager.clone()); Self { rollup_config, chain_id, diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 42302e676c..36ae20080a 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -2,6 +2,7 @@ use crate::{ logindexer::{log_to_log_hash, payload_hash_to_log_hash}, syncnode::{BlockProvider, ManagedNodeError}, }; +use alloy_primitives::ChainId; use kona_interop::parse_log_to_executing_message; use kona_protocol::BlockInfo; use kona_supervisor_storage::{LogStorageReader, LogStorageWriter, StorageError}; @@ -15,10 +16,12 @@ use tracing::debug; /// and persisting them to the state manager. #[derive(Debug)] pub struct LogIndexer { + /// The chain ID of the rollup. + chain_id: ChainId, /// Component that provides receipts for a given block hash. - pub block_provider: Arc

, + block_provider: Arc

, /// Component that persists parsed log entries to storage. - pub log_storage: Arc, + log_storage: Arc, /// Protects concurrent catch-up is_catch_up_running: Mutex, } @@ -34,8 +37,8 @@ where /// - `block_provider`: Shared reference to a component capable of fetching block ref and /// receipts. /// - `log_storage`: Shared reference to the storage layer for persisting parsed logs. - pub fn new(block_provider: Arc

, log_storage: Arc) -> Self { - Self { block_provider, log_storage, is_catch_up_running: Mutex::new(false) } + pub fn new(chain_id: ChainId, block_provider: Arc

, log_storage: Arc) -> Self { + Self { chain_id, block_provider, log_storage, is_catch_up_running: Mutex::new(false) } } /// Asynchronously initiates a background task to catch up and index logs @@ -50,7 +53,7 @@ where let mut running = self.is_catch_up_running.lock().await; if *running { - debug!(target: "log_indexer", "Catch-up running log index"); + debug!(target: "log_indexer", chain_id = %self.chain_id, "Catch-up running log index"); return; } @@ -58,7 +61,12 @@ where drop(running); // release the lock while the job runs if let Err(err) = self.index_log_upto(&block).await { - tracing::error!(target: "log_indexer", %err, "Log indexer catch-up failed"); + tracing::error!( + target: "log_indexer", + chain_id = %self.chain_id, + %err, + "Log indexer catch-up failed" + ); } let mut running = self.is_catch_up_running.lock().await; @@ -227,7 +235,7 @@ mod tests { .withf(|block, logs| block.number == 1 && logs.len() == 2) .returning(|_, _| Ok(())); - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_ok()); @@ -257,7 +265,7 @@ mod tests { .withf(|block, logs| block.number == 2 && logs.is_empty()) .returning(|_, _| Ok(())); - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_ok()); @@ -282,7 +290,7 @@ mod tests { let mock_db = MockDb::new(); // No call expected - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_err()); @@ -320,7 +328,7 @@ mod tests { 
mock_db.expect_store_block_logs().times(5).returning(move |_, _| Ok(())); - let indexer = Arc::new(LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db))); + let indexer = Arc::new(LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db))); indexer.clone().sync_logs(target_block); diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 0fb3be80b1..3b07dc2184 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -1,5 +1,6 @@ use super::{ManagedNodeClient, ManagedNodeError}; use alloy_eips::BlockNumHash; +use alloy_primitives::ChainId; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError}; use kona_supervisor_types::SuperHead; @@ -33,11 +34,11 @@ where info!(target: "resetter", %chain_id, "Resetting the node"); - let local_safe = match self.get_latest_valid_local_safe().await { + let local_safe = match self.get_latest_valid_local_safe(chain_id).await { Ok(block) => block, // todo: require refactor and corner case handling Err(ManagedNodeError::StorageError(StorageError::DatabaseNotInitialised)) => { - self.reset_pre_interop().await?; + self.reset_pre_interop(chain_id).await?; return Ok(()); } Err(err) => { @@ -94,10 +95,7 @@ where Ok(()) } - async fn reset_pre_interop(&self) -> Result<(), ManagedNodeError> { - // get the chain ID to log it, this is useful for debugging - // no performance impact as it is cached in the client - let chain_id = self.client.chain_id().await?; + async fn reset_pre_interop(&self, chain_id: ChainId) -> Result<(), ManagedNodeError> { info!(target: "resetter", %chain_id, "Resetting the node to pre-interop state"); self.client.reset_pre_interop().await.inspect_err(|err| { @@ -106,10 +104,10 @@ where Ok(()) } - async fn get_latest_valid_local_safe(&self) -> Result { - // get the chain ID to log it, this is useful for debugging - // no performance 
impact as it is cached in the client - let chain_id = self.client.chain_id().await?; + async fn get_latest_valid_local_safe( + &self, + chain_id: ChainId, + ) -> Result { let latest_derivation_state = self.db_provider.latest_derivation_state().inspect_err( |err| error!(target: "resetter", %chain_id, %err, "Failed to get latest derivation state"), )?; @@ -238,6 +236,7 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap())); client.expect_reset().returning(|_, _, _, _, _| Ok(())); @@ -252,7 +251,8 @@ mod tests { let mut db = MockDb::new(); db.expect_latest_derivation_state().returning(|| Err(StorageError::LockPoisoned)); - let client = MockClient::new(); + let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); @@ -271,6 +271,7 @@ mod tests { }) }); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client .expect_block_ref_by_number() .returning(|_| Err(ClientError::Authentication(AuthenticationError::InvalidHeader))); @@ -306,6 +307,7 @@ mod tests { .returning(move |_| Ok(last_valid_derived_block)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); // Return a block that does not match local_safe client .expect_block_ref_by_number() @@ -340,6 +342,7 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap())); client.expect_reset().returning(|_, _, _, _, _| { Err(ClientError::Authentication(AuthenticationError::InvalidJwt)) From ba8802b7c78bd705baa5fb271037f28bc4ca1f85 Mon Sep 17 00:00:00 2001 From: Arun 
Dhyani Date: Wed, 23 Jul 2025 18:05:03 +0530 Subject: [PATCH 4/4] improvements --- crates/supervisor/core/src/logindexer/indexer.rs | 4 ++-- crates/supervisor/core/src/syncnode/resetter.rs | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 36ae20080a..081eb89ab6 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -10,7 +10,7 @@ use kona_supervisor_types::{ExecutingMessage, Log}; use std::sync::Arc; use thiserror::Error; use tokio::sync::Mutex; -use tracing::debug; +use tracing::{debug, error}; /// The [`LogIndexer`] is responsible for processing L2 receipts, extracting [`ExecutingMessage`]s, /// and persisting them to the state manager. @@ -61,7 +61,7 @@ where drop(running); // release the lock while the job runs if let Err(err) = self.index_log_upto(&block).await { - tracing::error!( + error!( target: "log_indexer", chain_id = %self.chain_id, %err, diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 3b07dc2184..8ab848de62 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -6,7 +6,7 @@ use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, Sto use kona_supervisor_types::SuperHead; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, info}; +use tracing::{debug, error, info}; #[derive(Debug)] pub(super) struct Resetter { @@ -32,7 +32,7 @@ where let chain_id = self.client.chain_id().await?; let _guard = self.reset_guard.lock().await; - info!(target: "resetter", %chain_id, "Resetting the node"); + debug!(target: "resetter", %chain_id, "Resetting the node"); let local_safe = match self.get_latest_valid_local_safe(chain_id).await { Ok(block) => block, @@ -108,10 +108,7 @@ where &self, chain_id: ChainId, ) -> 
Result { - let latest_derivation_state = self.db_provider.latest_derivation_state().inspect_err( - |err| error!(target: "resetter", %chain_id, %err, "Failed to get latest derivation state"), - )?; - + let latest_derivation_state = self.db_provider.latest_derivation_state()?; let mut local_safe = latest_derivation_state.derived; loop {