diff --git a/Cargo.lock b/Cargo.lock index bb1ceccc1f..2241acded6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5406,6 +5406,7 @@ dependencies = [ "bytes", "derive_more", "eyre", + "kona-cli", "kona-interop", "kona-protocol", "kona-supervisor-metrics", @@ -5420,6 +5421,7 @@ dependencies = [ "tempfile", "test-fuzz", "thiserror 2.0.16", + "tokio", "tracing", ] diff --git a/crates/supervisor/storage/Cargo.toml b/crates/supervisor/storage/Cargo.toml index d34a4b15b6..ea995d1042 100644 --- a/crates/supervisor/storage/Cargo.toml +++ b/crates/supervisor/storage/Cargo.toml @@ -41,9 +41,14 @@ reth-db-api = { workspace = true } reth-db = { workspace = true } reth-codecs = { workspace = true } +# HTTP client and TLS for remote signer +tokio = { workspace = true, features = ["full"] } + [dev-dependencies] test-fuzz = { workspace = true } tempfile = { workspace = true } +tokio.workspace = true +kona-cli.workspace = true [lints] workspace = true diff --git a/crates/supervisor/storage/src/providers/derivation_provider.rs b/crates/supervisor/storage/src/providers/derivation_provider.rs index f19459b7a3..3551d2b387 100644 --- a/crates/supervisor/storage/src/providers/derivation_provider.rs +++ b/crates/supervisor/storage/src/providers/derivation_provider.rs @@ -7,20 +7,37 @@ use crate::{ }; use alloy_eips::eip1898::BlockNumHash; use alloy_primitives::ChainId; -use derive_more::Constructor; use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use reth_db_api::{ cursor::DbCursorRO, transaction::{DbTx, DbTxMut}, }; -use tracing::{error, warn}; +use tracing::{error, info, trace, warn}; + +const DEFAULT_LOG_INTERVAL: u64 = 100; /// Provides access to derivation storage operations within a transaction. -#[derive(Debug, Constructor)] +#[derive(Debug)] pub(crate) struct DerivationProvider<'tx, TX> { tx: &'tx TX, chain_id: ChainId, + #[doc(hidden)] + observability_interval: u64, +} + +impl<'tx, TX> DerivationProvider<'tx, TX> { + pub(crate) const fn new(tx: &'tx TX, chain_id: ChainId) -> Self { + Self::new_with_observability_interval(tx, chain_id, DEFAULT_LOG_INTERVAL) + } + + pub(crate) const fn new_with_observability_interval( + tx: &'tx TX, + chain_id: ChainId, + observability_interval: u64, + ) -> Self { + Self { tx, chain_id, observability_interval } + } } impl DerivationProvider<'_, TX> @@ -489,15 +506,92 @@ where /// This removes all derived blocks with number >= the given block number /// and updates the traversal state accordingly. pub(crate) fn rewind_to(&self, block: &BlockNumHash) -> Result<(), StorageError> { + info!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + target_block_hash = %block.hash, + "Starting rewind of derivation storage" + ); + + // Validate the block exists and get the block pair - this provides hash validation let block_pair = self.get_derived_block_pair(*block)?; - // Delete all derived blocks with number ≥ `block_info.number` + // Get the latest block number from DerivedBlocks + let latest_block = { + let mut cursor = self.tx.cursor_read::()?; + cursor.last()?.map(|(num, _)| num).unwrap_or(block.number) + }; + + // Check for future block + if block.number > latest_block { + error!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + latest_block, + "Cannot rewind to future block" + ); + return Err(StorageError::FutureData) + } + + // total blocks to rewind down to and including tgt block + let total_blocks = latest_block - block.number + 1; + let mut processed_blocks = 0; + + // Delete all derived blocks with number ≥ `block.number` { let mut cursor = self.tx.cursor_write::()?; let mut walker = cursor.walk(Some(block.number))?; - while let Some(Ok((_, _))) = walker.next() { - walker.delete_current()?; // we’re already walking from the rewind point + + trace!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + target_block_hash = %block.hash, + latest_block, + total_blocks, + observability_interval = %self.observability_interval, + "Rewinding derived block storage..." + ); + + while let Some(Ok((key, _stored_block))) = walker.next() { + // Remove the block first + walker.delete_current()?; + + // Only count as processed after successful deletion + processed_blocks += 1; + + // Log progress periodically or on last block + if processed_blocks % self.observability_interval == 0 || + processed_blocks == total_blocks + { + let percentage = if total_blocks > 0 { + (processed_blocks as f64 / total_blocks as f64 * 100.0).min(100.0) + } else { + 100.0 + }; + + info!( + target: "supervisor::storage", + chain_id = %self.chain_id, + block_number = %key, + percentage = %format!("{:.2}%", percentage), + processed_blocks, + total_blocks, + "Rewind progress" + ); + } } + + info!( + target: "supervisor::storage", + target_block_number = %block.number, + target_block_hash = %block.hash, + chain_id = %self.chain_id, + total_blocks, + "Rewind completed successfully" + ); } self.rewind_block_traversal_to(&block_pair) @@ -598,6 +692,7 @@ mod tests { use super::*; use crate::models::Tables; use alloy_primitives::B256; + use kona_cli::init_test_tracing; use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use reth_db::{ @@ -1111,77 +1206,6 @@ mod tests { assert!(tx.put::(source1.number, block_traversal).is_ok()); } - #[test] - fn test_rewind_block_to_success() { - let db = setup_db(); - let tx = db.tx_mut().expect("Failed to get mutable tx"); - let provider = DerivationProvider::new(&tx, CHAIN_ID); - let derived_genesis = block_info(9, genesis_block().hash, 201); - provider - .initialise(derived_pair(genesis_block(), derived_genesis)) - .expect("initialise should succeed"); - - // Setup: - // Block 1: derived blocks 10, 11, 12 - // Block 2: derived blocks 13, 14 - // We will rewind to (source: 1, derived: 11) - // After rewind: - // - Block 1 keeps only 10 - // - Block 2 is removed completely - - // Rewind 2 - to block 10 - // After rewind: Only genesis left - - let (source1, source2) = { - let s1 = block_info(1, genesis_block().hash, 200); - let s2 = block_info(2, s1.hash, 300); - (s1, s2) - }; - let (derived10, derived11, derived12, derived13, derived14) = { - let d1 = block_info(10, derived_genesis.hash, 195); - let d2 = block_info(11, d1.hash, 197); - let d3 = block_info(12, d2.hash, 290); - let d4 = block_info(13, d3.hash, 292); - let d5 = block_info(14, d4.hash, 295); - (d1, d2, d3, d4, d5) - }; - - provider.save_source_block(source1).expect("Failed to save source block 1"); - provider - .save_derived_block(derived_pair(source1, derived10)) - .expect("Failed to save derived block 10"); - provider - .save_derived_block(derived_pair(source1, derived11)) - .expect("Failed to save derived block 11"); - provider - .save_derived_block(derived_pair(source1, derived12)) - .expect("Failed to save derived block 12"); - - provider.save_source_block(source2).expect("Failed to save source block 2"); - provider - .save_derived_block(derived_pair(source2, derived13)) - .expect("Failed to save derived block 13"); - provider - .save_derived_block(derived_pair(source2, derived14)) - .expect("Failed to save derived block 14"); - - let rewind_point = derived11; - provider.rewind_to(&rewind_point.id()).expect("rewind should succeed"); - - // Expect: source block 1 retains only derived 10 - let new_state = provider.latest_derivation_state().expect("should succeed"); - assert_eq!(new_state.derived, derived10); - assert_eq!(new_state.source, source1); - - let rewind_point = derived10; - provider.rewind_to(&rewind_point.id()).expect("rewind should succeed"); - - // Expect: source block 1 retains nothing, genesis should be new latest source - let new_state = provider.latest_derivation_state().expect("should succeed"); - assert_eq!(new_state.derived, derived_genesis); - assert_eq!(new_state.source, genesis_block()); - } - #[test] fn test_get_activation_block_returns_error_if_empty() { let db = setup_db(); @@ -1288,6 +1312,45 @@ mod tests { assert!(matches!(res, Err(StorageError::EntryNotFound(_)))); } + #[test] + fn rewind_to_deletes_derived_blocks_and_returns_target() { + init_test_tracing(); + + let db = setup_db(); + + let source0 = block_info(100, B256::from([100u8; 32]), 200); + let derived0 = block_info(0, genesis_block().hash, 200); + let pair0 = derived_pair(source0, derived0); + assert!(initialize_db(&db, &pair0).is_ok()); + + // Setup source1 with derived 10,11,12 and source2 with 13,14 + let source1 = block_info(101, source0.hash, 200); + let source2 = block_info(102, source1.hash, 300); + let derived1 = block_info(1, derived0.hash, 195); + let derived2 = block_info(2, derived1.hash, 197); + let derived3 = block_info(3, derived2.hash, 290); + let derived4 = block_info(4, derived3.hash, 292); + let derived5 = block_info(5, derived4.hash, 295); + + assert!(insert_source_block(&db, &source1).is_ok()); + assert!(insert_pair(&db, &derived_pair(source1, derived1)).is_ok()); + assert!(insert_pair(&db, &derived_pair(source1, derived2)).is_ok()); + assert!(insert_pair(&db, &derived_pair(source1, derived3)).is_ok()); + + assert!(insert_source_block(&db, &source2).is_ok()); + assert!(insert_pair(&db, &derived_pair(source2, derived4)).is_ok()); + assert!(insert_pair(&db, &derived_pair(source2, derived5)).is_ok()); + + // Perform rewind_to_source starting at source1 + let tx = db.tx_mut().expect("Could not get mutable tx"); + let provider = DerivationProvider::new_with_observability_interval(&tx, CHAIN_ID, 1); + let derived_id = BlockNumHash { number: derived1.number, hash: derived1.hash }; + provider.rewind_to(&derived_id).expect("rewind should succeed"); + + let res = provider.get_derived_block_pair_by_number(1); + assert!(matches!(res, Err(StorageError::EntryNotFound(_)))); + } + #[test] fn rewind_to_source_with_empty_derived_list_returns_none() { let db = setup_db(); diff --git a/crates/supervisor/storage/src/providers/log_provider.rs b/crates/supervisor/storage/src/providers/log_provider.rs index 2f350f4145..f6e355a680 100644 --- a/crates/supervisor/storage/src/providers/log_provider.rs +++ b/crates/supervisor/storage/src/providers/log_provider.rs @@ -19,7 +19,6 @@ use crate::{ }; use alloy_eips::BlockNumHash; use alloy_primitives::ChainId; -use derive_more::Constructor; use kona_protocol::BlockInfo; use kona_supervisor_types::Log; use reth_db_api::{ @@ -27,13 +26,31 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, }; use std::fmt::Debug; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, trace, warn}; + +const DEFAULT_LOG_INTERVAL: u64 = 100; /// A log storage that wraps a transactional reference to the MDBX backend. -#[derive(Debug, Constructor)] +#[derive(Debug)] pub(crate) struct LogProvider<'tx, TX> { tx: &'tx TX, chain_id: ChainId, + #[doc(hidden)] + observability_interval: u64, +} + +impl<'tx, TX> LogProvider<'tx, TX> { + pub(crate) const fn new(tx: &'tx TX, chain_id: ChainId) -> Self { + Self::new_with_observability_interval(tx, chain_id, DEFAULT_LOG_INTERVAL) + } + + pub(crate) const fn new_with_observability_interval( + tx: &'tx TX, + chain_id: ChainId, + observability_interval: u64, + ) -> Self { + Self { tx, chain_id, observability_interval } + } } impl LogProvider<'_, TX> @@ -142,23 +159,103 @@ where /// Rewinds the log storage by deleting all blocks and logs from the given block onward. /// Fails if the given block exists with a mismatching hash (to prevent unsafe deletion). pub(crate) fn rewind_to(&self, block: &BlockNumHash) -> Result<(), StorageError> { - let mut cursor = self.tx.cursor_write::()?; - let mut walker = cursor.walk(Some(block.number))?; + info!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + target_block_hash = %block.hash, + "Starting rewind of log storage" + ); - while let Some(Ok((key, stored_block))) = walker.next() { - if key == block.number && block.hash != stored_block.hash { - warn!( - target: "supervisor::storage", - chain_id = %self.chain_id, - %stored_block, - incoming_block = ?block, - "Requested block to rewind does not match stored block", - ); - return Err(StorageError::ConflictError) + // Get the latest block number from BlockRefs + let latest_block = { + let mut cursor = self.tx.cursor_read::()?; + cursor.last()?.map(|(num, _)| num).unwrap_or(block.number) + }; + + // Check for future block + if block.number > latest_block { + error!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + latest_block, + "Cannot rewind to future block" + ); + return Err(StorageError::FutureData); + } + + // total blocks to rewind down to and including tgt block + let total_blocks = latest_block - block.number + 1; + let mut processed_blocks = 0; + + // Delete all blocks and logs with number ≥ `block.number` + { + let mut cursor = self.tx.cursor_write::()?; + let mut walker = cursor.walk(Some(block.number))?; + + trace!( + target: "supervisor::storage", + chain_id = %self.chain_id, + target_block_number = %block.number, + target_block_hash = %block.hash, + latest_block, + total_blocks, + observability_interval = %self.observability_interval, + "Rewinding log storage..." + ); + + while let Some(Ok((key, stored_block))) = walker.next() { + if key == block.number && block.hash != stored_block.hash { + warn!( + target: "supervisor::storage", + chain_id = %self.chain_id, + %stored_block, + incoming_block = ?block, + "Requested block to rewind does not match stored block" + ); + return Err(StorageError::ConflictError); + } + // remove the block + walker.delete_current()?; + + // remove the logs of that block + self.tx.delete::(key, None)?; + + processed_blocks += 1; + + // Log progress periodically or on last block + if processed_blocks % self.observability_interval == 0 || + processed_blocks == total_blocks + { + let percentage = if total_blocks > 0 { + (processed_blocks as f64 / total_blocks as f64 * 100.0).min(100.0) + } else { + 100.0 + }; + + info!( + target: "supervisor::storage", + chain_id = %self.chain_id, + block_number = %key, + percentage = %format!("{:.2}%", percentage), + processed_blocks, + total_blocks, + "Rewind progress" + ); + } } - walker.delete_current()?; // remove the block - self.tx.delete::(key, None)?; // remove the logs of that block + + info!( + target: "supervisor::storage", + target_block_number = ?block.number, + target_block_hash = %block.hash, + chain_id = %self.chain_id, + total_blocks, + "Rewind completed successfully" + ); } + Ok(()) } } @@ -319,6 +416,7 @@ mod tests { use super::*; use crate::models::Tables; use alloy_primitives::B256; + use kona_cli::init_test_tracing; use kona_protocol::BlockInfo; use kona_supervisor_types::{ExecutingMessage, Log}; use reth_db::{ @@ -611,6 +709,8 @@ mod tests { #[test] fn test_rewind_to() { + init_test_tracing(); + let db = setup_db(); let genesis = genesis_block(); initialize_db(&db, &genesis).expect("Failed to initialize DB"); @@ -627,12 +727,12 @@ mod tests { // Rewind to block 3, blocks 3, 4, 5 should be removed let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = LogProvider::new(&tx, CHAIN_ID); + let provider = LogProvider::new_with_observability_interval(&tx, CHAIN_ID, 1); provider.rewind_to(&blocks[3].id()).expect("Failed to rewind blocks"); tx.commit().expect("Failed to commit rewind"); let tx = db.tx().expect("Could not get RO tx"); - let provider = LogProvider::new(&tx, CHAIN_ID); + let provider = LogProvider::new_with_observability_interval(&tx, CHAIN_ID, 1); // Blocks 0,1,2 should still exist for i in 0..=2 { @@ -656,6 +756,7 @@ mod tests { assert!(logs.is_empty(), "logs for block {i} should be empty"); } } + #[test] fn test_rewind_to_conflict_hash() { let db = setup_db();