This repository was archived by the owner on Jan 16, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 212
Add observability to rewind progress #2614
Merged
emhane
merged 52 commits into
op-rs:main
from
JoshdfG:Add-observability-to-rewind-progress
Sep 24, 2025
Merged
Changes from all commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
f1f04b2
feat: Add observability to rewind progress #2593
JoshdfG 7e8eb88
feat: Add observability to rewind progress #2593
JoshdfG 7253661
Merge branch 'main' into Add-observability-to-rewind-progress
JoshdfG 0ae5996
feat: updates
JoshdfG f862e01
Merge branch 'main' into Add-observability-to-rewind-progress
JoshdfG 334c6e6
feat: update to meet requirements
JoshdfG f5cdd28
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG e01404e
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 77730b4
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG a23b965
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 525361f
Merge branch 'main' into Add-observability-to-rewind-progress
dhyaniarun1993 a45379d
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG a241c65
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 2f6ca06
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 7ed8209
feat: add logging logic to log_provider
JoshdfG 0cfc04c
Merge branch 'main' into Add-observability-to-rewind-progress
emhane 6de979a
feat: enabled tracing ini derivation_provider tests
JoshdfG f807841
Merge branch 'main' into Add-observability-to-rewind-progress
JoshdfG 17a53e2
feat: update log_provider to test logs
JoshdfG 25e871f
feat: update fmt
JoshdfG fdd251c
feat: update incremental percentage for derivation provider
JoshdfG 0e8fc9c
feat: add line space
JoshdfG c19135b
feat: update implementation
JoshdfG 416128b
Merge branch 'main' into Add-observability-to-rewind-progress
JoshdfG 7de8308
feat: update fmt
JoshdfG 16fd895
feat: update fmt + lint
JoshdfG 176638a
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 246785a
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 0f153d1
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG bdae009
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG db6c1d4
feat: update to meet requirements
JoshdfG 01abedf
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 159efab
feat: move percentage into interval scope
JoshdfG f980f67
feat: update
JoshdfG d8ef10b
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 640caf0
Update crates/supervisor/storage/src/providers/log_provider.rs
JoshdfG b038a1a
Update crates/supervisor/storage/src/providers/derivation_provider.rs
JoshdfG 9287bee
feat: update log_provider
JoshdfG 3609bc1
feat: update
JoshdfG d12cedf
feat: update
JoshdfG cd33e7d
Merge branch 'main' into Add-observability-to-rewind-progress
JoshdfG 55883f7
Update crates/supervisor/storage/src/providers/log_provider.rs
JoshdfG 407b37f
Update crates/supervisor/storage/src/providers/log_provider.rs
JoshdfG 2ef6286
fix: fix indentation
JoshdfG 463def4
Fix broken test
emhane ff12c61
Fix bug calculating total blocks to rewind
emhane 1565269
Fix log rewind observability
emhane 5a91b09
Nitpicks
emhane ee2540f
Merge branch 'main' into Add-observability-to-rewind-progress
emhane 60b6a05
Fix lint
emhane 768f7ea
Merge branch 'Add-observability-to-rewind-progress' of github.com:Jos…
emhane 901146c
Merge branch 'main' into Add-observability-to-rewind-progress
emhane File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,20 +7,37 @@ use crate::{ | |
| }; | ||
| use alloy_eips::eip1898::BlockNumHash; | ||
| use alloy_primitives::ChainId; | ||
| use derive_more::Constructor; | ||
| use kona_interop::DerivedRefPair; | ||
| use kona_protocol::BlockInfo; | ||
| use reth_db_api::{ | ||
| cursor::DbCursorRO, | ||
| transaction::{DbTx, DbTxMut}, | ||
| }; | ||
| use tracing::{error, warn}; | ||
| use tracing::{error, info, trace, warn}; | ||
|
|
||
| const DEFAULT_LOG_INTERVAL: u64 = 100; | ||
|
|
||
| /// Provides access to derivation storage operations within a transaction. | ||
| #[derive(Debug, Constructor)] | ||
| #[derive(Debug)] | ||
| pub(crate) struct DerivationProvider<'tx, TX> { | ||
| tx: &'tx TX, | ||
| chain_id: ChainId, | ||
| #[doc(hidden)] | ||
| observability_interval: u64, | ||
| } | ||
|
|
||
| impl<'tx, TX> DerivationProvider<'tx, TX> { | ||
| pub(crate) const fn new(tx: &'tx TX, chain_id: ChainId) -> Self { | ||
| Self::new_with_observability_interval(tx, chain_id, DEFAULT_LOG_INTERVAL) | ||
| } | ||
|
|
||
| pub(crate) const fn new_with_observability_interval( | ||
| tx: &'tx TX, | ||
| chain_id: ChainId, | ||
| observability_interval: u64, | ||
| ) -> Self { | ||
| Self { tx, chain_id, observability_interval } | ||
| } | ||
| } | ||
|
|
||
| impl<TX> DerivationProvider<'_, TX> | ||
|
|
@@ -489,15 +506,92 @@ where | |
| /// This removes all derived blocks with number >= the given block number | ||
| /// and updates the traversal state accordingly. | ||
| pub(crate) fn rewind_to(&self, block: &BlockNumHash) -> Result<(), StorageError> { | ||
| info!( | ||
| target: "supervisor::storage", | ||
| chain_id = %self.chain_id, | ||
| target_block_number = %block.number, | ||
| target_block_hash = %block.hash, | ||
| "Starting rewind of derivation storage" | ||
| ); | ||
|
|
||
| // Validate the block exists and get the block pair - this provides hash validation | ||
| let block_pair = self.get_derived_block_pair(*block)?; | ||
|
|
||
| // Delete all derived blocks with number ≥ `block_info.number` | ||
| // Get the latest block number from DerivedBlocks | ||
| let latest_block = { | ||
| let mut cursor = self.tx.cursor_read::<DerivedBlocks>()?; | ||
| cursor.last()?.map(|(num, _)| num).unwrap_or(block.number) | ||
| }; | ||
|
emhane marked this conversation as resolved.
|
||
|
|
||
| // Check for future block | ||
| if block.number > latest_block { | ||
| error!( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can the node recover from this error? if it can, this should be downgraded to |
||
| target: "supervisor::storage", | ||
| chain_id = %self.chain_id, | ||
| target_block_number = %block.number, | ||
| latest_block, | ||
| "Cannot rewind to future block" | ||
| ); | ||
| return Err(StorageError::FutureData) | ||
| } | ||
|
Comment on lines
+526
to
+536
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice |
||
|
|
||
| // total blocks to rewind down to and including tgt block | ||
| let total_blocks = latest_block - block.number + 1; | ||
| let mut processed_blocks = 0; | ||
|
|
||
| // Delete all derived blocks with number ≥ `block.number` | ||
| { | ||
| let mut cursor = self.tx.cursor_write::<DerivedBlocks>()?; | ||
| let mut walker = cursor.walk(Some(block.number))?; | ||
| while let Some(Ok((_, _))) = walker.next() { | ||
| walker.delete_current()?; // we’re already walking from the rewind point | ||
|
|
||
| trace!( | ||
| target: "supervisor::storage", | ||
| chain_id = %self.chain_id, | ||
| target_block_number = %block.number, | ||
| target_block_hash = %block.hash, | ||
| latest_block, | ||
| total_blocks, | ||
| observability_interval = %self.observability_interval, | ||
| "Rewinding derived block storage..." | ||
| ); | ||
|
|
||
| while let Some(Ok((key, _stored_block))) = walker.next() { | ||
| // Remove the block first | ||
| walker.delete_current()?; | ||
|
|
||
| // Only count as processed after successful deletion | ||
| processed_blocks += 1; | ||
|
|
||
| // Log progress periodically or on last block | ||
| if processed_blocks % self.observability_interval == 0 || | ||
| processed_blocks == total_blocks | ||
| { | ||
| let percentage = if total_blocks > 0 { | ||
| (processed_blocks as f64 / total_blocks as f64 * 100.0).min(100.0) | ||
| } else { | ||
| 100.0 | ||
| }; | ||
|
|
||
| info!( | ||
| target: "supervisor::storage", | ||
| chain_id = %self.chain_id, | ||
| block_number = %key, | ||
| percentage = %format!("{:.2}%", percentage), | ||
| processed_blocks, | ||
| total_blocks, | ||
| "Rewind progress" | ||
| ); | ||
| } | ||
| } | ||
|
|
||
| info!( | ||
| target: "supervisor::storage", | ||
| target_block_number = %block.number, | ||
| target_block_hash = %block.hash, | ||
| chain_id = %self.chain_id, | ||
| total_blocks, | ||
| "Rewind completed successfully" | ||
| ); | ||
| } | ||
|
|
||
| self.rewind_block_traversal_to(&block_pair) | ||
|
|
@@ -598,6 +692,7 @@ mod tests { | |
| use super::*; | ||
| use crate::models::Tables; | ||
| use alloy_primitives::B256; | ||
| use kona_cli::init_test_tracing; | ||
| use kona_interop::DerivedRefPair; | ||
| use kona_protocol::BlockInfo; | ||
| use reth_db::{ | ||
|
|
@@ -1111,77 +1206,6 @@ mod tests { | |
| assert!(tx.put::<BlockTraversal>(source1.number, block_traversal).is_ok()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_rewind_block_to_success() { | ||
| let db = setup_db(); | ||
| let tx = db.tx_mut().expect("Failed to get mutable tx"); | ||
| let provider = DerivationProvider::new(&tx, CHAIN_ID); | ||
| let derived_genesis = block_info(9, genesis_block().hash, 201); | ||
| provider | ||
| .initialise(derived_pair(genesis_block(), derived_genesis)) | ||
| .expect("initialise should succeed"); | ||
|
|
||
| // Setup: | ||
| // Block 1: derived blocks 10, 11, 12 | ||
| // Block 2: derived blocks 13, 14 | ||
| // We will rewind to (source: 1, derived: 11) | ||
| // After rewind: | ||
| // - Block 1 keeps only 10 | ||
| // - Block 2 is removed completely | ||
|
|
||
| // Rewind 2 - to block 10 | ||
| // After rewind: Only genesis left | ||
|
|
||
| let (source1, source2) = { | ||
| let s1 = block_info(1, genesis_block().hash, 200); | ||
| let s2 = block_info(2, s1.hash, 300); | ||
| (s1, s2) | ||
| }; | ||
| let (derived10, derived11, derived12, derived13, derived14) = { | ||
| let d1 = block_info(10, derived_genesis.hash, 195); | ||
| let d2 = block_info(11, d1.hash, 197); | ||
| let d3 = block_info(12, d2.hash, 290); | ||
| let d4 = block_info(13, d3.hash, 292); | ||
| let d5 = block_info(14, d4.hash, 295); | ||
| (d1, d2, d3, d4, d5) | ||
| }; | ||
|
|
||
| provider.save_source_block(source1).expect("Failed to save source block 1"); | ||
| provider | ||
| .save_derived_block(derived_pair(source1, derived10)) | ||
| .expect("Failed to save derived block 10"); | ||
| provider | ||
| .save_derived_block(derived_pair(source1, derived11)) | ||
| .expect("Failed to save derived block 11"); | ||
| provider | ||
| .save_derived_block(derived_pair(source1, derived12)) | ||
| .expect("Failed to save derived block 12"); | ||
|
|
||
| provider.save_source_block(source2).expect("Failed to save source block 2"); | ||
| provider | ||
| .save_derived_block(derived_pair(source2, derived13)) | ||
| .expect("Failed to save derived block 13"); | ||
| provider | ||
| .save_derived_block(derived_pair(source2, derived14)) | ||
| .expect("Failed to save derived block 14"); | ||
|
|
||
| let rewind_point = derived11; | ||
| provider.rewind_to(&rewind_point.id()).expect("rewind should succeed"); | ||
|
|
||
| // Expect: source block 1 retains only derived 10 | ||
| let new_state = provider.latest_derivation_state().expect("should succeed"); | ||
| assert_eq!(new_state.derived, derived10); | ||
| assert_eq!(new_state.source, source1); | ||
|
|
||
| let rewind_point = derived10; | ||
| provider.rewind_to(&rewind_point.id()).expect("rewind should succeed"); | ||
|
|
||
| // Expect: source block 1 retains nothing, genesis should be new latest source | ||
| let new_state = provider.latest_derivation_state().expect("should succeed"); | ||
| assert_eq!(new_state.derived, derived_genesis); | ||
| assert_eq!(new_state.source, genesis_block()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_get_activation_block_returns_error_if_empty() { | ||
| let db = setup_db(); | ||
|
|
@@ -1288,6 +1312,45 @@ mod tests { | |
| assert!(matches!(res, Err(StorageError::EntryNotFound(_)))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn rewind_to_deletes_derived_blocks_and_returns_target() { | ||
| init_test_tracing(); | ||
|
|
||
| let db = setup_db(); | ||
|
|
||
| let source0 = block_info(100, B256::from([100u8; 32]), 200); | ||
| let derived0 = block_info(0, genesis_block().hash, 200); | ||
| let pair0 = derived_pair(source0, derived0); | ||
| assert!(initialize_db(&db, &pair0).is_ok()); | ||
|
|
||
| // Setup source1 with derived 10,11,12 and source2 with 13,14 | ||
| let source1 = block_info(101, source0.hash, 200); | ||
| let source2 = block_info(102, source1.hash, 300); | ||
| let derived1 = block_info(1, derived0.hash, 195); | ||
| let derived2 = block_info(2, derived1.hash, 197); | ||
| let derived3 = block_info(3, derived2.hash, 290); | ||
| let derived4 = block_info(4, derived3.hash, 292); | ||
| let derived5 = block_info(5, derived4.hash, 295); | ||
|
|
||
| assert!(insert_source_block(&db, &source1).is_ok()); | ||
| assert!(insert_pair(&db, &derived_pair(source1, derived1)).is_ok()); | ||
| assert!(insert_pair(&db, &derived_pair(source1, derived2)).is_ok()); | ||
| assert!(insert_pair(&db, &derived_pair(source1, derived3)).is_ok()); | ||
|
|
||
| assert!(insert_source_block(&db, &source2).is_ok()); | ||
| assert!(insert_pair(&db, &derived_pair(source2, derived4)).is_ok()); | ||
| assert!(insert_pair(&db, &derived_pair(source2, derived5)).is_ok()); | ||
|
|
||
| // Perform rewind_to_source starting at source1 | ||
| let tx = db.tx_mut().expect("Could not get mutable tx"); | ||
| let provider = DerivationProvider::new_with_observability_interval(&tx, CHAIN_ID, 1); | ||
| let derived_id = BlockNumHash { number: derived1.number, hash: derived1.hash }; | ||
| provider.rewind_to(&derived_id).expect("rewind should succeed"); | ||
|
|
||
| let res = provider.get_derived_block_pair_by_number(1); | ||
| assert!(matches!(res, Err(StorageError::EntryNotFound(_)))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn rewind_to_source_with_empty_derived_list_returns_none() { | ||
| let db = setup_db(); | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.