Skip to content
This repository was archived by the owner on Jan 16, 2026. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 62 additions & 38 deletions crates/supervisor/core/src/l1_watcher/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{event::ChainEvent, syncnode::ManagedNodeController};
use alloy_eips::{BlockNumHash, BlockNumberOrTag};
use alloy_primitives::{B256, ChainId};
use alloy_primitives::ChainId;
use alloy_rpc_client::RpcClient;
use alloy_rpc_types_eth::{Block, Header};
use futures::StreamExt;
Expand Down Expand Up @@ -78,7 +78,7 @@ where
S: futures::Stream<Item = Block> + Unpin,
{
let mut finalized_number = 0;
let mut previous_latest_block = BlockNumHash { number: 0, hash: B256::ZERO };
let mut previous_latest_block: Option<BlockNumHash> = None;

loop {
tokio::select! {
Expand All @@ -89,23 +89,31 @@ where
latest_block = latest_head_stream.next() => {
if let Some(latest_block) = latest_block {
info!(target: "supervisor::l1_watcher", "Latest L1 block received: {:?}", latest_block.header.number);
self.handle_new_latest_block(latest_block, &mut previous_latest_block).await;
previous_latest_block = self.handle_new_latest_block(latest_block, previous_latest_block).await;
}
}
finalized_block = finalized_head_stream.next() => {
if let Some(finalized_block) = finalized_block {
info!(target: "supervisor::l1_watcher", "Finalized L1 block received: {:?}", finalized_block.header.number);
self.handle_new_finalized_block(finalized_block, &mut finalized_number);
finalized_number = self.handle_new_finalized_block(finalized_block, finalized_number);
}
}
}
}
}

