diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index a7ed40bf83..6edb35cece 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -1,7 +1,7 @@ //! Main database access structure and transaction contexts. use crate::{ - Metrics, + Metrics, Rewinder, error::StorageError, providers::{DerivationProvider, LogProvider, SafetyHeadRefProvider}, traits::{ @@ -369,6 +369,14 @@ impl HeadRefStorageWriter for ChainDb { } } +impl Rewinder for ChainDb { + fn rewind_log_storage(&self, from_block: u64) -> Result<(), StorageError> { + self.observe_call("rewind_log_storage", || { + self.env.update(|tx| LogProvider::new(tx).rewind_to(from_block))? + }) + } +} + impl MetricsReporter for ChainDb { fn report_metrics(&self) { let mut metrics = Vec::new(); diff --git a/crates/supervisor/storage/src/lib.rs b/crates/supervisor/storage/src/lib.rs index 33f567fdcb..52d41e1950 100644 --- a/crates/supervisor/storage/src/lib.rs +++ b/crates/supervisor/storage/src/lib.rs @@ -40,5 +40,5 @@ mod traits; pub use traits::{ CrossChainSafetyProvider, DerivationStorage, DerivationStorageReader, DerivationStorageWriter, FinalizedL1Storage, HeadRefStorage, HeadRefStorageReader, HeadRefStorageWriter, LogStorage, - LogStorageReader, LogStorageWriter, + LogStorageReader, LogStorageWriter, Rewinder, }; diff --git a/crates/supervisor/storage/src/metrics.rs b/crates/supervisor/storage/src/metrics.rs index 06f7759f81..719c192d0e 100644 --- a/crates/supervisor/storage/src/metrics.rs +++ b/crates/supervisor/storage/src/metrics.rs @@ -14,7 +14,7 @@ impl Metrics { "kona_supervisor_storage_duration_seconds"; // List all your ChainDb method names here - const METHODS: [&'static str; 19] = [ + const METHODS: [&'static str; 20] = [ "derived_to_source", "latest_derived_block_at_source", "latest_derivation_state", @@ -34,6 +34,7 @@ impl Metrics { "update_current_cross_safe", "update_finalized_l1", "get_finalized_l1", + "rewind_log_storage", // Add more as needed ]; diff --git a/crates/supervisor/storage/src/providers/log_provider.rs b/crates/supervisor/storage/src/providers/log_provider.rs index 450fc00b90..fb6729f010 100644 --- a/crates/supervisor/storage/src/providers/log_provider.rs +++ b/crates/supervisor/storage/src/providers/log_provider.rs @@ -122,6 +122,19 @@ where } Ok(()) } + + pub(crate) fn rewind_to(&self, block_number: u64) -> Result<(), StorageError> { + let mut cursor = self.tx.cursor_write::()?; + let mut walker = cursor.walk(Some(block_number))?; + + while let Some(Ok((key, _))) = walker.next() { + if key >= block_number { + walker.delete_current()?; // remove the block + self.tx.delete::(key, None)?; // remove the logs of that block + } + } + Ok(()) + } } impl LogProvider<'_, TX> @@ -530,4 +543,52 @@ mod tests { let result = insert_block_logs(&db, &genesis_conflict, Vec::new()); assert!(matches!(result, Err(StorageError::ConflictError))); } + + #[test] + fn test_rewind_block_logs_from() { + let db = setup_db(); + let genesis = genesis_block(); + initialize_db(&db, &genesis).expect("Failed to initialize DB"); + + // Add 5 blocks with logs + let mut blocks = vec![genesis]; + for i in 1..=5 { + let prev = &blocks[i - 1]; + let block = sample_block_info(i as u64, prev.hash); + let logs = (0..3).map(|j| sample_log(j, j % 2 == 0)).collect(); + insert_block_logs(&db, &block, logs).expect("Failed to insert logs"); + blocks.push(block); + } + + // Rewind from block 3, blocks 3, 4, 5 should be removed + let tx = db.tx_mut().expect("Could not get mutable tx"); + let provider = LogProvider::new(&tx); + provider.rewind_to(3).expect("Failed to rewind blocks"); + tx.commit().expect("Failed to commit rewind"); + + let tx = db.tx().expect("Could not get RO tx"); + let provider = LogProvider::new(&tx); + + // Blocks 0,1,2 should still exist + for i in 0..=2 { + assert!(provider.get_block(i).is_ok(), "block {i} should exist after rewind"); + } + + // Logs for blocks 0,1,2 should exist + for i in 1..=2 { + let logs = provider.get_logs(i).expect("logs should exist"); + assert_eq!(logs.len(), 3); + } + + // Blocks 3,4,5 should be gone + for i in 3..=5 { + assert!( + matches!(provider.get_block(i), Err(StorageError::EntryNotFound(_))), + "block {i} should be removed" + ); + + let logs = provider.get_logs(i).expect("get_logs should not fail"); + assert!(logs.is_empty(), "logs for block {i} should be empty"); + } + } } diff --git a/crates/supervisor/storage/src/traits.rs b/crates/supervisor/storage/src/traits.rs index 106380f197..1e2b82e774 100644 --- a/crates/supervisor/storage/src/traits.rs +++ b/crates/supervisor/storage/src/traits.rs @@ -385,3 +385,19 @@ pub trait CrossChainSafetyProvider { block: &BlockInfo, ) -> Result; } + +/// Trait for rewinding supervisor-related state in the database. +/// +/// This trait is used to revert persisted log data, derivation and safety head ref data +/// from a given block onward. It is typically used to handle chain +/// reorganizations or invalid block propagation. +pub trait Rewinder { + /// Rewinds the log storage from the given block number (inclusive) to the latest block. + /// + /// # Arguments + /// - `from_block`: The block number from which to start the rewind (inclusive). + /// + /// # Errors + /// Returns [`StorageError`] if any database operation fails. + fn rewind_log_storage(&self, from_block: u64) -> Result<(), StorageError>; +}