diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index a94f9e7a5c..f04ac8528d 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -94,7 +94,7 @@ where pub async fn start(&mut self) -> Result<(), ChainProcessorError> { let mut handle_guard = self.task_handle.lock().await; if handle_guard.is_some() { - warn!(target: "chain_processor", "ChainProcessor is already running"); + warn!(target: "chain_processor", chain_id = %self.chain_id, "ChainProcessor is already running"); return Ok(()) } diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e8eb69f0a4..ef3aefdbaa 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -46,13 +46,13 @@ where /// Creates a new [`ChainProcessorTask`]. pub fn new( rollup_config: RollupConfig, - chain_id: u64, + chain_id: ChainId, managed_node: Arc

, state_manager: Arc, cancel_token: CancellationToken, event_rx: mpsc::Receiver, ) -> Self { - let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); + let log_indexer = LogIndexer::new(chain_id, managed_node.clone(), state_manager.clone()); Self { rollup_config, chain_id, diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 42302e676c..081eb89ab6 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -2,6 +2,7 @@ use crate::{ logindexer::{log_to_log_hash, payload_hash_to_log_hash}, syncnode::{BlockProvider, ManagedNodeError}, }; +use alloy_primitives::ChainId; use kona_interop::parse_log_to_executing_message; use kona_protocol::BlockInfo; use kona_supervisor_storage::{LogStorageReader, LogStorageWriter, StorageError}; @@ -9,16 +10,18 @@ use kona_supervisor_types::{ExecutingMessage, Log}; use std::sync::Arc; use thiserror::Error; use tokio::sync::Mutex; -use tracing::debug; +use tracing::{debug, error}; /// The [`LogIndexer`] is responsible for processing L2 receipts, extracting [`ExecutingMessage`]s, /// and persisting them to the state manager. #[derive(Debug)] pub struct LogIndexer { + /// The chain ID of the rollup. + chain_id: ChainId, /// Component that provides receipts for a given block hash. - pub block_provider: Arc

, + block_provider: Arc

, /// Component that persists parsed log entries to storage. - pub log_storage: Arc, + log_storage: Arc, /// Protects concurrent catch-up is_catch_up_running: Mutex, } @@ -34,8 +37,8 @@ where /// - `block_provider`: Shared reference to a component capable of fetching block ref and /// receipts. /// - `log_storage`: Shared reference to the storage layer for persisting parsed logs. - pub fn new(block_provider: Arc

, log_storage: Arc) -> Self { - Self { block_provider, log_storage, is_catch_up_running: Mutex::new(false) } + pub fn new(chain_id: ChainId, block_provider: Arc

, log_storage: Arc) -> Self { + Self { chain_id, block_provider, log_storage, is_catch_up_running: Mutex::new(false) } } /// Asynchronously initiates a background task to catch up and index logs @@ -50,7 +53,7 @@ where let mut running = self.is_catch_up_running.lock().await; if *running { - debug!(target: "log_indexer", "Catch-up running log index"); + debug!(target: "log_indexer", chain_id = %self.chain_id, "Catch-up running log index"); return; } @@ -58,7 +61,12 @@ where drop(running); // release the lock while the job runs if let Err(err) = self.index_log_upto(&block).await { - tracing::error!(target: "log_indexer", %err, "Log indexer catch-up failed"); + error!( + target: "log_indexer", + chain_id = %self.chain_id, + %err, + "Log indexer catch-up failed" + ); } let mut running = self.is_catch_up_running.lock().await; @@ -227,7 +235,7 @@ mod tests { .withf(|block, logs| block.number == 1 && logs.len() == 2) .returning(|_, _| Ok(())); - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_ok()); @@ -257,7 +265,7 @@ mod tests { .withf(|block, logs| block.number == 2 && logs.is_empty()) .returning(|_, _| Ok(())); - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_ok()); @@ -282,7 +290,7 @@ mod tests { let mock_db = MockDb::new(); // No call expected - let log_indexer = LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db)); + let log_indexer = LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db)); let result = log_indexer.process_and_store_logs(&block_info).await; assert!(result.is_err()); @@ -320,7 +328,7 @@ mod tests { mock_db.expect_store_block_logs().times(5).returning(move |_, _| Ok(())); - let indexer = Arc::new(LogIndexer::new(Arc::new(mock_provider), Arc::new(mock_db))); + let indexer = Arc::new(LogIndexer::new(1, Arc::new(mock_provider), Arc::new(mock_db))); indexer.clone().sync_logs(target_block); diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 213b4979f1..8ab848de62 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -1,11 +1,12 @@ use super::{ManagedNodeClient, ManagedNodeError}; use alloy_eips::BlockNumHash; +use alloy_primitives::ChainId; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError}; use kona_supervisor_types::SuperHead; use std::sync::Arc; use tokio::sync::Mutex; -use tracing::{error, info}; +use tracing::{debug, error, info}; #[derive(Debug)] pub(super) struct Resetter { @@ -26,27 +27,30 @@ where /// Resets the node using the latest super head. pub(crate) async fn reset(&self) -> Result<(), ManagedNodeError> { + // get the chain ID to log it, this is useful for debugging + // no performance impact as it is cached in the client + let chain_id = self.client.chain_id().await?; let _guard = self.reset_guard.lock().await; - info!(target: "resetter", "Resetting the node"); + debug!(target: "resetter", %chain_id, "Resetting the node"); - let local_safe = match self.get_latest_valid_local_safe().await { + let local_safe = match self.get_latest_valid_local_safe(chain_id).await { Ok(block) => block, // todo: require refactor and corner case handling Err(ManagedNodeError::StorageError(StorageError::DatabaseNotInitialised)) => { - self.reset_pre_interop().await?; + self.reset_pre_interop(chain_id).await?; return Ok(()); } Err(err) => { - error!(target: "resetter", %err, "Failed to get latest valid derived block"); + error!(target: "resetter", %chain_id, %err, "Failed to get latest valid derived block"); return Err(ManagedNodeError::ResetFailed); } }; - let SuperHead { cross_unsafe, cross_safe, finalized, .. } = self - .db_provider - .get_super_head() - .inspect_err(|err| error!(target: "resetter", %err, "Failed to get super head"))?; + let SuperHead { cross_unsafe, cross_safe, finalized, .. } = + self.db_provider.get_super_head().inspect_err( + |err| error!(target: "resetter", %chain_id, %err, "Failed to get super head"), + )?; // using the local safe block as the local unsafe as well let local_unsafe = local_safe; @@ -67,6 +71,7 @@ where } info!(target: "resetter", + %chain_id, %local_unsafe, %cross_unsafe, %local_safe, @@ -85,30 +90,30 @@ where ) .await .inspect_err(|err| { - error!(target: "resetter", %err, "Failed to reset managed node"); + error!(target: "resetter", %chain_id, %err, "Failed to reset managed node"); })?; Ok(()) } - async fn reset_pre_interop(&self) -> Result<(), ManagedNodeError> { - info!(target: "resetter", "Resetting the node to pre-interop state"); + async fn reset_pre_interop(&self, chain_id: ChainId) -> Result<(), ManagedNodeError> { + info!(target: "resetter", %chain_id, "Resetting the node to pre-interop state"); self.client.reset_pre_interop().await.inspect_err(|err| { - error!(target: "resetter", %err, "Failed to reset managed node to pre-interop state"); + error!(target: "resetter", %chain_id, %err, "Failed to reset managed node to pre-interop state"); })?; Ok(()) } - async fn get_latest_valid_local_safe(&self) -> Result { - let latest_derivation_state = self.db_provider.latest_derivation_state().inspect_err( - |err| error!(target: "resetter", %err, "Failed to get latest derivation state"), - )?; - + async fn get_latest_valid_local_safe( + &self, + chain_id: ChainId, + ) -> Result { + let latest_derivation_state = self.db_provider.latest_derivation_state()?; let mut local_safe = latest_derivation_state.derived; loop { let node_block = self.client.block_ref_by_number(local_safe.number).await.inspect_err( - |err| error!(target: "resetter", %err, "Failed to get block by number"), + |err| error!(target: "resetter", %chain_id, %err, "Failed to get block by number"), )?; // If the local safe block matches the node block, we can return the super @@ -122,7 +127,7 @@ where let source_block = self .db_provider .derived_to_source(local_safe.id()) - .inspect_err(|err| error!(target: "resetter", %err, "Failed to get source block for the local safe head ref"))?; + .inspect_err(|err| error!(target: "resetter", %chain_id, %err, "Failed to get source block for the local safe head ref"))?; // Get the previous source block id let prev_source_id = @@ -131,7 +136,7 @@ where // If the previous source block id is 0, we cannot reset further. This should not happen // in prod, added for safety during dev environment. if prev_source_id.number == 0 { - error!(target: "resetter", "Source block number is 0, cannot reset further"); + error!(target: "resetter", %chain_id, "Source block number is 0, cannot reset further"); return Err(ManagedNodeError::ResetFailed); } @@ -143,7 +148,7 @@ where .db_provider .latest_derived_block_at_source(prev_source_id) .inspect_err(|err| { - error!(target: "resetter", %err, "Failed to get latest derived block for the previous source block") + error!(target: "resetter", %chain_id, %err, "Failed to get latest derived block for the previous source block") })?; } } @@ -228,6 +233,7 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap())); client.expect_reset().returning(|_, _, _, _, _| Ok(())); @@ -242,7 +248,8 @@ mod tests { let mut db = MockDb::new(); db.expect_latest_derivation_state().returning(|| Err(StorageError::LockPoisoned)); - let client = MockClient::new(); + let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); let resetter = Resetter::new(Arc::new(client), Arc::new(db)); @@ -261,6 +268,7 @@ mod tests { }) }); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client .expect_block_ref_by_number() .returning(|_| Err(ClientError::Authentication(AuthenticationError::InvalidHeader))); @@ -296,6 +304,7 @@ mod tests { .returning(move |_| Ok(last_valid_derived_block)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); // Return a block that does not match local_safe client .expect_block_ref_by_number() @@ -330,6 +339,7 @@ mod tests { db.expect_get_super_head().returning(move || Ok(super_head)); let mut client = MockClient::new(); + client.expect_chain_id().returning(move || Ok(1)); client.expect_block_ref_by_number().returning(move |_| Ok(super_head.local_safe.unwrap())); client.expect_reset().returning(|_, _, _, _, _| { Err(ClientError::Authentication(AuthenticationError::InvalidJwt)) diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 2439a53283..6b4a41e90e 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ b/crates/supervisor/core/src/syncnode/task.rs @@ -2,6 +2,7 @@ use super::{ManagedEventTaskError, ManagedNodeClient, resetter::Resetter}; use crate::event::ChainEvent; use alloy_eips::BlockNumberOrTag; use alloy_network::Ethereum; +use alloy_primitives::ChainId; use alloy_provider::{Provider, RootProvider}; use kona_interop::{DerivedRefPair, ManagedEvent}; use kona_protocol::BlockInfo; @@ -64,7 +65,7 @@ where incoming_event = subscription.next() => { match incoming_event { Some(Ok(subscription_event)) => { - self.handle_managed_event(subscription_event.data).await; + self.handle_managed_event(chain_id, subscription_event.data).await; } Some(Err(err)) => { error!( @@ -101,54 +102,58 @@ where /// /// Analyzes the event content and takes appropriate actions based on the /// event fields. - pub(super) async fn handle_managed_event(&self, incoming_event: Option) { + pub(super) async fn handle_managed_event( + &self, + chain_id: ChainId, + incoming_event: Option, + ) { match incoming_event { Some(event) => { - debug!(target: "managed_event_task", %event, "Handling ManagedEvent"); + debug!(target: "managed_event_task", %chain_id, %event, "Handling ManagedEvent"); // Process each field of the event if it's present if let Some(reset_id) = &event.reset { - info!(target: "managed_event_task", %reset_id, "Reset event received"); + info!(target: "managed_event_task", %chain_id, %reset_id, "Reset event received"); if let Err(err) = self.resetter.reset().await { - error!(target: "managed_event_task", %err, "Failed to reset node"); + error!(target: "managed_event_task", %chain_id, %err, "Failed to reset node"); } } if let Some(unsafe_block) = &event.unsafe_block { - info!(target: "managed_event_task", %unsafe_block, "Unsafe block event received"); + info!(target: "managed_event_task", %chain_id, %unsafe_block, "Unsafe block event received"); // todo: check any pre processing needed if let Err(err) = self.event_tx.send(ChainEvent::UnsafeBlock { block: *unsafe_block }).await { - warn!(target: "managed_event_task", %err, "Failed to send unsafe block event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send unsafe block event, channel closed or receiver dropped"); } } if let Some(derived_ref_pair) = &event.derivation_update { if event.derivation_origin_update.is_none() { - info!(target: "managed_event_task", %event, "Derivation update received without origin update"); + info!(target: "managed_event_task", %chain_id, %event, "Derivation update received without origin update"); if let Err(err) = self .event_tx .send(ChainEvent::DerivedBlock { derived_ref_pair: *derived_ref_pair }) .await { - warn!(target: "managed_event_task", %err, "Failed to derivation update event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to derivation update event, channel closed or receiver dropped"); } } } if let Some(derived_ref_pair) = &event.exhaust_l1 { - info!(target: "managed_event_task", ?derived_ref_pair, "L1 exhausted event received"); + info!(target: "managed_event_task", %chain_id, %derived_ref_pair, "L1 exhausted event received"); - if let Err(err) = self.handle_exhaust_l1(derived_ref_pair).await { - error!(target: "managed_event_task", %err, "Failed to fetch next L1 block"); + if let Err(err) = self.handle_exhaust_l1(chain_id, derived_ref_pair).await { + error!(target: "managed_event_task", %chain_id, %err, "Failed to fetch next L1 block"); } } if let Some(replacement) = &event.replace_block { - info!(target: "managed_event_task", %replacement, "Block replacement received"); + info!(target: "managed_event_task", %chain_id, %replacement, "Block replacement received"); // todo: check any pre processing needed if let Err(err) = self @@ -156,19 +161,19 @@ where .send(ChainEvent::BlockReplaced { replacement: *replacement }) .await { - warn!(target: "managed_event_task", %err, "Failed to send block replacement event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send block replacement event, channel closed or receiver dropped"); } } if let Some(origin) = &event.derivation_origin_update { - info!(target: "managed_event_task", %origin, "Derivation origin update received"); + info!(target: "managed_event_task", %chain_id, %origin, "Derivation origin update received"); if let Err(err) = self .event_tx .send(ChainEvent::DerivationOriginUpdate { origin: *origin }) .await { - warn!(target: "managed_event_task", %err, "Failed to send derivation origin update event, channel closed or receiver dropped"); + warn!(target: "managed_event_task", %chain_id, %err, "Failed to send derivation origin update event, channel closed or receiver dropped"); } } @@ -180,12 +185,13 @@ where event.replace_block.is_none() && event.derivation_origin_update.is_none() { - debug!(target: "managed_event_task", "Received empty event with all fields None"); + debug!(target: "managed_event_task", %chain_id, "Received empty event with all fields None"); } } None => { warn!( target: "managed_event_task", + %chain_id, "Received None event, possibly an empty notification or an issue with deserialization." ); } @@ -196,6 +202,7 @@ where /// node. async fn handle_exhaust_l1( &self, + chain_id: ChainId, derived_ref_pair: &DerivedRefPair, ) -> Result<(), ManagedEventTaskError> { let next_block_number = derived_ref_pair.source.number + 1; @@ -204,7 +211,7 @@ where .get_block_by_number(BlockNumberOrTag::Number(next_block_number)) .await .map_err(|err| { - error!(target: "managed_event_task", %err, "Failed to fetch next L1 block"); + error!(target: "managed_event_task", %chain_id, %err, "Failed to fetch next L1 block"); ManagedEventTaskError::GetBlockByNumberFailed(next_block_number) })?; @@ -220,7 +227,7 @@ where if block.header.parent_hash != derived_ref_pair.source.hash { // this could happen due to a reorg. // this case should be handled by the reorg manager - error!(target: "managed_event_task", "L1 Block parent hash mismatch"); + error!(target: "managed_event_task", %chain_id, "L1 Block parent hash mismatch"); Err(ManagedEventTaskError::BlockHashMismatch { current: derived_ref_pair.source.hash, parent: block.header.parent_hash, @@ -235,11 +242,11 @@ where }; if let Err(err) = self.client.provide_l1(block_info).await { - error!(target: "managed_event_task", %err, "Error sending provide_l1 to managed node"); + error!(target: "managed_event_task", %chain_id, %err, "Error sending provide_l1 to managed node"); Err(ManagedEventTaskError::ManagedNodeAPICallFailed)? } - info!(target: "managed_event_task", "Sent next L1 block to managed node using provide_l1"); + info!(target: "managed_event_task", %chain_id, "Sent next L1 block to managed node using provide_l1"); Ok(()) } } @@ -339,7 +346,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { @@ -390,7 +397,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { @@ -438,7 +445,7 @@ mod tests { let cancel_token = CancellationToken::new(); let task = ManagedEventTask::new(client, provider, resetter, cancel_token, tx); - task.handle_managed_event(Some(managed_event)).await; + task.handle_managed_event(1, Some(managed_event)).await; let event = rx.recv().await.expect("Should receive event"); match event { @@ -525,7 +532,7 @@ mod tests { // push the value that we expect on next call asserter.push(MockResponse::Success(serde_json::from_str(next_block).unwrap())); - let result = task.handle_exhaust_l1(&derived_ref_pair).await; + let result = task.handle_exhaust_l1(1, &derived_ref_pair).await; assert!(result.is_err(), "Expected error"); assert_eq!(