fn handle_new_finalized_block(&self, block: Block, last_finalized_number: &mut u64) {
/// Handles a new finalized [`Block`], updating the storage and broadcasting the event.
///
/// Arguments:
/// - `block`: The finalized block to process.
/// - `last_finalized_number`: The last finalized block number.
///
/// Returns:
/// - `u64`: The new finalized block number.
fn handle_new_finalized_block(&self, block: Block, last_finalized_number: u64) -> u64 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we pls get some docs on this method to explain what it returns

let block_number = block.header.number;
if block_number == *last_finalized_number {
return;
if block_number == last_finalized_number {
return last_finalized_number;
}

let Header {
Expand All @@ -123,12 +131,12 @@ where

if let Err(err) = self.finalized_l1_storage.update_finalized_l1(finalized_source_block) {
error!(target: "supervisor::l1_watcher", %err, "Failed to update finalized L1 block");
return;
return last_finalized_number;
}

self.broadcast_finalized_source_update(finalized_source_block);

*last_finalized_number = block_number;
block_number
}

fn broadcast_finalized_source_update(&self, finalized_source_block: BlockInfo) {
Expand All @@ -145,47 +153,60 @@ where
}
}

/// Handles a new latest [`Block`], checking if it requires a reorg or is sequential.
///
/// Arguments:
/// - `incoming_block`: The incoming block to process.
/// - `previous_block`: The previously stored latest block, if any.
///
/// Returns:
/// - `Option<BlockNumHash>`: The ID of the new latest block if processed successfully, or the
/// previous block if no changes were made.
async fn handle_new_latest_block(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here can we pls get some docs about the return value

&self,
incoming_block: Block,
previous_block: &mut BlockNumHash,
) {
let incoming_block_number = incoming_block.header.number;
previous_block: Option<BlockNumHash>,
) -> Option<BlockNumHash> {
let Header {
hash,
inner: alloy_consensus::Header { number, parent_hash, timestamp, .. },
..
} = incoming_block.header;
let latest_block = BlockInfo::new(hash, number, parent_hash, timestamp);

let prev = match previous_block {
Some(prev) => prev,
None => {
return Some(latest_block.id());
}
};

Comment thread
dhyaniarun1993 marked this conversation as resolved.
// Early exit if the incoming block is not newer than the previous block
if incoming_block_number <= previous_block.number {
if latest_block.number <= prev.number {
info!(
target: "supervisor::l1_watcher",
incoming_block_number,
previous_block_number = previous_block.number,
incoming_block_number = latest_block.number,
previous_block_number = prev.number,
"Incoming latest L1 block is not greater than the stored latest block"
);
return;
return previous_block;
}

trace!(
target: "l1_watcher",
block_number = incoming_block_number,
block_number = latest_block.number,
block_hash = ?incoming_block.header.hash,
"New latest L1 block received"
);

let Header {
hash,
inner: alloy_consensus::Header { number, parent_hash, timestamp, .. },
..
} = incoming_block.header;
let latest_block = BlockInfo::new(hash, number, parent_hash, timestamp);

// Early exit: check if no reorg is needed (sequential block)
if latest_block.parent_hash == previous_block.hash {
if latest_block.parent_hash == prev.hash {
trace!(
target: "supervisor::l1_watcher",
block_number = latest_block.number,
"Sequential block received, no reorg needed"
);
*previous_block = latest_block.id();
return;
return Some(latest_block.id());
}

match self.reorg_handler.handle_l1_reorg(latest_block).await {
Expand All @@ -206,7 +227,7 @@ where
}
}

*previous_block = latest_block.id();
Some(latest_block.id())
}
}

Expand Down Expand Up @@ -338,7 +359,8 @@ mod tests {
..Default::default()
};
let mut last_finalized_number = 0;
watcher.handle_new_finalized_block(block.clone(), &mut last_finalized_number);
last_finalized_number =
watcher.handle_new_finalized_block(block.clone(), last_finalized_number);

let event = rx.recv().await.unwrap();
let expected = BlockInfo::new(
Expand All @@ -351,6 +373,7 @@ mod tests {
matches!(event, ChainEvent::FinalizedSourceUpdate { ref finalized_source_block } if *finalized_source_block == expected),
"Expected FinalizedSourceUpdate with block {expected:?}, got {event:?}"
);
assert_eq!(last_finalized_number, block.header.number);
}

#[tokio::test]
Expand Down Expand Up @@ -385,8 +408,9 @@ mod tests {
..Default::default()
};
let mut last_finalized_number = 0;
watcher.handle_new_finalized_block(block, &mut last_finalized_number);
last_finalized_number = watcher.handle_new_finalized_block(block, last_finalized_number);

assert_eq!(last_finalized_number, 0);
// Should NOT broadcast if storage update fails
assert!(rx.try_recv().is_err());
}
Expand Down Expand Up @@ -417,9 +441,9 @@ mod tests {
},
..Default::default()
};
let mut last_latest_number = BlockNumHash { number: 0, hash: B256::ZERO };
watcher.handle_new_latest_block(block, &mut last_latest_number).await;
assert_eq!(last_latest_number.number, 1);
let mut last_latest_number = None;
last_latest_number = watcher.handle_new_latest_block(block, last_latest_number).await;
assert_eq!(last_latest_number.unwrap().number, 1);
// Should NOT send any event for latest block
assert!(rx.try_recv().is_err());
}
Expand Down Expand Up @@ -450,9 +474,9 @@ mod tests {
},
..Default::default()
};
let mut last_latest_number = BlockNumHash { number: 100, hash: B256::ZERO };
watcher.handle_new_latest_block(block, &mut last_latest_number).await;
assert_eq!(last_latest_number.number, 101);
let mut last_latest_number = Some(BlockNumHash { number: 100, hash: B256::ZERO });
last_latest_number = watcher.handle_new_latest_block(block, last_latest_number).await;
assert_eq!(last_latest_number.unwrap().number, 101);

// Send previous block as latest block
let reorg_block = Block {
Expand Down Expand Up @@ -480,8 +504,8 @@ mod tests {
.with(predicate::eq(reorg_block_info))
.returning(|_| Ok(()));

watcher.handle_new_latest_block(reorg_block, &mut last_latest_number).await;
assert_eq!(last_latest_number.number, 105);
last_latest_number = watcher.handle_new_latest_block(reorg_block, last_latest_number).await;
assert_eq!(last_latest_number.unwrap().number, 105);
// Should NOT send any event for latest block
assert!(rx.try_recv().is_err());
}
Expand Down