diff --git a/crates/supervisor/core/src/l1_watcher/watcher.rs b/crates/supervisor/core/src/l1_watcher/watcher.rs index f4df036654..eeaa03382b 100644 --- a/crates/supervisor/core/src/l1_watcher/watcher.rs +++ b/crates/supervisor/core/src/l1_watcher/watcher.rs @@ -1,6 +1,6 @@ use crate::{event::ChainEvent, syncnode::ManagedNodeController}; use alloy_eips::{BlockNumHash, BlockNumberOrTag}; -use alloy_primitives::{B256, ChainId}; +use alloy_primitives::ChainId; use alloy_rpc_client::RpcClient; use alloy_rpc_types_eth::{Block, Header}; use futures::StreamExt; @@ -78,7 +78,7 @@ where S: futures::Stream + Unpin, { let mut finalized_number = 0; - let mut previous_latest_block = BlockNumHash { number: 0, hash: B256::ZERO }; + let mut previous_latest_block: Option = None; loop { tokio::select! { @@ -89,23 +89,31 @@ where latest_block = latest_head_stream.next() => { if let Some(latest_block) = latest_block { info!(target: "supervisor::l1_watcher", "Latest L1 block received: {:?}", latest_block.header.number); - self.handle_new_latest_block(latest_block, &mut previous_latest_block).await; + previous_latest_block = self.handle_new_latest_block(latest_block, previous_latest_block).await; } } finalized_block = finalized_head_stream.next() => { if let Some(finalized_block) = finalized_block { info!(target: "supervisor::l1_watcher", "Finalized L1 block received: {:?}", finalized_block.header.number); - self.handle_new_finalized_block(finalized_block, &mut finalized_number); + finalized_number = self.handle_new_finalized_block(finalized_block, finalized_number); } } } } } - fn handle_new_finalized_block(&self, block: Block, last_finalized_number: &mut u64) { + /// Handles a new finalized [`Block`], updating the storage and broadcasting the event. + /// + /// Arguments: + /// - `block`: The finalized block to process. + /// - `last_finalized_number`: The last finalized block number. + /// + /// Returns: + /// - `u64`: The new finalized block number. + fn handle_new_finalized_block(&self, block: Block, last_finalized_number: u64) -> u64 { let block_number = block.header.number; - if block_number == *last_finalized_number { - return; + if block_number == last_finalized_number { + return last_finalized_number; } let Header { @@ -123,12 +131,12 @@ where if let Err(err) = self.finalized_l1_storage.update_finalized_l1(finalized_source_block) { error!(target: "supervisor::l1_watcher", %err, "Failed to update finalized L1 block"); - return; + return last_finalized_number; } self.broadcast_finalized_source_update(finalized_source_block); - *last_finalized_number = block_number; + block_number } fn broadcast_finalized_source_update(&self, finalized_source_block: BlockInfo) { @@ -145,47 +153,60 @@ where } } + /// Handles a new latest [`Block`], checking if it requires a reorg or is sequential. + /// + /// Arguments: + /// - `incoming_block`: The incoming block to process. + /// - `previous_block`: The previously stored latest block, if any. + /// + /// Returns: + /// - `Option`: The ID of the new latest block if processed successfully, or the + /// previous block if no changes were made. async fn handle_new_latest_block( &self, incoming_block: Block, - previous_block: &mut BlockNumHash, - ) { - let incoming_block_number = incoming_block.header.number; + previous_block: Option, + ) -> Option { + let Header { + hash, + inner: alloy_consensus::Header { number, parent_hash, timestamp, .. }, + .. + } = incoming_block.header; + let latest_block = BlockInfo::new(hash, number, parent_hash, timestamp); + + let prev = match previous_block { + Some(prev) => prev, + None => { + return Some(latest_block.id()); + } + }; // Early exit if the incoming block is not newer than the previous block - if incoming_block_number <= previous_block.number { + if latest_block.number <= prev.number { info!( target: "supervisor::l1_watcher", - incoming_block_number, - previous_block_number = previous_block.number, + incoming_block_number = latest_block.number, + previous_block_number = prev.number, "Incoming latest L1 block is not greater than the stored latest block" ); - return; + return previous_block; } trace!( target: "l1_watcher", - block_number = incoming_block_number, + block_number = latest_block.number, block_hash = ?incoming_block.header.hash, "New latest L1 block received" ); - let Header { - hash, - inner: alloy_consensus::Header { number, parent_hash, timestamp, .. }, - .. - } = incoming_block.header; - let latest_block = BlockInfo::new(hash, number, parent_hash, timestamp); - // Early exit: check if no reorg is needed (sequential block) - if latest_block.parent_hash == previous_block.hash { + if latest_block.parent_hash == prev.hash { trace!( target: "supervisor::l1_watcher", block_number = latest_block.number, "Sequential block received, no reorg needed" ); - *previous_block = latest_block.id(); - return; + return Some(latest_block.id()); } match self.reorg_handler.handle_l1_reorg(latest_block).await { @@ -206,7 +227,7 @@ where } } - *previous_block = latest_block.id(); + Some(latest_block.id()) } } @@ -338,7 +359,8 @@ mod tests { ..Default::default() }; let mut last_finalized_number = 0; - watcher.handle_new_finalized_block(block.clone(), &mut last_finalized_number); + last_finalized_number = + watcher.handle_new_finalized_block(block.clone(), last_finalized_number); let event = rx.recv().await.unwrap(); let expected = BlockInfo::new( @@ -351,6 +373,7 @@ mod tests { matches!(event, ChainEvent::FinalizedSourceUpdate { ref finalized_source_block } if *finalized_source_block == expected), "Expected FinalizedSourceUpdate with block {expected:?}, got {event:?}" ); + assert_eq!(last_finalized_number, block.header.number); } #[tokio::test] @@ -385,8 +408,9 @@ mod tests { ..Default::default() }; let mut last_finalized_number = 0; - watcher.handle_new_finalized_block(block, &mut last_finalized_number); + last_finalized_number = watcher.handle_new_finalized_block(block, last_finalized_number); + assert_eq!(last_finalized_number, 0); // Should NOT broadcast if storage update fails assert!(rx.try_recv().is_err()); } @@ -417,9 +441,9 @@ mod tests { }, ..Default::default() }; - let mut last_latest_number = BlockNumHash { number: 0, hash: B256::ZERO }; - watcher.handle_new_latest_block(block, &mut last_latest_number).await; - assert_eq!(last_latest_number.number, 1); + let mut last_latest_number = None; + last_latest_number = watcher.handle_new_latest_block(block, last_latest_number).await; + assert_eq!(last_latest_number.unwrap().number, 1); // Should NOT send any event for latest block assert!(rx.try_recv().is_err()); } @@ -450,9 +474,9 @@ mod tests { }, ..Default::default() }; - let mut last_latest_number = BlockNumHash { number: 100, hash: B256::ZERO }; - watcher.handle_new_latest_block(block, &mut last_latest_number).await; - assert_eq!(last_latest_number.number, 101); + let mut last_latest_number = Some(BlockNumHash { number: 100, hash: B256::ZERO }); + last_latest_number = watcher.handle_new_latest_block(block, last_latest_number).await; + assert_eq!(last_latest_number.unwrap().number, 101); // Send previous block as latest block let reorg_block = Block { @@ -480,8 +504,8 @@ mod tests { .with(predicate::eq(reorg_block_info)) .returning(|_| Ok(())); - watcher.handle_new_latest_block(reorg_block, &mut last_latest_number).await; - assert_eq!(last_latest_number.number, 105); + last_latest_number = watcher.handle_new_latest_block(reorg_block, last_latest_number).await; + assert_eq!(last_latest_number.unwrap().number, 105); // Should NOT send any event for latest block assert!(rx.try_recv().is_err()); }