diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index ca4218fa44..d7123ff014 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -2,8 +2,7 @@ use super::{ChainProcessorError, ChainProcessorTask}; use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, - StorageRewinder, + DerivationStorage, DerivationStorageWriter, HeadRefStorageWriter, LogStorage, StorageRewinder, }; use std::sync::Arc; use tokio::{ @@ -47,8 +46,8 @@ pub struct ChainProcessor { impl ChainProcessor where P: ManagedNodeProvider + 'static, - W: LogStorageWriter - + LogStorageReader + W: LogStorage + + DerivationStorage + DerivationStorageWriter + HeadRefStorageWriter + StorageRewinder @@ -142,7 +141,8 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; @@ -226,13 +226,19 @@ mod tests { ) -> Result<(), StorageError>; } - impl LogStorageReader for Db { + impl LogStorageReader for Db { fn get_block(&self, block_number: u64) -> Result; fn get_latest_block(&self) -> Result; fn get_log(&self,block_number: u64,log_index: u32) -> Result; fn get_logs(&self, block_number: u64) -> Result, StorageError>; } + impl DerivationStorageReader for Db { + fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result; + fn latest_derived_block_at_source(&self, source_block_id: BlockNumHash) -> Result; + fn latest_derivation_state(&self) -> Result; + } + impl DerivationStorageWriter for Db { fn initialise_derivation_storage( &self, @@ -266,6 +272,7 @@ mod tests { block: &BlockInfo, ) -> Result; } + impl StorageRewinder for Db { fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError>; fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError>; diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index 61519f51a3..ca09072c37 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -7,13 +7,13 @@ use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, - StorageError, StorageRewinder, + DerivationStorage, HeadRefStorageWriter, LogStorage, StorageError, StorageRewinder, }; +use kona_supervisor_types::BlockSeal; use std::{fmt::Debug, sync::Arc}; -use tokio::sync::mpsc; +use tokio::sync::{RwLock, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, trace}; /// Represents a task that processes chain events from a managed node. /// It listens for events emitted by the managed node and handles them accordingly. @@ -35,17 +35,14 @@ pub struct ChainProcessorTask { /// The channel for receiving node events. event_rx: mpsc::Receiver, + + invalidated_block: RwLock>, } impl ChainProcessorTask where P: ManagedNodeProvider + 'static, - W: LogStorageWriter - + LogStorageReader - + DerivationStorageWriter - + HeadRefStorageWriter - + StorageRewinder - + 'static, + W: LogStorage + DerivationStorage + HeadRefStorageWriter + StorageRewinder + 'static, { /// Creates a new [`ChainProcessorTask`]. pub fn new( @@ -68,6 +65,7 @@ where event_rx, state_manager, log_indexer: Arc::from(log_indexer), + invalidated_block: RwLock::new(None), rewinder: Arc::from(rewinder), } } @@ -204,8 +202,19 @@ where ); }); } + ChainEvent::InvalidateBlock { block } => { + let _ = self.handle_invalidate_block(block).await.inspect_err(|err| { + error!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + %err, + "Failed to invalidate block" + ); + }); + } ChainEvent::BlockReplaced { replacement } => { - let _ = self.handle_block_replacement(replacement).inspect_err(|err| { + let _ = self.handle_block_replacement(replacement).await.inspect_err(|err| { error!( target: "chain_processor", chain_id = self.chain_id, @@ -264,12 +273,72 @@ where } } - #[allow(clippy::missing_const_for_fn)] - fn handle_block_replacement( + async fn handle_block_replacement( &self, - _replacement: BlockReplacement, + replacement: BlockReplacement, ) -> Result<(), ChainProcessorError> { - // Logic to handle block replacement + debug!( + target: "chain_processor", + chain_id = self.chain_id, + %replacement, + "Handling block replacement" + ); + + let mut guard = self.invalidated_block.write().await; + // check if invalidated block is same as replacement block + if let Some(invalidated_ref_pair) = *guard { + if invalidated_ref_pair.derived.hash != replacement.invalidated { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + invalidated_block = %invalidated_ref_pair.derived, + replacement_block = %replacement.replacement, + "Replacement block does not match invalidated block, skipping" + ); + return Ok(()); + } + + // save the derived block + let derived_ref_pair = DerivedRefPair { + source: invalidated_ref_pair.source, + derived: replacement.replacement, + }; + self.retry_with_resync_derived_block(derived_ref_pair).await?; + *guard = None; + return Ok(()); + } + Ok(()) + } + + async fn handle_invalidate_block(&self, block: BlockInfo) -> Result<(), ChainProcessorError> { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + invalidated_block = %block, + "Processing invalidate block" + ); + + let mut invalidated_block = self.invalidated_block.write().await; + if invalidated_block.is_some() { + debug!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + "Invalidated block already set, skipping" + ); + return Ok(()); + } + + let source_block = self.state_manager.derived_to_source(block.id())?; + + // rewind the storage to the block before the invalidated block + let to = block.id(); + self.state_manager.rewind(&to)?; + + let block_seal = BlockSeal::new(block.hash, block.number, block.timestamp); + self.managed_node.invalidate_block(block_seal).await?; + + *invalidated_block = Some(DerivedRefPair { source: source_block, derived: block }); Ok(()) } @@ -299,6 +368,18 @@ where block_number = origin.number, "Processing derivation origin update" ); + + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + trace!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = origin.number, + "Invalidated block set, skipping derivation origin update" + ); + return Ok(()); + } + match self.state_manager.save_source_block(origin) { Ok(_) => Ok(()), Err(StorageError::BlockOutOfOrder | StorageError::ConflictError) => { @@ -334,6 +415,17 @@ where "Processing local safe derived block pair" ); + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + trace!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = derived_ref_pair.derived.number, + "Invalidated block already set, skipping safe event processing" + ); + return Ok(derived_ref_pair.derived); + } + if self.rollup_config.is_post_interop(derived_ref_pair.derived.timestamp) { return self.process_safe_derived_block(derived_ref_pair).await } @@ -404,6 +496,7 @@ where } } } + async fn retry_with_resync_derived_block( &self, derived_ref_pair: DerivedRefPair, @@ -445,6 +538,17 @@ where "Processing unsafe block" ); + let invalidated_block = self.invalidated_block.read().await; + if invalidated_block.is_some() { + trace!( + target: "chain_processor", + chain_id = self.chain_id, + block_number = block.number, + "Invalidated block already set, skipping unsafe event processing" + ); + return Ok(block); + } + if self.rollup_config.is_post_interop(block.timestamp) { self.log_indexer.clone().sync_logs(block); return Ok(block); @@ -504,8 +608,8 @@ mod tests { config::Genesis, event::ChainEvent, syncnode::{ - BlockProvider, ManagedNodeController, ManagedNodeDataProvider, ManagedNodeError, - NodeSubscriber, + AuthenticationError, BlockProvider, ClientError, ManagedNodeController, + ManagedNodeDataProvider, ManagedNodeError, NodeSubscriber, }, }; use alloy_primitives::B256; @@ -514,7 +618,8 @@ mod tests { use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - DerivationStorageWriter, HeadRefStorageWriter, LogStorageWriter, StorageError, + DerivationStorageReader, DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, + LogStorageWriter, StorageError, }; use kona_supervisor_types::{BlockSeal, Log, OutputV0, Receipts}; use mockall::mock; @@ -605,6 +710,12 @@ mod tests { fn get_logs(&self, block_number: u64) -> Result, StorageError>; } + impl DerivationStorageReader for Db { + fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result; + fn latest_derived_block_at_source(&self, source_block_id: BlockNumHash) -> Result; + fn latest_derivation_state(&self) -> Result; + } + impl DerivationStorageWriter for Db { fn initialise_derivation_storage( &self, @@ -1395,4 +1506,282 @@ mod tests { cancel_token.cancel(); task_handle.await.unwrap(); } + + #[tokio::test] + async fn test_handle_invalidate_block_already_set_skips() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + // Set up state: invalidated_block is already set + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = Some(DerivedRefPair { source: block, derived: block }); + } + + let result = task.handle_invalidate_block(block).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_derived_to_source_error() { + let mut mockdb = MockDb::new(); + let mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Err(StorageError::FutureData)); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!(result, Err(ChainProcessorError::StorageError(StorageError::FutureData)))); + + // make sure invalidated_block is not set + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_rewind_error() { + let mut mockdb = MockDb::new(); + let mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(block)); + mockdb.expect_rewind().returning(move |_to| Err(StorageError::DatabaseNotInitialised)); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!( + result, + Err(ChainProcessorError::StorageError(StorageError::DatabaseNotInitialised)) + )); + + // make sure invalidated_block is not set + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_managed_node_error() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + let block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(block)); + mockdb.expect_rewind().returning(move |_to| Ok(())); + mocknode.expect_invalidate_block().returning(move |_seal| { + Err(ManagedNodeError::ClientError(ClientError::Authentication( + AuthenticationError::InvalidHeader, + ))) + }); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(block).await; + assert!(matches!(result, Err(ChainProcessorError::ManagedNode(_)))); + + // make sure invalidated_block is not set + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } + + #[tokio::test] + async fn test_handle_invalidate_block_success_sets_invalidated() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + let derived_block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + let source_block = BlockInfo::new(B256::from([2u8; 32]), 41, B256::ZERO, 12344); + + mockdb.expect_derived_to_source().returning(move |_id| Ok(source_block)); + mockdb.expect_rewind().returning(move |_to| Ok(())); + mocknode.expect_invalidate_block().returning(move |_seal| Ok(())); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_invalidate_block(derived_block).await; + assert!(result.is_ok()); + + // make sure invalidated_block is set + let guard = task.invalidated_block.read().await; + let pair = guard.as_ref().expect("invalidated_block should be set"); + assert_eq!(pair.derived, derived_block); + assert_eq!(pair.source, source_block); + } + + #[tokio::test] + async fn test_handle_block_replacement_no_invalidated_block() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let replacement = BlockReplacement { + invalidated: B256::from([1u8; 32]), + replacement: BlockInfo::new(B256::from([2u8; 32]), 43, B256::ZERO, 12346), + }; + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_handle_block_replacement_invalidated_hash_mismatch() { + let mockdb = MockDb::new(); + let mocknode = MockNode::new(); + + let invalidated_block = BlockInfo::new(B256::from([3u8; 32]), 42, B256::ZERO, 12345); + let replacement = BlockReplacement { + invalidated: B256::from([1u8; 32]), // does not match invalidated_block.hash + replacement: BlockInfo::new(B256::from([2u8; 32]), 43, B256::ZERO, 12346), + }; + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = Some(DerivedRefPair { source: invalidated_block, derived: invalidated_block }); + } + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + + // invalidated_block should remain set + let guard = task.invalidated_block.read().await; + assert!(guard.is_some()); + } + + #[tokio::test] + async fn test_handle_block_replacement_success() { + let mut mockdb = MockDb::new(); + let mut mocknode = MockNode::new(); + + let source_block = BlockInfo::new(B256::from([1u8; 32]), 45, B256::ZERO, 12345); + let invalidated_block = BlockInfo::new(B256::from([1u8; 32]), 42, B256::ZERO, 12345); + let replacement_block = BlockInfo::new(B256::from([2u8; 32]), 42, B256::ZERO, 12346); + + mockdb.expect_save_derived_block().returning(move |_pair| Ok(())); + mockdb.expect_store_block_logs().returning(move |_block, _logs| Ok(())); + + mocknode.expect_fetch_receipts().returning(move |_block_hash| { + assert_eq!(_block_hash, replacement_block.hash); + Ok(Receipts::default()) + }); + + let writer = Arc::new(mockdb); + let managed_node = Arc::new(mocknode); + let cancel_token = CancellationToken::new(); + let (_tx, rx) = mpsc::channel(10); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); + { + let mut guard = task.invalidated_block.write().await; + *guard = Some(DerivedRefPair { source: source_block, derived: invalidated_block }); + } + + let replacement = BlockReplacement { + invalidated: invalidated_block.hash, + replacement: replacement_block, + }; + + let result = task.handle_block_replacement(replacement).await; + assert!(result.is_ok()); + + // invalidated_block should be cleared + let guard = task.invalidated_block.read().await; + assert!(guard.is_none()); + } } diff --git a/crates/supervisor/core/src/event/chain.rs b/crates/supervisor/core/src/event/chain.rs index 0f9b666a8a..69711ae20b 100644 --- a/crates/supervisor/core/src/event/chain.rs +++ b/crates/supervisor/core/src/event/chain.rs @@ -26,6 +26,12 @@ pub enum ChainEvent { origin: BlockInfo, }, + /// An invalidate Block event, indicating that a block has been invalidated. + InvalidateBlock { + /// The [`BlockInfo`] of the block that has been invalidated. + block: BlockInfo, + }, + /// A block replacement event, indicating that a block has been replaced with a new one. BlockReplaced { /// The [`BlockReplacement`] containing the replacement block and the invalidated block diff --git a/crates/supervisor/types/src/types.rs b/crates/supervisor/types/src/types.rs index cb5ddccbf3..44ee01b9ce 100644 --- a/crates/supervisor/types/src/types.rs +++ b/crates/supervisor/types/src/types.rs @@ -4,7 +4,7 @@ //! and the op-node components in the rollup system. It includes block references, //! block seals, derivation events, and event notifications. -use alloy_primitives::{B256, U64}; +use alloy_primitives::B256; use kona_interop::ManagedEvent; use serde::{Deserialize, Serialize}; @@ -18,14 +18,14 @@ pub struct BlockSeal { /// The block's hash pub hash: B256, /// The block number - pub number: U64, + pub number: u64, /// The block's timestamp - pub timestamp: U64, + pub timestamp: u64, } impl BlockSeal { /// Creates a new [`BlockSeal`] with the given hash, number, and timestamp. - pub const fn new(hash: B256, number: U64, timestamp: U64) -> Self { + pub const fn new(hash: B256, number: u64, timestamp: u64) -> Self { Self { hash, number, timestamp } } }