From 87e431fa9137dfc02fc429136402125ef9337086 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Wed, 23 Jul 2025 21:32:31 +0530 Subject: [PATCH 01/13] feat(supervisor/core): added invalidated block on managed node --- .../core/src/chain_processor/chain.rs | 132 +++++++----------- .../core/src/chain_processor/task.rs | 4 +- crates/supervisor/core/src/syncnode/client.rs | 20 ++- crates/supervisor/core/src/syncnode/node.rs | 7 +- .../supervisor/core/src/syncnode/resetter.rs | 3 +- crates/supervisor/core/src/syncnode/task.rs | 3 +- crates/supervisor/core/src/syncnode/traits.rs | 14 +- 7 files changed, 96 insertions(+), 87 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index a94f9e7a5c..fd5feb7d4c 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -142,99 +142,70 @@ mod tests { use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, }; - use kona_supervisor_types::{Log, OutputV0, Receipts}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; - use std::{ - sync::atomic::{AtomicBool, Ordering}, - time::Duration, - }; + use std::time::Duration; use tokio::time::sleep; - #[derive(Debug)] - struct MockNode { - subscribed: Arc, - } - - impl MockNode { - fn new() -> Self { - Self { subscribed: Arc::new(AtomicBool::new(false)) } - } - } + mock!( + #[derive(Debug)] + pub Node {} - #[async_trait] - impl NodeSubscriber for MockNode { - async fn start_subscription( - &self, - _tx: mpsc::Sender, - ) -> Result<(), ManagedNodeError> { - self.subscribed.store(true, Ordering::SeqCst); - Ok(()) + #[async_trait] + impl NodeSubscriber for Node { + async fn start_subscription( + &self, + _event_tx: mpsc::Sender, + ) -> Result<(), ManagedNodeError>; } - } - #[async_trait] - impl BlockProvider for MockNode { - async fn fetch_receipts(&self, 
_block_hash: B256) -> Result { - Ok(vec![]) // dummy + #[async_trait] + impl BlockProvider for Node { + async fn fetch_receipts(&self, _block_hash: B256) -> Result; + async fn block_by_number(&self, _number: u64) -> Result; } - async fn block_by_number(&self, _number: u64) -> Result { - Ok(BlockInfo::default()) - } - } + #[async_trait] + impl ManagedNodeDataProvider for Node { + async fn output_v0_at_timestamp( + &self, + _timestamp: u64, + ) -> Result; - #[async_trait] - impl ManagedNodeDataProvider for MockNode { - async fn output_v0_at_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(OutputV0::default()) - } + async fn pending_output_v0_at_timestamp( + &self, + _timestamp: u64, + ) -> Result; - async fn pending_output_v0_at_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(OutputV0::default()) + async fn l2_block_ref_by_timestamp( + &self, + _timestamp: u64, + ) -> Result; } - async fn l2_block_ref_by_timestamp( - &self, - _timestamp: u64, - ) -> Result { - Ok(BlockInfo::default()) - } - } + #[async_trait] + impl ManagedNodeController for Node { + async fn update_finalized( + &self, + _finalized_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - #[async_trait] - impl ManagedNodeController for MockNode { - async fn update_finalized( - &self, - _finalized_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn update_cross_unsafe( + &self, + cross_unsafe_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - async fn update_cross_unsafe( - &self, - _cross_unsafe_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn update_cross_safe( + &self, + source_block_id: BlockNumHash, + derived_block_id: BlockNumHash, + ) -> Result<(), ManagedNodeError>; - async fn update_cross_safe( - &self, - _source_block_id: BlockNumHash, - _derived_block_id: BlockNumHash, - ) -> Result<(), ManagedNodeError> { - Ok(()) - } + async fn reset(&self) -> Result<(), ManagedNodeError>; - 
async fn reset(&self) -> Result<(), ManagedNodeError> { - Ok(()) + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } - } + ); mock!( #[derive(Debug)] @@ -297,7 +268,9 @@ mod tests { #[tokio::test] async fn test_chain_processor_start_sets_task_and_calls_subscription() { - let mock_node = Arc::new(MockNode::new()); + let mut mock_node = MockNode::new(); + mock_node.expect_start_subscription().returning(|_| Ok(())); + let storage = Arc::new(MockDb::new()); let cancel_token = CancellationToken::new(); @@ -305,7 +278,7 @@ mod tests { let mut processor = ChainProcessor::new( rollup_config, 1, - Arc::clone(&mock_node), + Arc::new(mock_node), Arc::clone(&storage), cancel_token, ); @@ -315,9 +288,6 @@ mod tests { // Wait a moment for task to spawn and subscription to run sleep(Duration::from_millis(50)).await; - // Ensure start_subscription was called - assert!(mock_node.subscribed.load(Ordering::SeqCst)); - let handle_guard = processor.task_handle.lock().await; assert!(handle_guard.is_some()); } diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e8eb69f0a4..902484f9a0 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -457,7 +457,7 @@ mod tests { use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, }; - use kona_supervisor_types::{Log, OutputV0, Receipts}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; use std::time::Duration; use tokio::sync::mpsc; @@ -517,6 +517,8 @@ mod tests { ) -> Result<(), ManagedNodeError>; async fn reset(&self) -> Result<(), ManagedNodeError>; + + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } ); diff --git a/crates/supervisor/core/src/syncnode/client.rs b/crates/supervisor/core/src/syncnode/client.rs index 9fc0b731f7..d932f1f3ba 100644 
--- a/crates/supervisor/core/src/syncnode/client.rs +++ b/crates/supervisor/core/src/syncnode/client.rs @@ -9,7 +9,7 @@ use jsonrpsee::{ }; use kona_supervisor_metrics::observe_metrics_for_result_async; use kona_supervisor_rpc::{BlockInfo, ManagedModeApiClient, jsonrpsee::SubscriptionTopic}; -use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent}; use std::{ fmt::Debug, sync::{Arc, OnceLock}, @@ -55,6 +55,9 @@ pub trait ManagedNodeClient: Debug { finalised_id: BlockNumHash, ) -> Result<(), ClientError>; + /// Invalidates a block in the managed node. + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; + /// Provides L1 [`BlockInfo`] to the managed node. async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; @@ -350,6 +353,21 @@ impl ManagedNodeClient for Client { Ok(()) } + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError> { + let client = self.get_ws_client().await?; + observe_metrics_for_result_async!( + Metrics::MANAGED_NODE_RPC_REQUESTS_SUCCESS_TOTAL, + Metrics::MANAGED_NODE_RPC_REQUESTS_ERROR_TOTAL, + Metrics::MANAGED_NODE_RPC_REQUEST_DURATION_SECONDS, + "invalidate_block", + async { + ManagedModeApiClient::invalidate_block(client.as_ref(), seal).await + }, + "node" => self.config.url.clone() + )?; + Ok(()) + } + async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError> { let client = self.get_ws_client().await?; observe_metrics_for_result_async!( diff --git a/crates/supervisor/core/src/syncnode/node.rs b/crates/supervisor/core/src/syncnode/node.rs index 841da84daf..e465abd505 100644 --- a/crates/supervisor/core/src/syncnode/node.rs +++ b/crates/supervisor/core/src/syncnode/node.rs @@ -7,7 +7,7 @@ use alloy_rpc_types_eth::BlockNumHash; use async_trait::async_trait; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, 
LogStorageReader}; -use kona_supervisor_types::{OutputV0, Receipts}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts}; use std::sync::Arc; use tokio::{ sync::{Mutex, mpsc}, @@ -192,4 +192,9 @@ where self.resetter.reset().await?; Ok(()) } + + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError> { + self.client.invalidate_block(seal).await?; + Ok(()) + } } diff --git a/crates/supervisor/core/src/syncnode/resetter.rs b/crates/supervisor/core/src/syncnode/resetter.rs index 213b4979f1..544ebf3663 100644 --- a/crates/supervisor/core/src/syncnode/resetter.rs +++ b/crates/supervisor/core/src/syncnode/resetter.rs @@ -160,7 +160,7 @@ mod tests { use kona_interop::{DerivedRefPair, SafetyLevel}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, HeadRefStorageReader, StorageError}; - use kona_supervisor_types::{OutputV0, Receipts, SubscriptionEvent, SuperHead}; + use kona_supervisor_types::{BlockSeal, OutputV0, Receipts, SubscriptionEvent, SuperHead}; use mockall::{mock, predicate}; // Mock for HeadRefStorageReader @@ -195,6 +195,7 @@ mod tests { async fn block_ref_by_number(&self, block_number: u64) -> Result; async fn reset_pre_interop(&self) -> Result<(), ClientError>; async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; diff --git a/crates/supervisor/core/src/syncnode/task.rs b/crates/supervisor/core/src/syncnode/task.rs index 2439a53283..29b006b5e4 100644 --- a/crates/supervisor/core/src/syncnode/task.rs +++ 
b/crates/supervisor/core/src/syncnode/task.rs @@ -257,7 +257,7 @@ mod tests { use kona_interop::{BlockReplacement, DerivedRefPair, SafetyLevel}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{DerivationStorageReader, LogStorageReader, StorageError}; - use kona_supervisor_types::{Log, OutputV0, Receipts, SubscriptionEvent, SuperHead}; + use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts, SubscriptionEvent, SuperHead}; use mockall::mock; mock! { @@ -297,6 +297,7 @@ mod tests { async fn block_ref_by_number(&self, block_number: u64) -> Result; async fn reset_pre_interop(&self) -> Result<(), ClientError>; async fn reset(&self, unsafe_id: BlockNumHash, cross_unsafe_id: BlockNumHash, local_safe_id: BlockNumHash, cross_safe_id: BlockNumHash, finalised_id: BlockNumHash) -> Result<(), ClientError>; + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ClientError>; async fn provide_l1(&self, block_info: BlockInfo) -> Result<(), ClientError>; async fn update_finalized(&self, finalized_block_id: BlockNumHash) -> Result<(), ClientError>; async fn update_cross_unsafe(&self, cross_unsafe_block_id: BlockNumHash) -> Result<(), ClientError>; diff --git a/crates/supervisor/core/src/syncnode/traits.rs b/crates/supervisor/core/src/syncnode/traits.rs index a4f57dc474..6841b13293 100644 --- a/crates/supervisor/core/src/syncnode/traits.rs +++ b/crates/supervisor/core/src/syncnode/traits.rs @@ -4,7 +4,7 @@ use alloy_eips::BlockNumHash; use alloy_primitives::B256; use async_trait::async_trait; use kona_protocol::BlockInfo; -use kona_supervisor_types::{OutputV0, Receipts}; +use kona_supervisor_types::{BlockSeal, OutputV0, Receipts}; use std::fmt::Debug; use tokio::sync::mpsc; @@ -138,6 +138,18 @@ pub trait ManagedNodeController: Send + Sync + Debug { /// * `Ok(())` on success /// * `Err(ManagedNodeError)` if the reset fails async fn reset(&self) -> Result<(), ManagedNodeError>; + + /// Instructs the managed node to invalidate a block. 
+ /// This is used when the supervisor detects an invalid block + /// and needs to roll back the node's state. + /// + /// # Arguments + /// * `seal` - The [`BlockSeal`] of the block. + /// + /// # Returns + /// * `Ok(())` on success + /// * `Err(ManagedNodeError)` if the invalidation fails + async fn invalidate_block(&self, seal: BlockSeal) -> Result<(), ManagedNodeError>; } /// Composite trait for any node that provides: From 8af96a72c4a560cae5cc5d7c9790832a4c7f719f Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Thu, 24 Jul 2025 07:12:19 +0600 Subject: [PATCH 02/13] added reorg integration to chain processor --- .../core/src/chain_processor/chain.rs | 6 ++ .../core/src/chain_processor/task.rs | 89 ++++++++++++++++++- crates/supervisor/core/src/rewinder/chain.rs | 11 +-- crates/supervisor/storage/src/chaindb.rs | 19 ++-- crates/supervisor/storage/src/error.rs | 4 + 5 files changed, 112 insertions(+), 17 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index a94f9e7a5c..8b34e74507 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -3,6 +3,7 @@ use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvid use alloy_primitives::ChainId; use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, + StorageRewinder, }; use std::sync::Arc; use tokio::{ @@ -50,6 +51,7 @@ where + LogStorageReader + DerivationStorageWriter + HeadRefStorageWriter + + StorageRewinder + 'static, { /// Creates a new instance of [`ChainProcessor`]. 
@@ -293,6 +295,10 @@ mod tests { block: &BlockInfo, ) -> Result; } + impl StorageRewinder for Db { + fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError>; + fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError>; + } ); #[tokio::test] diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index e8eb69f0a4..f3c1a16e50 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -1,13 +1,14 @@ use super::Metrics; use crate::{ - ChainProcessorError, LogIndexer, config::RollupConfig, event::ChainEvent, + ChainProcessorError, ChainRewinder, LogIndexer, config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider, }; use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, + DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, + StorageError, StorageRewinder, }; use std::{fmt::Debug, sync::Arc}; use tokio::sync::mpsc; @@ -28,6 +29,8 @@ pub struct ChainProcessorTask { log_indexer: Arc>, + rewinder: Arc>, + cancel_token: CancellationToken, /// The channel for receiving node events. @@ -41,6 +44,7 @@ where + LogStorageReader + DerivationStorageWriter + HeadRefStorageWriter + + StorageRewinder + 'static, { /// Creates a new [`ChainProcessorTask`]. 
@@ -53,6 +57,8 @@ where event_rx: mpsc::Receiver, ) -> Self { let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); + let rewinder = ChainRewinder::new(chain_id, state_manager.clone()); + Self { rollup_config, chain_id, @@ -62,6 +68,7 @@ where event_rx, state_manager, log_indexer: Arc::from(log_indexer), + rewinder: Arc::from(rewinder), } } @@ -348,6 +355,17 @@ where } return Err(StorageError::BlockOutOfOrder.into()); } + + Err(StorageError::ReorgRequired) => { + error!( + target: "chain_processor", + chain = self.chain_id, + derived_block = %derived_ref_pair.derived, + "Local derivation conflict detected — rewinding" + ); + self.rewinder.handle_local_reorg(&derived_ref_pair)?; + } + Err(err) => { error!( target: "chain_processor", @@ -577,6 +595,11 @@ mod tests { block: &BlockInfo, ) -> Result; } + + impl StorageRewinder for Db { + fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError>; + fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError>; + } ); fn genesis() -> Genesis { @@ -918,6 +941,68 @@ mod tests { task_handle.await.unwrap(); } + #[tokio::test] + async fn test_handle_derived_event_block_triggers_reorg() { + let block_pair = DerivedRefPair { + source: BlockInfo { + number: 123, + hash: B256::ZERO, + parent_hash: B256::ZERO, + timestamp: 0, + }, + derived: BlockInfo { + number: 1234, + hash: B256::ZERO, + parent_hash: B256::ZERO, + timestamp: 1003, // post-interop + }, + }; + + let mut mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + // Simulate ReorgRequired error + mockdb + .expect_save_derived_block() + .returning(move |_pair: DerivedRefPair| Err(StorageError::ReorgRequired)); + + mockdb.expect_get_block().returning(move |num| { + Ok(BlockInfo { + number: num, + hash: B256::random(), // different hash from safe derived block + parent_hash: B256::ZERO, + timestamp: 1003, // post-interop + }) + }); + + // Expect reorg on log storage + 
mockdb.expect_rewind_log_storage().returning(|_block_id| Ok(())); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + + let cancel_token = CancellationToken::new(); + let (tx, rx) = mpsc::channel(10); + + let rollup_config = get_rollup_config(1000); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + tx.send(ChainEvent::DerivedBlock { derived_ref_pair: block_pair }).await.unwrap(); + + let task_handle = tokio::spawn(task.run()); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + cancel_token.cancel(); + task_handle.await.unwrap(); + } + #[tokio::test] async fn test_handle_derived_event_other_error() { let block_pair = DerivedRefPair { diff --git a/crates/supervisor/core/src/rewinder/chain.rs b/crates/supervisor/core/src/rewinder/chain.rs index a2cf5c7f21..639dd00342 100644 --- a/crates/supervisor/core/src/rewinder/chain.rs +++ b/crates/supervisor/core/src/rewinder/chain.rs @@ -2,6 +2,7 @@ use alloy_primitives::ChainId; use derive_more::Constructor; use kona_interop::DerivedRefPair; use kona_supervisor_storage::{LogStorageReader, StorageError, StorageRewinder}; +use std::sync::Arc; use thiserror::Error; use tracing::{error, info, warn}; @@ -16,7 +17,7 @@ use tracing::{error, info, warn}; #[derive(Debug, Constructor)] pub struct ChainRewinder { chain_id: ChainId, - db: DB, + db: Arc, } #[expect(dead_code)] // todo: to be removed in the follow up PR @@ -28,13 +29,7 @@ where /// /// This is triggered when an update to supervisor storage fails due to an /// integrity violation (e.g., mismatched on storing local safe block hash). - fn handle_local_reorg(&self, derived_pair: &DerivedRefPair) -> Result<(), StorageError> { - warn!( - target: "rewinder", - chain = %self.chain_id, - derived_block = %derived_pair.derived, - "Local derivation conflict detected — rewinding..." 
- ); + pub fn handle_local_reorg(&self, derived_pair: &DerivedRefPair) -> Result<(), StorageError> { // get the same block from log storage let conflicting_block = self.db.get_block(derived_pair.derived.number).inspect_err(|err| { diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index 18db0978b6..b172e9ba29 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -117,6 +117,14 @@ impl DerivationStorageWriter for ChainDb { fn save_derived_block(&self, incoming_pair: DerivedRefPair) -> Result<(), StorageError> { self.observe_call("save_derived_block", || { self.env.update(|ctx| { + DerivationProvider::new(ctx, self.chain_id).save_derived_block(incoming_pair)?; + + // Verify the consistency with log storage. + // The check is intentionally deferred until after saving the derived block, + // ensuring validation only triggers on the committed state to prevent false + // positives. + // Example: If the parent derived block doesn't exist, it should return error from + // derivation provider, not from log provider. 
let derived_block = incoming_pair.derived; let block = LogProvider::new(ctx, self.chain_id) .get_block(derived_block.number) @@ -124,7 +132,6 @@ impl DerivationStorageWriter for ChainDb { StorageError::EntryNotFound(_) => { error!( target: "supervisor_storage", - chain_id = %self.chain_id, incoming_block = %derived_block, "Derived block not found in log storage: {derived_block:?}" ); @@ -132,18 +139,16 @@ impl DerivationStorageWriter for ChainDb { } other => other, // propagate other errors as-is })?; - if block != derived_block { error!( target: "supervisor_storage", - chain_id = %self.chain_id, incoming_block = %derived_block, stored_log_block = %block, "Derived block does not match the stored log block" ); - return Err(StorageError::ConflictError); + return Err(StorageError::ReorgRequired); } - DerivationProvider::new(ctx, self.chain_id).save_derived_block(incoming_pair)?; + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived) }) @@ -687,9 +692,9 @@ mod tests { db.initialise_log_storage(anchor.derived).expect("initialise log storage"); db.initialise_derivation_storage(anchor).expect("initialise derivation storage"); - // Save derived block pair - should error conflict + // Save derived block pair - should error BlockOutOfOrder error let err = db.save_derived_block(derived_pair).unwrap_err(); - assert!(matches!(err, StorageError::ConflictError)); + assert!(matches!(err, StorageError::BlockOutOfOrder)); db.store_block_logs( &BlockInfo { diff --git a/crates/supervisor/storage/src/error.rs b/crates/supervisor/storage/src/error.rs index bea913b126..8888ab7297 100644 --- a/crates/supervisor/storage/src/error.rs +++ b/crates/supervisor/storage/src/error.rs @@ -37,6 +37,10 @@ pub enum StorageError { /// Represents an error that occurred while writing to log database. 
#[error("latest stored block is not parent of the incoming block")] BlockOutOfOrder, + + /// Represents an error that occurred when there is inconsistency in log storage + #[error("reorg required due to inconsistent storage state")] + ReorgRequired, } impl PartialEq for StorageError { From a6a37ce04df113c3a9a3de4a1dc1ab511627e687 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Thu, 24 Jul 2025 12:17:36 +0530 Subject: [PATCH 03/13] added missing metrics --- crates/supervisor/core/src/syncnode/metrics.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/crates/supervisor/core/src/syncnode/metrics.rs b/crates/supervisor/core/src/syncnode/metrics.rs index 34df2490de..fca829fff3 100644 --- a/crates/supervisor/core/src/syncnode/metrics.rs +++ b/crates/supervisor/core/src/syncnode/metrics.rs @@ -18,12 +18,21 @@ impl Metrics { // --- RPC Method Names (for zeroing) --- /// Lists all managed mode RPC methods here to ensure they are pre-registered. - const RPC_METHODS: [&'static str; 5] = [ + const RPC_METHODS: [&'static str; 14] = [ + "chain_id", + "subscribe_events", "fetch_receipts", "output_v0_at_timestamp", "pending_output_v0_at_timestamp", "l2_block_ref_by_timestamp", + "block_ref_by_number", + "reset_pre_interop", + "reset", + "invalidate_block", "provide_l1", + "update_finalized", + "update_cross_unsafe", + "update_cross_safe", ]; /// Initializes metrics for the Supervisor RPC service. 
From d7d47009958d1fa26c8cd093ddca822b02e3ffa4 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Fri, 25 Jul 2025 13:54:07 +0530 Subject: [PATCH 04/13] initial commit --- .../core/src/chain_processor/chain.rs | 18 +- .../core/src/chain_processor/task.rs | 207 +++++++++++++++++- crates/supervisor/core/src/event/chain.rs | 6 + .../core/src/safety_checker/task.rs | 47 +++- crates/supervisor/types/src/types.rs | 8 +- 5 files changed, 270 insertions(+), 16 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index fd5feb7d4c..b67716433c 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -2,7 +2,8 @@ use super::{ChainProcessorError, ChainProcessorTask}; use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageRewinder, }; use std::sync::Arc; use tokio::{ @@ -48,8 +49,10 @@ where P: ManagedNodeProvider + 'static, W: LogStorageWriter + LogStorageReader + + DerivationStorageReader + DerivationStorageWriter + HeadRefStorageWriter + + StorageRewinder + 'static, { /// Creates a new instance of [`ChainProcessor`]. 
@@ -224,13 +227,19 @@ mod tests { ) -> Result<(), StorageError>; } - impl LogStorageReader for Db { + impl LogStorageReader for Db { fn get_block(&self, block_number: u64) -> Result; fn get_latest_block(&self) -> Result; fn get_log(&self,block_number: u64,log_index: u32) -> Result; fn get_logs(&self, block_number: u64) -> Result, StorageError>; } + impl DerivationStorageReader for Db { + fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result; + fn latest_derived_block_at_source(&self, source_block_id: BlockNumHash) -> Result; + fn latest_derivation_state(&self) -> Result; + } + impl DerivationStorageWriter for Db { fn initialise_derivation_storage( &self, @@ -264,6 +273,11 @@ mod tests { block: &BlockInfo, ) -> Result; } + + impl StorageRewinder for Db { + fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError>; + fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError>; + } ); #[tokio::test] diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 902484f9a0..fe3e42dde0 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -7,12 +7,14 @@ use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageError, StorageRewinder, }; +use kona_supervisor_types::BlockSeal; use std::{fmt::Debug, sync::Arc}; -use tokio::sync::mpsc; +use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; /// Represents a task that processes chain events from a managed node. 
/// It listens for events emitted by the managed node and handles them accordingly. @@ -32,6 +34,8 @@ pub struct ChainProcessorTask { /// The channel for receiving node events. event_rx: mpsc::Receiver, + + invalidated_block: RwLock>, } impl ChainProcessorTask @@ -40,7 +44,9 @@ where W: LogStorageWriter + LogStorageReader + DerivationStorageWriter + + DerivationStorageReader + HeadRefStorageWriter + + StorageRewinder + 'static, { /// Creates a new [`ChainProcessorTask`]. @@ -62,6 +68,7 @@ where event_rx, state_manager, log_indexer: Arc::from(log_indexer), + invalidated_block: RwLock::new(None), } } @@ -197,8 +204,19 @@ where ); }); } + ChainEvent::InvalidateBlock { block } => { + let _ = self.handle_invalidate_block(block).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + %err, + "Failed to invalidate block" + ); + }); + } ChainEvent::BlockReplaced { replacement } => { - let _ = self.handle_block_replacement(replacement).inspect_err(|err| { + let _ = self.handle_block_replacement(replacement).await.inspect_err(|err| { error!( target: "chain_processor", chain_id = self.chain_id, @@ -257,12 +275,121 @@ where } } - #[allow(clippy::missing_const_for_fn)] - fn handle_block_replacement( + async fn handle_block_replacement( &self, - _replacement: BlockReplacement, + replacement: BlockReplacement, ) -> Result<(), ChainProcessorError> { // Logic to handle block replacement + info!( + target: "chain_processor", + chain_id = self.chain_id, + %replacement, + "Handling block replacement" + ); + + let mut guard = self.invalidated_block.write().await; + // check if invalidated block is same as replacement block + if let Some(invalidated_ref_pair) = *guard { + if invalidated_ref_pair.derived.hash == replacement.invalidated { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + "Invalidated block matches replacement block, skipping" + ); + + *guard = None; + // save the derived block + let 
derived_ref_pair = DerivedRefPair { + source: invalidated_ref_pair.source, + derived: replacement.replacement, + }; + + // todo: index logs if needed + // hardcoding logs for now + self.state_manager.store_block_logs(&replacement.replacement, Vec::new()).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + %err, + "Failed to store logs for derived block on replacement" + ); + })?; + self.state_manager.save_derived_block(derived_ref_pair).inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + %err, + "Failed to save derived block after replacement" + ); + })?; + return Ok(()); + } + } else { + warn!( + target: "chain_processor", + chain_id = self.chain_id, + "No invalidated block found, but block replacement event received" + ); + } + + Ok(()) + } + + async fn handle_invalidate_block(&self, block: BlockInfo) -> Result<(), ChainProcessorError> { + info!( + target: "chain_processor", + chain_id = self.chain_id, + invalidated_block = %block, + "Processing invalidate block" + ); + + let mut invalidated_block = self.invalidated_block.write().await; + if invalidated_block.is_some() { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + "Invalidated block already set, skipping" + ); + return Ok(()); + } + + // todo: handle error if block is not found or conflict error + let source_block = self.state_manager.derived_to_source(block.id())?; + + // rewind the storage to the block before the invalidated block + let to = block.id(); + self.state_manager.rewind(&to)?; + + // log latest derivation and log state for debugging + let latest_derivation_state = self.state_manager.latest_derivation_state()?; + info!( + target: "chain_processor", + chain_id = self.chain_id, + latest_derivation_state = ?latest_derivation_state, + "Latest derivation state after rewinding storage" + ); + + let latest_log_state = self.state_manager.get_latest_block()?; + info!( + target: 
"chain_processor", + chain_id = self.chain_id, + latest_log_state = ?latest_log_state, + "Latest log state after rewinding storage" + ); + + let block_seal = BlockSeal::new(block.hash, block.number, block.timestamp); + self.managed_node.invalidate_block(block_seal).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + %err, + "Failed to invalidate block" + ); + })?; + + *invalidated_block = Some(DerivedRefPair { source: source_block, derived: block }); Ok(()) } @@ -292,6 +419,18 @@ where block_number = origin.number, "Processing derivation origin update" ); + + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = origin.number, + "Invalidated block set, skipping derivation origin update" + ); + return Ok(()); + } + match self.state_manager.save_source_block(origin) { Ok(_) => Ok(()), Err(StorageError::BlockOutOfOrder | StorageError::ConflictError) => { @@ -327,6 +466,38 @@ where "Processing local safe derived block pair" ); + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + "Invalidated block already set, skipping safe event processing" + ); + return Ok(derived_ref_pair.derived); + } + + let latest_derivation_state = self.state_manager.latest_derivation_state()?; + info!( + target: "chain_processor", + chain_id = self.chain_id, + latest_derivation_state = ?latest_derivation_state, + "Latest derivation state in handle_safe_event" + ); + + // // for testing purpose + // // trigger handle_invalidate with block 15 at block 20 + // if derived_ref_pair.derived.number == 20 { + // info!( + // target: "chain_processor", + // chain_id = self.chain_id, + // block_number = derived_ref_pair.derived.number, + // 
"Triggering handle_invalidate for block 15 at block 20" + // ); + // let block_15 = self.state_manager.get_block(15)?; + // let _ = self.handle_invalidate_block(block_15).await; + // } + if self.rollup_config.is_post_interop(derived_ref_pair.derived.timestamp) { match self.state_manager.save_derived_block(derived_ref_pair) { Ok(_) => return Ok(derived_ref_pair.derived), @@ -386,6 +557,17 @@ where "Processing unsafe block" ); + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + "Invalidated block already set, skipping unsafe event processing" + ); + return Ok(block); + } + if self.rollup_config.is_post_interop(block.timestamp) { self.log_indexer.clone().sync_logs(block); return Ok(block); @@ -546,6 +728,12 @@ mod tests { fn get_logs(&self, block_number: u64) -> Result, StorageError>; } + impl DerivationStorageReader for Db { + fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result; + fn latest_derived_block_at_source(&self, source_block_id: BlockNumHash) -> Result; + fn latest_derivation_state(&self) -> Result; + } + impl DerivationStorageWriter for Db { fn initialise_derivation_storage( &self, @@ -579,6 +767,11 @@ mod tests { block: &BlockInfo, ) -> Result; } + + impl StorageRewinder for Db { + fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError>; + fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError>; + } ); fn genesis() -> Genesis { diff --git a/crates/supervisor/core/src/event/chain.rs b/crates/supervisor/core/src/event/chain.rs index 0f9b666a8a..7b9e102e09 100644 --- a/crates/supervisor/core/src/event/chain.rs +++ b/crates/supervisor/core/src/event/chain.rs @@ -26,6 +26,12 @@ pub enum ChainEvent { origin: BlockInfo, }, + /// A invalidateBlock event, indicating that a block has been invalidated. 
+ InvalidateBlock { + /// The [`BlockInfo`] of the block that has been invalidated. + block: BlockInfo, + }, + /// A block replacement event, indicating that a block has been replaced with a new one. BlockReplaced { /// The [`BlockReplacement`] containing the replacement block and the invalidated block diff --git a/crates/supervisor/core/src/safety_checker/task.rs b/crates/supervisor/core/src/safety_checker/task.rs index 5446130220..1c8b73ef8f 100644 --- a/crates/supervisor/core/src/safety_checker/task.rs +++ b/crates/supervisor/core/src/safety_checker/task.rs @@ -4,10 +4,13 @@ use crate::{ safety_checker::{CrossSafetyChecker, traits::SafetyPromoter}, }; use alloy_primitives::ChainId; -use derive_more::Constructor; use kona_protocol::BlockInfo; use kona_supervisor_storage::{CrossChainSafetyProvider, StorageError}; -use std::{sync::Arc, time::Duration}; +use op_alloy_consensus::interop::SafetyLevel; +use std::{ + sync::{Arc, Mutex}, + time::Duration, +}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -16,7 +19,7 @@ use tracing::{error, info, warn}; /// /// It uses [`CrossChainSafetyProvider`] to fetch candidate blocks and the [`CrossSafetyChecker`] /// to validate cross-chain message dependencies. -#[derive(Debug, Constructor)] +#[derive(Debug)] pub struct CrossSafetyCheckerJob { chain_id: ChainId, provider: Arc
<P>
, @@ -24,6 +27,8 @@ pub struct CrossSafetyCheckerJob { interval: Duration, promoter: L, event_tx: mpsc::Sender, + + test_run: Mutex, } impl CrossSafetyCheckerJob @@ -31,6 +36,26 @@ where P: CrossChainSafetyProvider + Send + Sync + 'static, L: SafetyPromoter, { + /// Creates a new instance of [`CrossSafetyCheckerJob`]. + pub fn new( + chain_id: ChainId, + provider: Arc
<P>
, + cancel_token: CancellationToken, + interval: Duration, + promoter: L, + event_tx: mpsc::Sender, + ) -> Self { + Self { + chain_id, + provider, + cancel_token, + interval, + promoter, + event_tx, + test_run: Mutex::new(true), + } + } + /// Runs the job loop until cancelled, promoting blocks by Promoter /// /// On each iteration: @@ -105,6 +130,22 @@ where // TODO: Add more checks in future + // test the invalidate block + let mut test_run = self.test_run.lock().unwrap(); + if *test_run && candidate.number == 20 && self.promoter.target_level() == SafetyLevel::CrossSafe { + let event = ChainEvent::InvalidateBlock { block: candidate.clone() }; + if let Err(err) = self.event_tx.try_send(event) { + error!( + target: "safety_checker", + target_level = %self.promoter.target_level(), + %err, + "Failed to broadcast cross head update event", + ); + } + *test_run = false; + return Ok(candidate); + } + let event = self.promoter.update_and_emit_event(&*self.provider, self.chain_id, &candidate)?; self.broadcast_event(event); diff --git a/crates/supervisor/types/src/types.rs b/crates/supervisor/types/src/types.rs index cb5ddccbf3..44ee01b9ce 100644 --- a/crates/supervisor/types/src/types.rs +++ b/crates/supervisor/types/src/types.rs @@ -4,7 +4,7 @@ //! and the op-node components in the rollup system. It includes block references, //! block seals, derivation events, and event notifications. -use alloy_primitives::{B256, U64}; +use alloy_primitives::B256; use kona_interop::ManagedEvent; use serde::{Deserialize, Serialize}; @@ -18,14 +18,14 @@ pub struct BlockSeal { /// The block's hash pub hash: B256, /// The block number - pub number: U64, + pub number: u64, /// The block's timestamp - pub timestamp: U64, + pub timestamp: u64, } impl BlockSeal { /// Creates a new [`BlockSeal`] with the given hash, number, and timestamp. 
- pub const fn new(hash: B256, number: U64, timestamp: U64) -> Self { + pub const fn new(hash: B256, number: u64, timestamp: u64) -> Self { Self { hash, number, timestamp } } } From ff56763e700e26611e6e6e049706e98ffb7ab4dc Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Mon, 28 Jul 2025 06:26:25 +0600 Subject: [PATCH 05/13] updated processing safe event --- .../core/src/chain_processor/task.rs | 188 ++++++++++++++---- .../supervisor/core/src/logindexer/indexer.rs | 2 +- crates/supervisor/storage/src/chaindb.rs | 2 +- 3 files changed, 150 insertions(+), 42 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 1d0e6ad244..1005626e57 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -335,60 +335,101 @@ where ); if self.rollup_config.is_post_interop(derived_ref_pair.derived.timestamp) { - match self.state_manager.save_derived_block(derived_ref_pair) { - Ok(_) => return Ok(derived_ref_pair.derived), - Err(StorageError::BlockOutOfOrder) => { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = derived_ref_pair.derived.number, - "Block out of order detected, resetting managed node" - ); + return self.process_safe_derived_block(derived_ref_pair).await + } - if let Err(err) = self.managed_node.reset().await { - error!( - target: "chain_processor", - chain_id = self.chain_id, - %err, - "Failed to reset managed node after block out of order" - ); - } - return Err(StorageError::BlockOutOfOrder.into()); - } + if self.rollup_config.is_interop_activation_block(derived_ref_pair.derived) { + info!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + "Initialising derivation storage for interop activation block" + ); + self.state_manager.initialise_derivation_storage(derived_ref_pair)?; + return Ok(derived_ref_pair.derived); + } - 
Err(StorageError::ReorgRequired) => { - error!( - target: "chain_processor", - chain = self.chain_id, - derived_block = %derived_ref_pair.derived, - "Local derivation conflict detected — rewinding" - ); - self.rewinder.handle_local_reorg(&derived_ref_pair)?; - } + Ok(derived_ref_pair.derived) + } + + async fn process_safe_derived_block( + &self, + derived_ref_pair: DerivedRefPair, + ) -> Result { + match self.state_manager.save_derived_block(derived_ref_pair) { + Ok(_) => Ok(derived_ref_pair.derived), + Err(StorageError::BlockOutOfOrder) => { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + "Block out of order detected, resetting managed node" + ); - Err(err) => { + if let Err(err) = self.managed_node.reset().await { error!( target: "chain_processor", chain_id = self.chain_id, - block_number = derived_ref_pair.derived.number, %err, - "Failed to save derived block pair" + "Failed to reset managed node after block out of order" ); - return Err(err.into()); } + Err(StorageError::BlockOutOfOrder.into()) } - } - if self.rollup_config.is_interop_activation_block(derived_ref_pair.derived) { - info!( + Err(StorageError::ReorgRequired) => { + debug!( + target: "chain_processor", + chain = self.chain_id, + derived_block = %derived_ref_pair.derived, + "Local derivation conflict detected — rewinding" + ); + self.rewinder.handle_local_reorg(&derived_ref_pair)?; + Ok(self.retry_with_resync_derived_block(derived_ref_pair).await?) + } + + Err(StorageError::FutureData) => { + Ok(self.retry_with_resync_derived_block(derived_ref_pair).await?) 
+ } + + Err(err) => { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + %err, + "Failed to save derived block pair" + ); + Err(err.into()) + } + } + } + async fn retry_with_resync_derived_block( + &self, + derived_ref_pair: DerivedRefPair, + ) -> Result { + self.log_indexer + .clone() + .process_and_store_logs(&derived_ref_pair.derived) + .await + .inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + %err, + "Error resyncing logs for derived block" + ); + })?; + self.state_manager.save_derived_block(derived_ref_pair).inspect_err(|err| { + error!( target: "chain_processor", chain_id = self.chain_id, block_number = derived_ref_pair.derived.number, - "Initialising derivation storage for interop activation block" + %err, + "Error saving derived block after resync" ); - self.state_manager.initialise_derivation_storage(derived_ref_pair)?; - return Ok(derived_ref_pair.derived); - } + })?; Ok(derived_ref_pair.derived) } @@ -961,7 +1002,7 @@ mod tests { }; let mut mockdb = MockDb::new(); - let mocknode = MockNode::new(); + let mut mocknode = MockNode::new(); // Simulate ReorgRequired error mockdb @@ -980,6 +1021,73 @@ mod tests { // Expect reorg on log storage mockdb.expect_rewind_log_storage().returning(|_block_id| Ok(())); + mockdb.expect_store_block_logs().returning(|_block_id, _logs| Ok(())); + + mocknode.expect_fetch_receipts().returning(|_receipts| Ok(Receipts::default())); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + + let cancel_token = CancellationToken::new(); + let (tx, rx) = mpsc::channel(10); + + let rollup_config = get_rollup_config(1000); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + tx.send(ChainEvent::DerivedBlock { derived_ref_pair: block_pair }).await.unwrap(); + + let task_handle = 
tokio::spawn(task.run()); + + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + cancel_token.cancel(); + task_handle.await.unwrap(); + } + + #[tokio::test] + async fn test_handle_derived_event_block_triggers_resync() { + let block_pair = DerivedRefPair { + source: BlockInfo { + number: 123, + hash: B256::ZERO, + parent_hash: B256::ZERO, + timestamp: 0, + }, + derived: BlockInfo { + number: 1234, + hash: B256::ZERO, + parent_hash: B256::ZERO, + timestamp: 1003, // post-interop + }, + }; + + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + + // Simulate ReorgRequired error + mockdb + .expect_save_derived_block() + .returning(move |_pair: DerivedRefPair| Err(StorageError::FutureData)); + + mockdb.expect_get_block().returning(move |num| { + Ok(BlockInfo { + number: num, + hash: B256::random(), // different hash from safe derived block + parent_hash: B256::ZERO, + timestamp: 1003, // post-interop + }) + }); + + mockdb.expect_store_block_logs().returning(|_block_id, _logs| Ok(())); + + mocknode.expect_fetch_receipts().returning(|_receipts| Ok(Receipts::default())); + let writer = Arc::new(mockdb); let managed_node = Arc::new(mocknode); diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index 081eb89ab6..d97564b680 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -100,7 +100,7 @@ where /// /// # Arguments /// - `block`: Metadata about the block being processed. 
- async fn process_and_store_logs(&self, block: &BlockInfo) -> Result<(), LogIndexerError> { + pub async fn process_and_store_logs(&self, block: &BlockInfo) -> Result<(), LogIndexerError> { let receipts = self.block_provider.fetch_receipts(block.hash).await?; let mut log_entries = Vec::with_capacity(receipts.len()); let mut log_index: u32 = 0; diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index b172e9ba29..782ee6303a 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -135,7 +135,7 @@ impl DerivationStorageWriter for ChainDb { incoming_block = %derived_block, "Derived block not found in log storage: {derived_block:?}" ); - StorageError::ConflictError + StorageError::FutureData } other => other, // propagate other errors as-is })?; From 56e647bdc87c5e5d3213a33d5ea88d466a8d61fd Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Mon, 28 Jul 2025 06:34:03 +0600 Subject: [PATCH 06/13] removed redundant ok --- crates/supervisor/core/src/chain_processor/task.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 1005626e57..61519f51a3 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -385,11 +385,11 @@ where "Local derivation conflict detected — rewinding" ); self.rewinder.handle_local_reorg(&derived_ref_pair)?; - Ok(self.retry_with_resync_derived_block(derived_ref_pair).await?) + self.retry_with_resync_derived_block(derived_ref_pair).await } Err(StorageError::FutureData) => { - Ok(self.retry_with_resync_derived_block(derived_ref_pair).await?) 
+ self.retry_with_resync_derived_block(derived_ref_pair).await } Err(err) => { From 35f0b9030bcb8d8c65cb61c0f6cc6ca11077e183 Mon Sep 17 00:00:00 2001 From: sadiq1971 Date: Mon, 28 Jul 2025 06:36:13 +0600 Subject: [PATCH 07/13] removed test --- tests/supervisor/reorgs/init_test.go | 13 -- tests/supervisor/reorgs/unsafe_head_test.go | 133 -------------------- 2 files changed, 146 deletions(-) delete mode 100644 tests/supervisor/reorgs/init_test.go delete mode 100644 tests/supervisor/reorgs/unsafe_head_test.go diff --git a/tests/supervisor/reorgs/init_test.go b/tests/supervisor/reorgs/init_test.go deleted file mode 100644 index df1ce435a1..0000000000 --- a/tests/supervisor/reorgs/init_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package reorgs - -import ( - "testing" - - "github.com/ethereum-optimism/optimism/op-devstack/presets" -) - -// TestMain creates the test-setups against the shared backend -func TestMain(m *testing.M) { - // Other setups may be added here, hydrated from the same orchestrator - presets.DoMain(m, presets.WithSimpleInterop()) -} diff --git a/tests/supervisor/reorgs/unsafe_head_test.go b/tests/supervisor/reorgs/unsafe_head_test.go deleted file mode 100644 index bcd7883530..0000000000 --- a/tests/supervisor/reorgs/unsafe_head_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package reorgs - -import ( - "testing" - "time" - - "github.com/ethereum-optimism/optimism/op-devstack/devtest" - "github.com/ethereum-optimism/optimism/op-devstack/presets" - "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum-optimism/optimism/op-service/txplan" - "github.com/ethereum-optimism/optimism/op-test-sequencer/sequencer/seqtypes" - "github.com/stretchr/testify/require" -) - -// TestReorgUnsafeHead starts an interop chain with an op-test-sequencer, which takes control over sequencing the L2 chain and introduces a reorg on the unsafe head -func TestReorgUnsafeHead(gt *testing.T) { - t := 
devtest.SerialT(gt) - ctx := t.Ctx() - - sys := presets.NewSimpleInterop(t) - l := sys.Log - - ia := sys.TestSequencer.Escape().ControlAPI(sys.L2ChainA.ChainID()) - - // stop batcher on chain A - sys.L2BatcherA.Stop() - - // two EOAs for a sample transfer tx used later in a conflicting block - alice := sys.FunderA.NewFundedEOA(eth.OneHundredthEther) - bob := sys.Wallet.NewEOA(sys.L2ELA) - - sys.L1Network.WaitForBlock() - - sys.L2ChainA.WaitForBlock() - // waiting for two blocks in order to make sure we are not jumping ahead of a L1 origin (i.e. can't build a chain with L1Origin gaps) - sys.L2ChainA.WaitForBlock() - sys.L2ChainA.WaitForBlock() - - unsafeHead := sys.L2CLA.StopSequencer() - - var divergenceBlockNumber_A uint64 - var originalRef_A eth.L2BlockRef - // prepare and sequence a conflicting block for the L2A chain - { - unsafeHeadRef := sys.L2ELA.BlockRefByLabel(eth.Unsafe) - - l.Info("Current unsafe ref", "unsafeHead", unsafeHead, "parent", unsafeHeadRef.ParentID().Hash, "l1_origin", unsafeHeadRef.L1Origin) - - l.Info("Expect to reorg the chain on current unsafe block", "number", unsafeHeadRef.Number, "head", unsafeHead, "parent", unsafeHeadRef.ParentID().Hash) - divergenceBlockNumber_A = unsafeHeadRef.Number - originalRef_A = unsafeHeadRef - - parentOfUnsafeHead := unsafeHeadRef.ParentID() - - l.Info("Sequencing a conflicting block", "unsafeHead", unsafeHeadRef, "parent", parentOfUnsafeHead) - - // sequence a conflicting block with a simple transfer tx, based on the parent of the parent of the unsafe head - { - err := ia.New(ctx, seqtypes.BuildOpts{ - Parent: parentOfUnsafeHead.Hash, - L1Origin: nil, - }) - require.NoError(t, err, "Expected to be able to create a new block job for sequencing on op-test-sequencer, but got error") - - // include simple transfer tx in opened block - { - to := alice.PlanTransfer(bob.Address(), eth.OneGWei) - opt := txplan.Combine(to) - ptx := txplan.NewPlannedTx(opt) - signed_tx, err := ptx.Signed.Eval(ctx) - 
require.NoError(t, err, "Expected to be able to evaluate a planned transaction on op-test-sequencer, but got error") - txdata, err := signed_tx.MarshalBinary() - require.NoError(t, err, "Expected to be able to marshal a signed transaction on op-test-sequencer, but got error") - - err = ia.IncludeTx(ctx, txdata) - require.NoError(t, err, "Expected to be able to include a signed transaction on op-test-sequencer, but got error") - } - - err = ia.Next(ctx) - require.NoError(t, err, "Expected to be able to call Next() after New() on op-test-sequencer, but got error") - } - } - - // start batcher on chain A - sys.L2BatcherA.Start() - - // sequence a second block with op-test-sequencer (no L1 origin override) - { - l.Info("Sequencing with op-test-sequencer (no L1 origin override)") - err := ia.New(ctx, seqtypes.BuildOpts{ - Parent: sys.L2ELA.BlockRefByLabel(eth.Unsafe).Hash, - L1Origin: nil, - }) - require.NoError(t, err, "Expected to be able to create a new block job for sequencing on op-test-sequencer, but got error") - time.Sleep(2 * time.Second) - - err = ia.Next(ctx) - require.NoError(t, err, "Expected to be able to call Next() after New() on op-test-sequencer, but got error") - time.Sleep(2 * time.Second) - } - - // continue sequencing with consensus node (op-node) - sys.L2CLA.StartSequencer() - - sys.L2ChainA.WaitForBlock() - - reorgedRef_A, err := sys.L2ELA.Escape().EthClient().BlockRefByNumber(ctx, divergenceBlockNumber_A) - require.NoError(t, err, "Expected to be able to call BlockRefByNumber API, but got error") - - l.Info("Reorged chain A on divergence block number (prior the reorg)", "number", divergenceBlockNumber_A, "head", originalRef_A.Hash, "parent", originalRef_A.ParentID().Hash) - l.Info("Reorged chain A on divergence block number (after the reorg)", "number", divergenceBlockNumber_A, "head", reorgedRef_A.Hash, "parent", reorgedRef_A.ParentID().Hash) - require.NotEqual(t, originalRef_A.Hash, reorgedRef_A.Hash, "Expected to get different heads on 
divergence block number, but got the same hash, so no reorg happened on chain A") - require.Equal(t, originalRef_A.ParentID().Hash, reorgedRef_A.ParentHash, "Expected to get same parent hashes on divergence block number, but got different hashes") - - err = wait.For(ctx, 5*time.Second, func() (bool, error) { - safeL2Head_A_supervisor := sys.Supervisor.SafeBlockID(sys.L2ChainA.ChainID()).Hash - safeL2Head_A_sequencer := sys.L2CLA.SafeL2BlockRef() - - if safeL2Head_A_sequencer.Number <= divergenceBlockNumber_A { - l.Info("Safe ref number is still behind divergence block number", "divergence", divergenceBlockNumber_A, "safe", safeL2Head_A_sequencer.Number) - return false, nil - } - if safeL2Head_A_sequencer.Hash.Cmp(safeL2Head_A_supervisor) != 0 { - l.Info("Safe ref still not the same on supervisor and sequencer", "supervisor", safeL2Head_A_supervisor, "sequencer", safeL2Head_A_sequencer.Hash) - return false, nil - } - l.Info("Safe ref is the same on both supervisor and sequencer", "supervisor", safeL2Head_A_supervisor, "sequencer", safeL2Head_A_sequencer.Hash) - - return true, nil - }) - require.NoError(t, err, "Expected to get same safe ref on both supervisor and sequencer eventually") -} From 6d92a35ec1388041d73dea670ab52f48137b271a Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 13:57:47 +0530 Subject: [PATCH 08/13] refactor + removed test code --- .../core/src/chain_processor/chain.rs | 11 ++- .../core/src/chain_processor/task.rs | 76 +++---------------- .../core/src/safety_checker/task.rs | 22 +----- 3 files changed, 17 insertions(+), 92 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 03044fd016..464c7b5845 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -2,8 +2,8 @@ use super::{ChainProcessorError, ChainProcessorTask}; use crate::{config::RollupConfig, event::ChainEvent, 
syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{ - DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, - LogStorageWriter, StorageRewinder, + DerivationStorage, DerivationStorageWriter, HeadRefStorageWriter, LogStorage, + StorageRewinder, }; use std::sync::Arc; use tokio::{ @@ -47,9 +47,8 @@ pub struct ChainProcessor { impl ChainProcessor where P: ManagedNodeProvider + 'static, - W: LogStorageWriter - + LogStorageReader - + DerivationStorageReader + W: LogStorage + + DerivationStorage + DerivationStorageWriter + HeadRefStorageWriter + StorageRewinder @@ -143,7 +142,7 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 90ff832f59..a94810587d 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -7,14 +7,14 @@ use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, - LogStorageWriter, StorageError, StorageRewinder, + DerivationStorage, HeadRefStorageWriter, + LogStorage, StorageError, StorageRewinder, }; use kona_supervisor_types::BlockSeal; use std::{fmt::Debug, sync::Arc}; use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, info}; /// Represents a task that processes chain events 
from a managed node. /// It listens for events emitted by the managed node and handles them accordingly. @@ -43,10 +43,8 @@ pub struct ChainProcessorTask { impl ChainProcessorTask where P: ManagedNodeProvider + 'static, - W: LogStorageWriter - + LogStorageReader - + DerivationStorageWriter - + DerivationStorageReader + W: LogStorage + + DerivationStorage + HeadRefStorageWriter + StorageRewinder + 'static, @@ -299,7 +297,9 @@ where debug!( target: "chain_processor", chain_id = self.chain_id, - "Invalidated block matches replacement block, skipping" + invalidated_block = %invalidated_ref_pair.derived, + replacement_block = %replacement.replacement, + "Invalidated block matches replacement block, processing replacement" ); *guard = None; @@ -309,34 +309,10 @@ where derived: replacement.replacement, }; - // todo: index logs if needed - // hardcoding logs for now - self.state_manager.store_block_logs(&replacement.replacement, Vec::new()).inspect_err(|err| { - error!( - target: "chain_processor", - chain_id = self.chain_id, - %err, - "Failed to store logs for derived block on replacement" - ); - })?; - self.state_manager.save_derived_block(derived_ref_pair).inspect_err(|err| { - error!( - target: "chain_processor", - chain_id = self.chain_id, - %err, - "Failed to save derived block after replacement" - ); - })?; + self.retry_with_resync_derived_block(derived_ref_pair).await?; return Ok(()); } - } else { - warn!( - target: "chain_processor", - chain_id = self.chain_id, - "No invalidated block found, but block replacement event received" - ); } - Ok(()) } @@ -359,30 +335,12 @@ where return Ok(()); } - // todo: handle error if block is not found or conflict error let source_block = self.state_manager.derived_to_source(block.id())?; // rewind the storage to the block before the invalidated block let to = block.id(); self.state_manager.rewind(&to)?; - // log latest derivation and log state for debugging - let latest_derivation_state = 
self.state_manager.latest_derivation_state()?; - info!( - target: "chain_processor", - chain_id = self.chain_id, - latest_derivation_state = ?latest_derivation_state, - "Latest derivation state after rewinding storage" - ); - - let latest_log_state = self.state_manager.get_latest_block()?; - info!( - target: "chain_processor", - chain_id = self.chain_id, - latest_log_state = ?latest_log_state, - "Latest log state after rewinding storage" - ); - let block_seal = BlockSeal::new(block.hash, block.number, block.timestamp); self.managed_node.invalidate_block(block_seal).await.inspect_err(|err| { error!( @@ -490,19 +448,6 @@ where "Latest derivation state in handle_safe_event" ); - // // for testing purpose - // // trigger handle_invalidate with block 15 at block 20 - // if derived_ref_pair.derived.number == 20 { - // info!( - // target: "chain_processor", - // chain_id = self.chain_id, - // block_number = derived_ref_pair.derived.number, - // "Triggering handle_invalidate for block 15 at block 20" - // ); - // let block_15 = self.state_manager.get_block(15)?; - // let _ = self.handle_invalidate_block(block_15).await; - // } - if self.rollup_config.is_post_interop(derived_ref_pair.derived.timestamp) { return self.process_safe_derived_block(derived_ref_pair).await } @@ -573,6 +518,7 @@ where } } } + async fn retry_with_resync_derived_block( &self, derived_ref_pair: DerivedRefPair, @@ -694,7 +640,7 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; diff --git a/crates/supervisor/core/src/safety_checker/task.rs b/crates/supervisor/core/src/safety_checker/task.rs index 44a1b6c276..1f744c308f 100644 --- 
a/crates/supervisor/core/src/safety_checker/task.rs +++ b/crates/supervisor/core/src/safety_checker/task.rs @@ -4,9 +4,8 @@ use crate::{ use alloy_primitives::ChainId; use kona_protocol::BlockInfo; use kona_supervisor_storage::{CrossChainSafetyProvider, StorageError}; -use op_alloy_consensus::interop::SafetyLevel; use std::{ - sync::{Arc, Mutex}, + sync::Arc, time::Duration, }; use tokio::sync::mpsc; @@ -26,8 +25,6 @@ pub struct CrossSafetyCheckerJob { promoter: L, event_tx: mpsc::Sender, config: Arc, - - test_run: Mutex, } impl CrossSafetyCheckerJob @@ -53,7 +50,6 @@ where promoter, event_tx, config, - test_run: Mutex::new(true), } } @@ -130,22 +126,6 @@ where // TODO: Add more checks in future - // test the invalidate block - let mut test_run = self.test_run.lock().unwrap(); - if *test_run && candidate.number == 20 && self.promoter.target_level() == SafetyLevel::CrossSafe { - let event = ChainEvent::InvalidateBlock { block: candidate.clone() }; - if let Err(err) = self.event_tx.try_send(event) { - error!( - target: "safety_checker", - target_level = %self.promoter.target_level(), - %err, - "Failed to broadcast cross head update event", - ); - } - *test_run = false; - return Ok(candidate); - } - let event = self.promoter.update_and_emit_event(&*self.provider, self.chain_id, &candidate)?; self.broadcast_event(event); From c3031e09e3b3baf8b8d8f7bd5474700e460d5601 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 14:38:15 +0530 Subject: [PATCH 09/13] revert changes --- .../core/src/chain_processor/task.rs | 8 ----- .../core/src/safety_checker/task.rs | 34 ++++--------------- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index a94810587d..82869727f2 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -440,14 +440,6 @@ where return Ok(derived_ref_pair.derived); } 
- let latest_derivation_state = self.state_manager.latest_derivation_state()?; - info!( - target: "chain_processor", - chain_id = self.chain_id, - latest_derivation_state = ?latest_derivation_state, - "Latest derivation state in handle_safe_event" - ); - if self.rollup_config.is_post_interop(derived_ref_pair.derived.timestamp) { return self.process_safe_derived_block(derived_ref_pair).await } diff --git a/crates/supervisor/core/src/safety_checker/task.rs b/crates/supervisor/core/src/safety_checker/task.rs index 1f744c308f..7fa8c199e3 100644 --- a/crates/supervisor/core/src/safety_checker/task.rs +++ b/crates/supervisor/core/src/safety_checker/task.rs @@ -1,13 +1,14 @@ use crate::{ - config::Config, event::ChainEvent, safety_checker::{traits::SafetyPromoter, CrossSafetyChecker}, CrossSafetyError + CrossSafetyError, + config::Config, + event::ChainEvent, + safety_checker::{traits::SafetyPromoter, CrossSafetyChecker}, }; use alloy_primitives::ChainId; +use derive_more::Constructor; use kona_protocol::BlockInfo; use kona_supervisor_storage::{CrossChainSafetyProvider, StorageError}; -use std::{ - sync::Arc, - time::Duration, -}; +use std::{sync::Arc, time::Duration }; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; @@ -16,7 +17,7 @@ use tracing::{error, info, warn}; /// /// It uses [`CrossChainSafetyProvider`] to fetch candidate blocks and the [`CrossSafetyChecker`] /// to validate cross-chain message dependencies. -#[derive(Debug)] +#[derive(Debug, Constructor)] pub struct CrossSafetyCheckerJob { chain_id: ChainId, provider: Arc

, @@ -32,27 +33,6 @@ where P: CrossChainSafetyProvider + Send + Sync + 'static, L: SafetyPromoter, { - /// Creates a new instance of [`CrossSafetyCheckerJob`]. - pub fn new( - chain_id: ChainId, - provider: Arc

, - cancel_token: CancellationToken, - interval: Duration, - promoter: L, - event_tx: mpsc::Sender, - config: Arc, - ) -> Self { - Self { - chain_id, - provider, - cancel_token, - interval, - promoter, - event_tx, - config, - } - } - /// Runs the job loop until cancelled, promoting blocks by Promoter /// /// On each iteration: From 4de229f9a051b2140a58b880d22c0799381b7fa8 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 14:44:34 +0530 Subject: [PATCH 10/13] lint fixes --- crates/supervisor/core/src/chain_processor/chain.rs | 6 +++--- crates/supervisor/core/src/chain_processor/task.rs | 12 ++++-------- crates/supervisor/core/src/safety_checker/task.rs | 8 ++++---- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index 464c7b5845..d7123ff014 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -2,8 +2,7 @@ use super::{ChainProcessorError, ChainProcessorTask}; use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{ - DerivationStorage, DerivationStorageWriter, HeadRefStorageWriter, LogStorage, - StorageRewinder, + DerivationStorage, DerivationStorageWriter, HeadRefStorageWriter, LogStorage, StorageRewinder, }; use std::sync::Arc; use tokio::{ @@ -142,7 +141,8 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; diff --git a/crates/supervisor/core/src/chain_processor/task.rs 
b/crates/supervisor/core/src/chain_processor/task.rs index 82869727f2..f2372933a1 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -7,8 +7,7 @@ use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorage, HeadRefStorageWriter, - LogStorage, StorageError, StorageRewinder, + DerivationStorage, HeadRefStorageWriter, LogStorage, StorageError, StorageRewinder, }; use kona_supervisor_types::BlockSeal; use std::{fmt::Debug, sync::Arc}; @@ -43,11 +42,7 @@ pub struct ChainProcessorTask { impl ChainProcessorTask where P: ManagedNodeProvider + 'static, - W: LogStorage - + DerivationStorage - + HeadRefStorageWriter - + StorageRewinder - + 'static, + W: LogStorage + DerivationStorage + HeadRefStorageWriter + StorageRewinder + 'static, { /// Creates a new [`ChainProcessorTask`]. pub fn new( @@ -632,7 +627,8 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; diff --git a/crates/supervisor/core/src/safety_checker/task.rs b/crates/supervisor/core/src/safety_checker/task.rs index 7fa8c199e3..ec79b3b3de 100644 --- a/crates/supervisor/core/src/safety_checker/task.rs +++ b/crates/supervisor/core/src/safety_checker/task.rs @@ -1,14 +1,14 @@ use crate::{ CrossSafetyError, - config::Config, - event::ChainEvent, - safety_checker::{traits::SafetyPromoter, CrossSafetyChecker}, + config::Config, + event::ChainEvent, + safety_checker::{CrossSafetyChecker, traits::SafetyPromoter}, }; use alloy_primitives::ChainId; use 
derive_more::Constructor; use kona_protocol::BlockInfo; use kona_supervisor_storage::{CrossChainSafetyProvider, StorageError}; -use std::{sync::Arc, time::Duration }; +use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; From 6a4dc75a8b5615d648f4762fab037e4d1b90d48d Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 16:02:02 +0530 Subject: [PATCH 11/13] test cases added --- .../core/src/chain_processor/task.rs | 282 +++++++++++++++++- 1 file changed, 280 insertions(+), 2 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index f2372933a1..70ea4b6c77 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -617,8 +617,8 @@ mod tests { config::Genesis, event::ChainEvent, syncnode::{ - BlockProvider, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, - NodeSubscriber, + AuthenticationError, BlockProvider, ClientError, ManagedNodeController, + ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber, }, }; use alloy_primitives::B256; @@ -1515,4 +1515,282 @@ mod tests { cancel_token.cancel(); task_handle.await.unwrap(); } + + #[tokio::test] + async fn test_handle_invalidate_block_already_set_skips() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + // Set up state: invalidated_block is already set + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = Some(DerivedRefPair { source: 
block, derived: block }); + } + + let result = task.handle_invalidate_block(block).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_derived_to_source_error() { + let mut mockdb = MockDb::new(); + let mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Err(StorageError::FutureData)); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!(result, Err(ChainProcessorError::StorageError(StorageError::FutureData)))); + + // make sure invalidated_block is not set + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_rewind_error() { + let mut mockdb = MockDb::new(); + let mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(block)); + mockdb.expect_rewind().returning(move |_to| Err(StorageError::DatabaseNotInitialised)); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!( + result, + Err(ChainProcessorError::StorageError(StorageError::DatabaseNotInitialised)) + )); + + // make sure invalidated_block is not set + let guard = 
task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_managed_node_error() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(block)); + mockdb.expect_rewind().returning(move |_to| Ok(())); + mocknode.expect_invalidate_block().returning(move |_seal| { + Err(ManagedNodeError::ClientError(ClientError::Authentication( + AuthenticationError::InvalidHeader, + ))) + }); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!(result, Err(ChainProcessorError::ManagedNode(_)))); + + // make sure invalidated_block is not set + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_success_sets_invalidated() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + let derived_block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + let source_block = BlockInfo::new(B256::from([2u8; 32]), 41, B256::ZERO, 12344); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(source_block)); + mockdb.expect_rewind().returning(move |_to| Ok(())); + mocknode.expect_invalidate_block().returning(move |_seal| Ok(())); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + 
managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(derived_block).await; + assert!(result.is_ok()); + + // make sure invalidated_block is set + let guard = task.invalidated_block.read().await; + let pair = guard.as_ref().expect("invalidated_block should be set"); + assert_eq!(pair.derived, derived_block); + assert_eq!(pair.source, source_block); + } + + #[tokio::test] + async fn test_handle_block_replacement_no_invalidated_block() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let replacement = BlockReplacement { + invalidated: B256::from([1u8; 32]), + replacement: BlockInfo::new(B256::from([2u8; 32]), 43, B256::ZERO, 12346), + }; + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_handle_block_replacement_invalidated_hash_mismatch() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let invalidated_block = BlockInfo::new(B256::from([3u8; 32]), 42, B256::ZERO, 12345); + let replacement = BlockReplacement { + invalidated: B256::from([1u8; 32]), // does not match invalidated_block.hash + replacement: BlockInfo::new(B256::from([2u8; 32]), 43, B256::ZERO, 12346), + }; + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = 
Some(DerivedRefPair { source: invalidated_block, derived: invalidated_block }); + } + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + + // invalidated_block should remain set + let guard = task.invalidated_block.read().await; + assert!(guard.is_some()); + } + + #[tokio::test] + async fn test_handle_block_replacement_success() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + + let source_block = BlockInfo::new(B256::from([1u8; 32]), 45, B256::ZERO, 12345); + let invalidated_block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + let replacement_block = BlockInfo::new(B256::from([2u8; 32]), 42, B256::ZERO, 12346); + + mockdb.expect_save_derived_block().returning(move |_pair| Ok(())); + mockdb.expect_store_block_logs().returning(move |_block, _logs| Ok(())); + + mocknode.expect_fetch_receipts().returning(move |_block_hash| { + assert_eq!(_block_hash, replacement_block.hash); + Ok(Receipts::default()) + }); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = Some(DerivedRefPair { source: source_block, derived: invalidated_block }); + } + + let replacement = BlockReplacement { + invalidated: invalidated_block.hash, + replacement: replacement_block, + }; + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + + // invalidated_block should be cleared + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } } From 6cebde4cbeb436890a7a830ba83d8fe1fe35cf96 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 17:58:27 +0530 Subject: [PATCH 12/13] review fixes --- 
crates/supervisor/core/src/chain_processor/task.rs | 10 +--------- crates/supervisor/core/src/event/chain.rs | 2 +- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 70ea4b6c77..ff855279ca 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -337,15 +337,7 @@ where self.state_manager.rewind(&to)?; let block_seal = BlockSeal::new(block.hash, block.number, block.timestamp); - self.managed_node.invalidate_block(block_seal).await.inspect_err(|err| { - error!( - target: "chain_processor", - chain_id = self.chain_id, - block_number = block.number, - %err, - "Failed to invalidate block" - ); - })?; + self.managed_node.invalidate_block(block_seal).await?; *invalidated_block = Some(DerivedRefPair { source: source_block, derived: block }); Ok(()) diff --git a/crates/supervisor/core/src/event/chain.rs b/crates/supervisor/core/src/event/chain.rs index 7b9e102e09..d50fc78077 100644 --- a/crates/supervisor/core/src/event/chain.rs +++ b/crates/supervisor/core/src/event/chain.rs @@ -26,7 +26,7 @@ pub enum ChainEvent { origin: BlockInfo, }, - /// A invalidateBlock event, indicating that a block has been invalidated. + /// An invalidateBlock event, indicating that a block has been invalidated. InvalidateBlock { /// The [`BlockInfo`] of the block that has been invalidated. 
block: BlockInfo, From c1dc1a98719da3d80abdc29ae89947b1abf67330 Mon Sep 17 00:00:00 2001 From: Arun Dhyani Date: Mon, 28 Jul 2025 23:35:01 +0530 Subject: [PATCH 13/13] logging refactor --- .../core/src/chain_processor/task.rs | 35 +++++++++---------- crates/supervisor/core/src/event/chain.rs | 2 +- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index ff855279ca..ca09072c37 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -13,7 +13,7 @@ use kona_supervisor_types::BlockSeal; use std::{fmt::Debug, sync::Arc}; use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; /// Represents a task that processes chain events from a managed node. /// It listens for events emitted by the managed node and handles them accordingly. 
@@ -277,8 +277,7 @@ where &self, replacement: BlockReplacement, ) -> Result<(), ChainProcessorError> { - // Logic to handle block replacement - info!( + debug!( target: "chain_processor", chain_id = self.chain_id, %replacement, @@ -288,31 +287,31 @@ where let mut guard = self.invalidated_block.write().await; // check if invalidated block is same as replacement block if let Some(invalidated_ref_pair) = *guard { - if invalidated_ref_pair.derived.hash == replacement.invalidated { + if invalidated_ref_pair.derived.hash != replacement.invalidated { debug!( target: "chain_processor", chain_id = self.chain_id, invalidated_block = %invalidated_ref_pair.derived, replacement_block = %replacement.replacement, - "Invalidated block matches replacement block, processing replacement" + "Replacement block does not match invalidated block, skipping" ); - - *guard = None; - // save the derived block - let derived_ref_pair = DerivedRefPair { - source: invalidated_ref_pair.source, - derived: replacement.replacement, - }; - - self.retry_with_resync_derived_block(derived_ref_pair).await?; return Ok(()); } + + // save the derived block + let derived_ref_pair = DerivedRefPair { + source: invalidated_ref_pair.source, + derived: replacement.replacement, + }; + self.retry_with_resync_derived_block(derived_ref_pair).await?; + *guard = None; + return Ok(()); } Ok(()) } async fn handle_invalidate_block(&self, block: BlockInfo) -> Result<(), ChainProcessorError> { - info!( + debug!( target: "chain_processor", chain_id = self.chain_id, invalidated_block = %block, @@ -372,7 +371,7 @@ where let invalidated_block = self.invalidated_block.read().await; if invalidated_block.is_some() { - debug!( + trace!( target: "chain_processor", chain_id = self.chain_id, block_number = origin.number, @@ -418,7 +417,7 @@ where let invalidated_block = self.invalidated_block.read().await; if invalidated_block.is_some() { - debug!( + trace!( target: "chain_processor", chain_id = self.chain_id, block_number = 
derived_ref_pair.derived.number, @@ -541,7 +540,7 @@ where let invalidated_block = self.invalidated_block.read().await; if invalidated_block.is_some() { - debug!( + trace!( target: "chain_processor", chain_id = self.chain_id, block_number = block.number, diff --git a/crates/supervisor/core/src/event/chain.rs b/crates/supervisor/core/src/event/chain.rs index d50fc78077..69711ae20b 100644 --- a/crates/supervisor/core/src/event/chain.rs +++ b/crates/supervisor/core/src/event/chain.rs @@ -26,7 +26,7 @@ pub enum ChainEvent { origin: BlockInfo, }, - /// An invalidateBlock event, indicating that a block has been invalidated. + /// An invalidated block event, indicating that a block has been invalidated. InvalidateBlock { /// The [`BlockInfo`] of the block that has been invalidated. block: BlockInfo,