diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index 5b274e399a..093ecaeeb9 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -73,7 +73,9 @@ impl ChainDb { impl DerivationStorageReader for ChainDb { fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result { self.observe_call("derived_to_source", || { - self.env.view(|tx| DerivationProvider::new(tx).derived_to_source(derived_block_id)) + self.env.view(|tx| { + DerivationProvider::new(tx, self.chain_id).derived_to_source(derived_block_id) + }) })? } @@ -83,14 +85,15 @@ impl DerivationStorageReader for ChainDb { ) -> Result { self.observe_call("latest_derived_block_at_source", || { self.env.view(|tx| { - DerivationProvider::new(tx).latest_derived_block_at_source(source_block_id) + DerivationProvider::new(tx, self.chain_id) + .latest_derived_block_at_source(source_block_id) }) })? } fn latest_derivation_state(&self) -> Result { self.observe_call("latest_derivation_state", || { - self.env.view(|tx| DerivationProvider::new(tx).latest_derivation_state()) + self.env.view(|tx| DerivationProvider::new(tx, self.chain_id).latest_derivation_state()) })? } } @@ -102,10 +105,10 @@ impl DerivationStorageWriter for ChainDb { ) -> Result<(), StorageError> { self.observe_call("initialise_derivation_storage", || { self.env.update(|ctx| { - DerivationProvider::new(ctx).initialise(incoming_pair)?; - SafetyHeadRefProvider::new(ctx) + DerivationProvider::new(ctx, self.chain_id).initialise(incoming_pair)?; + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived)?; - SafetyHeadRefProvider::new(ctx) + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::CrossSafe, &incoming_pair.derived) }) })? @@ -115,31 +118,33 @@ impl DerivationStorageWriter for ChainDb { self.observe_call("save_derived_block", || { self.env.update(|ctx| { let derived_block = incoming_pair.derived; - let block = LogProvider::new(ctx).get_block(derived_block.number).map_err( - |err| match err { + let block = LogProvider::new(ctx, self.chain_id) + .get_block(derived_block.number) + .map_err(|err| match err { StorageError::EntryNotFound(_) => { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_block = %derived_block, "Derived block not found in log storage: {derived_block:?}" ); StorageError::ConflictError } other => other, // propagate other errors as-is - }, - )?; + })?; if block != derived_block { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_block = %derived_block, stored_log_block = %block, "Derived block does not match the stored log block" ); return Err(StorageError::ConflictError); } - DerivationProvider::new(ctx).save_derived_block(incoming_pair)?; - SafetyHeadRefProvider::new(ctx) + DerivationProvider::new(ctx, self.chain_id).save_derived_block(incoming_pair)?; + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived) }) })? @@ -147,7 +152,9 @@ impl DerivationStorageWriter for ChainDb { fn save_source_block(&self, incoming_source: BlockInfo) -> Result<(), StorageError> { self.observe_call("save_source_block", || { - self.env.update(|ctx| DerivationProvider::new(ctx).save_source_block(incoming_source)) + self.env.update(|ctx| { + DerivationProvider::new(ctx, self.chain_id).save_source_block(incoming_source) + }) })? } } @@ -156,25 +163,25 @@ impl DerivationStorageWriter for ChainDb { impl LogStorageReader for ChainDb { fn get_latest_block(&self) -> Result { self.observe_call("get_latest_block", || { - self.env.view(|tx| LogProvider::new(tx).get_latest_block()) + self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_latest_block()) })? } fn get_block(&self, block_number: u64) -> Result { self.observe_call("get_block", || { - self.env.view(|tx| LogProvider::new(tx).get_block(block_number)) + self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_block(block_number)) })? } fn get_log(&self, block_number: u64, log_index: u32) -> Result { self.observe_call("get_log", || { - self.env.view(|tx| LogProvider::new(tx).get_log(block_number, log_index)) + self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_log(block_number, log_index)) })? } fn get_logs(&self, block_number: u64) -> Result, StorageError> { self.observe_call("get_logs", || { - self.env.view(|tx| LogProvider::new(tx).get_logs(block_number)) + self.env.view(|tx| LogProvider::new(tx, self.chain_id).get_logs(block_number)) })? } } @@ -183,10 +190,10 @@ impl LogStorageWriter for ChainDb { fn initialise_log_storage(&self, block: BlockInfo) -> Result<(), StorageError> { self.observe_call("initialise_log_storage", || { self.env.update(|ctx| { - LogProvider::new(ctx).initialise(block)?; - SafetyHeadRefProvider::new(ctx) + LogProvider::new(ctx, self.chain_id).initialise(block)?; + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::LocalUnsafe, &block)?; - SafetyHeadRefProvider::new(ctx) + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::CrossUnsafe, &block) }) })? @@ -195,9 +202,9 @@ impl LogStorageWriter for ChainDb { fn store_block_logs(&self, block: &BlockInfo, logs: Vec) -> Result<(), StorageError> { self.observe_call("store_block_logs", || { self.env.update(|ctx| { - LogProvider::new(ctx).store_block_logs(block, logs)?; + LogProvider::new(ctx, self.chain_id).store_block_logs(block, logs)?; - SafetyHeadRefProvider::new(ctx) + SafetyHeadRefProvider::new(ctx, self.chain_id) .update_safety_head_ref(SafetyLevel::LocalUnsafe, block) }) })? @@ -207,7 +214,9 @@ impl LogStorageWriter for ChainDb { impl HeadRefStorageReader for ChainDb { fn get_safety_head_ref(&self, safety_level: SafetyLevel) -> Result { self.observe_call("get_safety_head_ref", || { - self.env.view(|tx| SafetyHeadRefProvider::new(tx).get_safety_head_ref(safety_level)) + self.env.view(|tx| { + SafetyHeadRefProvider::new(tx, self.chain_id).get_safety_head_ref(safety_level) + }) })? } @@ -215,7 +224,7 @@ impl HeadRefStorageReader for ChainDb { fn get_super_head(&self) -> Result { self.observe_call("get_super_head", || { self.env.view(|tx| { - let sp = SafetyHeadRefProvider::new(tx); + let sp = SafetyHeadRefProvider::new(tx, self.chain_id); let local_unsafe = sp.get_safety_head_ref(SafetyLevel::LocalUnsafe).map_err(|err| { if matches!(err, StorageError::FutureData) { @@ -249,11 +258,12 @@ impl HeadRefStorageReader for ChainDb { Err(err) => return Err(err), }; - let l1_source = match DerivationProvider::new(tx).latest_derivation_state() { - Ok(pair) => Some(pair.source), - Err(StorageError::DatabaseNotInitialised) => None, - Err(err) => return Err(err), - }; + let l1_source = + match DerivationProvider::new(tx, self.chain_id).latest_derivation_state() { + Ok(pair) => Some(pair.source), + Err(StorageError::DatabaseNotInitialised) => None, + Err(err) => return Err(err), + }; Ok(SuperHead { l1_source, @@ -275,16 +285,17 @@ impl HeadRefStorageWriter for ChainDb { ) -> Result { self.observe_call("update_finalized_using_source", || { self.env.update(|tx| { - let sp = SafetyHeadRefProvider::new(tx); + let sp = SafetyHeadRefProvider::new(tx, self.chain_id); let safe = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?; - let dp = DerivationProvider::new(tx); + let dp = DerivationProvider::new(tx, self.chain_id); let safe_block_pair = dp.get_derived_block_pair(safe.id())?; if finalized_source_block.number >= safe_block_pair.source.number { // this could happen during initial sync warn!( target: "supervisor_storage", + chain_id = %self.chain_id, l1_finalized_block_number = finalized_source_block.number, safe_source_block_number = safe_block_pair.source.number, "L1 finalized block is greater than safe block", @@ -304,14 +315,15 @@ impl HeadRefStorageWriter for ChainDb { fn update_current_cross_unsafe(&self, block: &BlockInfo) -> Result<(), StorageError> { self.observe_call("update_current_cross_unsafe", || { self.env.update(|tx| { - let lp = LogProvider::new(tx); - let sp = SafetyHeadRefProvider::new(tx); + let lp = LogProvider::new(tx, self.chain_id); + let sp = SafetyHeadRefProvider::new(tx, self.chain_id); // Check parent-child relationship with current CrossUnsafe head, if it exists. let parent = sp.get_safety_head_ref(SafetyLevel::CrossUnsafe)?; if !parent.is_parent_of(block) { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_block = %block, latest_block = %parent, "Incoming block is not the child of the current cross-unsafe head", @@ -324,17 +336,15 @@ impl HeadRefStorageWriter for ChainDb { if stored_block.hash != block.hash { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_block_hash = %block.hash, stored_block_hash = %stored_block.hash, "Hash mismatch while updating CrossUnsafe head", ); - return Err(StorageError::EntryNotFound( - "block hash does not match".to_string(), - )); + return Err(StorageError::ConflictError); } sp.update_safety_head_ref(SafetyLevel::CrossUnsafe, block)?; - Ok(()) })? }) @@ -343,14 +353,15 @@ impl HeadRefStorageWriter for ChainDb { fn update_current_cross_safe(&self, block: &BlockInfo) -> Result { self.observe_call("update_current_cross_safe", || { self.env.update(|tx| { - let dp = DerivationProvider::new(tx); - let sp = SafetyHeadRefProvider::new(tx); + let dp = DerivationProvider::new(tx, self.chain_id); + let sp = SafetyHeadRefProvider::new(tx, self.chain_id); // Check parent-child relationship with current CrossUnsafe head, if it exists. let parent = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?; if !parent.is_parent_of(block) { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_block = %block, latest_block = %parent, "Incoming block is not the child of the current cross-safe head", @@ -373,8 +384,8 @@ impl Rewinder for ChainDb { fn rewind_log_storage(&self, to: &BlockNumHash) -> Result<(), StorageError> { self.observe_call("rewind_log_storage", || { self.env.update(|tx| { - let lp = LogProvider::new(tx); - let hp = SafetyHeadRefProvider::new(tx); + let lp = LogProvider::new(tx, self.chain_id); + let hp = SafetyHeadRefProvider::new(tx, self.chain_id); lp.rewind_to(to)?; @@ -390,9 +401,9 @@ impl Rewinder for ChainDb { fn rewind(&self, to: &BlockNumHash) -> Result<(), StorageError> { self.observe_call("rewind", || { self.env.update(|tx| { - let lp = LogProvider::new(tx); - let dp = DerivationProvider::new(tx); - let hp = SafetyHeadRefProvider::new(tx); + let lp = LogProvider::new(tx, self.chain_id); + let dp = DerivationProvider::new(tx, self.chain_id); + let hp = SafetyHeadRefProvider::new(tx, self.chain_id); lp.rewind_to(to)?; dp.rewind_to(to)?; @@ -579,7 +590,7 @@ mod tests { let _ = db .env .update(|ctx| { - let sp = SafetyHeadRefProvider::new(ctx); + let sp = SafetyHeadRefProvider::new(ctx, 1); sp.update_safety_head_ref(SafetyLevel::Finalized, &block) }) .unwrap(); diff --git a/crates/supervisor/storage/src/providers/derivation_provider.rs b/crates/supervisor/storage/src/providers/derivation_provider.rs index e7b8edfaef..fcc23c7a62 100644 --- a/crates/supervisor/storage/src/providers/derivation_provider.rs +++ b/crates/supervisor/storage/src/providers/derivation_provider.rs @@ -6,6 +6,8 @@ use crate::{ }, }; use alloy_eips::eip1898::BlockNumHash; +use alloy_primitives::ChainId; +use derive_more::Constructor; use kona_interop::DerivedRefPair; use kona_protocol::BlockInfo; use reth_db_api::{ @@ -15,16 +17,10 @@ use reth_db_api::{ use tracing::{error, warn}; /// Provides access to derivation storage operations within a transaction. -#[derive(Debug)] +#[derive(Debug, Constructor)] pub(crate) struct DerivationProvider<'tx, TX> { tx: &'tx TX, -} - -impl<'tx, TX> DerivationProvider<'tx, TX> { - /// Creates a new [`DerivationProvider`] instance. - pub(crate) const fn new(tx: &'tx TX) -> Self { - Self { tx } - } + chain_id: ChainId, } impl DerivationProvider<'_, TX> @@ -40,6 +36,7 @@ where self.tx.get::(derived_block_number).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, derived_block_number, %err, "Failed to get derived block pair" @@ -49,6 +46,7 @@ where let derived_block_pair = derived_block_pair_opt.ok_or_else(|| { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, derived_block_number, "Derived block not found" ); @@ -70,6 +68,7 @@ where if derived_block_pair.derived.hash != derived_block_id.hash { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, derived_block_number = derived_block_id.number, expected_hash = %derived_block_id.hash, actual_hash = %derived_block_pair.derived.hash, @@ -106,6 +105,7 @@ where self.tx.get::(source_block_number).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, source_block_number, %err, "Failed to get block traversal info for source block" @@ -115,6 +115,7 @@ where block_traversal.ok_or_else(|| { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, source_block_number, "source block not found" ); @@ -139,6 +140,7 @@ where if block_traversal.source.hash != source_block_id.hash { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, source_block_hash = %source_block_id.hash, "Source block hash mismatch" ); @@ -169,6 +171,7 @@ where let mut cursor = self.tx.cursor_read::().inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %err, "Failed to get cursor for DerivedBlocks" ); @@ -177,6 +180,7 @@ where let result = cursor.last().inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %err, "Failed to seek to last block" ); @@ -185,6 +189,7 @@ where let (_, block) = result.ok_or_else(|| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, "No blocks found in storage" ); StorageError::DatabaseNotInitialised @@ -193,6 +198,7 @@ where let latest_source_block = self.latest_source_block().inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %err, "Failed to get latest source block" ); @@ -224,6 +230,7 @@ where let block = self.latest_source_block_traversal().inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %err, "Failed to get latest source block traversal" ); @@ -277,6 +284,7 @@ where .inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_derived_block_pair = %incoming_pair, %err, "Failed to get derived block pair" @@ -288,6 +296,7 @@ where } else { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %latest_derivation_state, incoming_derived_block_pair = %incoming_pair, "Incoming derived block is not consistent with the latest stored derived block" @@ -300,6 +309,7 @@ where if latest_derivation_state.source != incoming_pair.source { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, latest_source_block = %latest_derivation_state.source, incoming_source = %incoming_pair.source, "Latest source block does not match the incoming derived block source" @@ -310,6 +320,7 @@ where if !latest_derivation_state.derived.is_parent_of(&incoming_pair.derived) { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, %latest_derivation_state, incoming_derived_block_pair = %incoming_pair, "Latest stored derived block is not parent of the incoming derived block" @@ -331,6 +342,7 @@ where let mut block_traversal = self.latest_source_block_traversal().inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_derived_block_pair = %incoming_pair, %err, "Failed to get latest source block traversal" @@ -341,6 +353,7 @@ where if incoming_pair.source != latest_source_block { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, latest_source_block = %latest_source_block, incoming_source = %incoming_pair.source, "Latest source block does not match the incoming derived block source" @@ -357,6 +370,7 @@ where .inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_derived_block_pair = %incoming_pair, %err, "Failed to save derived block pair" @@ -368,6 +382,7 @@ where |err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_derived_block_pair = %incoming_pair, %err, "Failed to save derived block numbers for source block" @@ -402,6 +417,7 @@ where self.get_source_block(incoming_source.number).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, incoming_source = %incoming_source, %err, "Failed to get source block" @@ -413,6 +429,7 @@ where } else { error!( target: "supervisor_storage", + chain_id = %self.chain_id, latest_source_block = %latest_source_block, incoming_source = %incoming_source, "Incoming source block is not consistent with the latest source block" @@ -424,6 +441,7 @@ where if !latest_source_block.is_parent_of(&incoming_source) { error!( target: "supervisor_storage", + chain_id = %self.chain_id, latest_source_block = %latest_source_block, incoming_source = %incoming_source, "Stored latest source block is not parent of the incoming source block" @@ -443,7 +461,7 @@ where self.tx.put::(incoming_source.number, block_traversal).inspect_err( |err| { - error!(target: "supervisor_storage", %err, "Failed to save block traversal"); + error!(target: "supervisor_storage", chain_id = %self.chain_id, %err, "Failed to save block traversal"); }, )?; @@ -487,7 +505,7 @@ where if !traversal.derived_block_numbers.is_empty() { self.tx.put::(block_pair.source.number, traversal).inspect_err( |err| { - error!(target: "supervisor_storage", %err, "Failed to update block traversal"); + error!(target: "supervisor_storage", chain_id = %self.chain_id, %err, "Failed to update block traversal"); }, )?; walk_from += 1; @@ -517,6 +535,8 @@ mod tests { }; use tempfile::TempDir; + static CHAIN_ID: ChainId = 1; + fn block_info(number: u64, parent_hash: B256, timestamp: u64) -> BlockInfo { BlockInfo { hash: B256::from([number as u8; 32]), number, parent_hash, timestamp } } @@ -544,7 +564,7 @@ mod tests { /// Helper to initialize database in a new transaction, committing if successful. fn initialize_db(db: &DatabaseEnv, pair: &DerivedRefPair) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let res = provider.initialise(*pair); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); @@ -555,7 +575,7 @@ mod tests { /// Helper to insert a pair in a new transaction, committing if successful. fn insert_pair(db: &DatabaseEnv, pair: &DerivedRefPair) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let res = provider.save_derived_block(*pair); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); @@ -566,7 +586,7 @@ mod tests { /// Helper to insert a source block in a new transaction, committing if successful. fn insert_source_block(db: &DatabaseEnv, source: &BlockInfo) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let res = provider.save_source_block(*source); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); @@ -587,7 +607,7 @@ mod tests { // Check that the anchor is present let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let stored = provider.get_derived_block_pair_by_number(0).expect("should exist"); assert_eq!(stored.source.hash, anchor.source.hash); assert_eq!(stored.derived.hash, anchor.derived.hash); @@ -748,7 +768,7 @@ mod tests { // Now check latest_derived_block_at_source returns derived2 for source1 let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let source_id1 = BlockNumHash { number: source1.number, hash: source1.hash }; let latest = provider.latest_derived_block_at_source(source_id1).expect("should exist"); assert_eq!(latest.number, derived2.number); @@ -766,7 +786,7 @@ mod tests { // Use a source block that does not exist let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let source_id = BlockNumHash { number: 9999, hash: B256::from([99u8; 32]) }; let result = provider.latest_derived_block_at_source(source_id); assert!(matches!(result, Err(StorageError::EntryNotFound(_)))); @@ -783,7 +803,7 @@ mod tests { // Use correct number but wrong hash let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let wrong_hash = B256::from([123u8; 32]); let source_id = BlockNumHash { number: source1.number, hash: wrong_hash }; let result = provider.latest_derived_block_at_source(source_id); @@ -804,7 +824,7 @@ mod tests { assert!(insert_pair(&db, &pair2).is_ok()); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let latest = provider.latest_derivation_state().expect("should exist"); assert_eq!(latest, pair2); @@ -815,7 +835,7 @@ mod tests { let db = setup_db(); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let result = provider.latest_derivation_state(); print!("{:?}", result); @@ -843,7 +863,7 @@ mod tests { let source3 = block_info(102, source2.hash, 400); assert!(insert_source_block(&db, &source3).is_ok()); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let latest = provider.latest_derivation_state().expect("should exist"); let expected_derivation_state = DerivedRefPair { source: source3, derived: derived2 }; @@ -857,7 +877,7 @@ mod tests { .expect("Failed to init database"); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); assert!(matches!( provider.latest_derivation_state(), Err(StorageError::DatabaseNotInitialised) @@ -874,7 +894,7 @@ mod tests { assert!(initialize_db(&db, &pair1).is_ok()); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let derived_block_id = BlockNumHash { number: derived1.number, hash: derived1.hash }; let source = provider.derived_to_source(derived_block_id).expect("should exist"); @@ -886,7 +906,7 @@ mod tests { let db = setup_db(); let tx = db.tx().expect("Could not get tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let derived_block_id = BlockNumHash { number: 9999, hash: B256::from([9u8; 32]) }; let result = provider.derived_to_source(derived_block_id); @@ -1013,7 +1033,7 @@ mod tests { let derived1 = block_info(100, derived0.hash, 200); let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let mut block_traversal = provider.get_block_traversal(source1.number).expect("should exist"); block_traversal.derived_block_numbers.push(derived1.number); @@ -1024,7 +1044,7 @@ mod tests { fn test_rewind_block_to_success() { let db = setup_db(); let tx = db.tx_mut().expect("Failed to get mutable tx"); - let provider = DerivationProvider::new(&tx); + let provider = DerivationProvider::new(&tx, CHAIN_ID); let derived_genesis = block_info(9, genesis_block().hash, 201); provider .initialise(derived_pair(genesis_block(), derived_genesis)) diff --git a/crates/supervisor/storage/src/providers/head_ref_provider.rs b/crates/supervisor/storage/src/providers/head_ref_provider.rs index 3074f77537..fb0f59f47a 100644 --- a/crates/supervisor/storage/src/providers/head_ref_provider.rs +++ b/crates/supervisor/storage/src/providers/head_ref_provider.rs @@ -1,19 +1,17 @@ //! Provider for tracking block safety head reference use crate::{StorageError, models::SafetyHeadRefs}; +use alloy_primitives::ChainId; +use derive_more::Constructor; use kona_protocol::BlockInfo; use op_alloy_consensus::interop::SafetyLevel; use reth_db_api::transaction::{DbTx, DbTxMut}; use tracing::{error, warn}; /// A Safety Head Reference storage that wraps transactional reference. +#[derive(Debug, Constructor)] pub(crate) struct SafetyHeadRefProvider<'tx, TX> { tx: &'tx TX, -} - -impl<'tx, TX> SafetyHeadRefProvider<'tx, TX> { - pub(crate) const fn new(tx: &'tx TX) -> Self { - Self { tx } - } + chain_id: ChainId, } impl SafetyHeadRefProvider<'_, TX> @@ -28,6 +26,7 @@ where let result = self.tx.get::(head_ref_key).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %safety_level, %err, "Failed to seek head reference" @@ -56,6 +55,7 @@ where if current_head_ref.number > incoming_head_ref.number { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, %current_head_ref, %incoming_head_ref, %safety_level, @@ -70,6 +70,7 @@ where .inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %incoming_head_ref, %safety_level, %err, @@ -107,6 +108,7 @@ where .inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %incoming_head_ref, %safety_level, %err, @@ -129,6 +131,8 @@ mod tests { use reth_db_api::Database; use tempfile::TempDir; + static CHAIN_ID: ChainId = 1; + fn setup_db() -> DatabaseEnv { let temp_dir = TempDir::new().expect("Could not create temp dir"); init_db_for::<_, Tables>(temp_dir.path(), DatabaseArguments::default()) @@ -141,7 +145,7 @@ mod tests { // Create write transaction first let write_tx = db.tx_mut().expect("Failed to create write transaction"); - let write_provider = SafetyHeadRefProvider::new(&write_tx); + let write_provider = SafetyHeadRefProvider::new(&write_tx, CHAIN_ID); // Initially, there should be no head ref let result = write_provider.get_safety_head_ref(SafetyLevel::CrossSafe); @@ -158,7 +162,7 @@ mod tests { // Create a new read transaction to verify let tx = db.tx().expect("Failed to create transaction"); - let provider = SafetyHeadRefProvider::new(&tx); + let provider = SafetyHeadRefProvider::new(&tx, CHAIN_ID); let result = provider.get_safety_head_ref(SafetyLevel::CrossSafe).expect("Failed to get head ref"); assert_eq!(result, block_info); @@ -168,7 +172,7 @@ mod tests { fn test_safety_head_ref_update() { let db = setup_db(); let write_tx = db.tx_mut().expect("Failed to create write transaction"); - let write_provider = SafetyHeadRefProvider::new(&write_tx); + let write_provider = SafetyHeadRefProvider::new(&write_tx, CHAIN_ID); // Create initial block info let initial_block_info = BlockInfo { @@ -198,7 +202,7 @@ mod tests { // Verify the updated value let tx = db.tx().expect("Failed to create transaction"); - let provider = SafetyHeadRefProvider::new(&tx); + let provider = SafetyHeadRefProvider::new(&tx, CHAIN_ID); let result = provider.get_safety_head_ref(SafetyLevel::CrossSafe).expect("Failed to get head ref"); assert_eq!(result, updated_block_info); @@ -208,7 +212,7 @@ mod tests { fn test_reset_safety_head_ref_if_ahead() { let db = setup_db(); let tx = db.tx_mut().expect("Failed to start write tx"); - let provider = SafetyHeadRefProvider::new(&tx); + let provider = SafetyHeadRefProvider::new(&tx, CHAIN_ID); // Set initial head at 100 let head_100 = BlockInfo { @@ -246,7 +250,7 @@ mod tests { fn test_reset_safety_head_ref_should_ignore_future_data() { let db = setup_db(); let tx = db.tx_mut().expect("Failed to start write tx"); - let provider = SafetyHeadRefProvider::new(&tx); + let provider = SafetyHeadRefProvider::new(&tx, CHAIN_ID); // Set initial head at 100 let head_100 = BlockInfo { diff --git a/crates/supervisor/storage/src/providers/log_provider.rs b/crates/supervisor/storage/src/providers/log_provider.rs index 355d751ddf..c1dde0d1a6 100644 --- a/crates/supervisor/storage/src/providers/log_provider.rs +++ b/crates/supervisor/storage/src/providers/log_provider.rs @@ -18,6 +18,8 @@ use crate::{ models::{BlockRefs, LogEntries}, }; use alloy_eips::BlockNumHash; +use alloy_primitives::ChainId; +use derive_more::Constructor; use kona_protocol::BlockInfo; use kona_supervisor_types::Log; use reth_db_api::{ @@ -28,16 +30,10 @@ use std::fmt::Debug; use tracing::{debug, error, warn}; /// A log storage that wraps a transactional reference to the MDBX backend. -#[derive(Debug)] +#[derive(Debug, Constructor)] pub(crate) struct LogProvider<'tx, TX> { tx: &'tx TX, -} - -/// Internal constructor and setup methods for [`LogProvider`]. -impl<'tx, TX> LogProvider<'tx, TX> { - pub(crate) const fn new(tx: &'tx TX) -> Self { - Self { tx } - } + chain_id: ChainId, } impl LogProvider<'_, TX> @@ -61,7 +57,12 @@ where block: &BlockInfo, logs: Vec, ) -> Result<(), StorageError> { - debug!(target: "supervisor_storage", block_number = block.number, "Storing logs"); + debug!( + target: "supervisor_storage", + chain_id = %self.chain_id, + block_number = block.number, + "Storing logs", + ); let latest_block = match self.get_latest_block() { Ok(block) => block, @@ -78,6 +79,7 @@ where } error!( target: "supervisor_storage", + chain_id = %self.chain_id, %stored_block, incoming_block = %block, "Incoming log block is not consistent with the stored log block", @@ -88,6 +90,7 @@ where if !latest_block.is_parent_of(block) { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, %latest_block, incoming_block = %block, "Incoming block does not follow latest stored block" @@ -104,17 +107,29 @@ where logs: Vec, ) -> Result<(), StorageError> { self.tx.put::(block.number, (*block).into()).inspect_err(|err| { - error!(target: "supervisor_storage", block_number = block.number, %err, "Failed to insert block"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + block_number = block.number, + %err, + "Failed to insert block" + ); })?; let mut cursor = self.tx.cursor_dup_write::().inspect_err(|err| { - error!(target: "supervisor_storage", %err, "Failed to get dup cursor"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + %err, + "Failed to get dup cursor" + ); })?; for log in logs { cursor.append_dup(block.number, log.into()).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number = block.number, %err, "Failed to append logs" @@ -134,6 +149,7 @@ where if key == block.number && block.hash != stored_block.hash { error!( target: "supervisor_storage", + chain_id = %self.chain_id, %stored_block, incoming_block = ?block, "Requested block to rewind does not match stored block", @@ -152,11 +168,17 @@ where TX: DbTx, { pub(crate) fn get_block(&self, block_number: u64) -> Result { - debug!(target: "supervisor_storage", block_number, "Fetching block"); + debug!( + target: "supervisor_storage", + chain_id = %self.chain_id, + block_number, + "Fetching block" + ); let block_option = self.tx.get::(block_number).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, %err, "Failed to read block", @@ -164,25 +186,44 @@ where })?; let block = block_option.ok_or_else(|| { - warn!(target: "supervisor_storage", block_number, "Block not found"); + warn!( + target: "supervisor_storage", + chain_id = %self.chain_id, + block_number, + "Block not found" + ); StorageError::EntryNotFound(format!("block {block_number} not found")) })?; Ok(block.into()) } pub(crate) fn get_latest_block(&self) -> Result { - debug!(target: "supervisor_storage", "Fetching latest block"); + debug!(target: "supervisor_storage", chain_id = %self.chain_id, "Fetching latest block"); let mut cursor = self.tx.cursor_read::().inspect_err(|err| { - error!(target: "supervisor_storage", %err, "Failed to get cursor"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + %err, + "Failed to get cursor" + ); })?; let result = cursor.last().inspect_err(|err| { - error!(target: "supervisor_storage", %err, "Failed to seek to last block"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + %err, + "Failed to seek to last block" + ); })?; let (_, block) = result.ok_or_else(|| { - warn!(target: "supervisor_storage", "No blocks found in storage"); + warn!( + target: "supervisor_storage", + chain_id = %self.chain_id, + "No blocks found in storage" + ); StorageError::DatabaseNotInitialised })?; Ok(block.into()) @@ -191,18 +232,25 @@ where pub(crate) fn get_log(&self, block_number: u64, log_index: u32) -> Result { debug!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, log_index, "Fetching block by log" ); let mut cursor = self.tx.cursor_dup_read::().inspect_err(|err| { - error!(target: "supervisor_storage", %err, "Failed to get cursor for LogEntries"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + %err, + "Failed to get cursor for LogEntries" + ); })?; let result = cursor.seek_by_key_subkey(block_number, log_index).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, log_index, %err, @@ -213,6 +261,7 @@ where let log_entry = result.ok_or_else(|| { warn!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, log_index, "Log not found" @@ -227,15 +276,21 @@ where } pub(crate) fn get_logs(&self, block_number: u64) -> Result, StorageError> { - debug!(target: "supervisor_storage", block_number, "Fetching logs"); + debug!(target: "supervisor_storage", chain_id = %self.chain_id, block_number, "Fetching logs"); let mut cursor = self.tx.cursor_dup_read::().inspect_err(|err| { - error!(target: "supervisor_storage", %err, "Failed to get dup cursor"); + error!( + target: "supervisor_storage", + chain_id = %self.chain_id, + %err, + "Failed to get dup cursor" + ); })?; let walker = cursor.walk_range(block_number..=block_number).inspect_err(|err| { error!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, %err, "Failed to walk dup range", @@ -249,6 +304,7 @@ where Err(err) => { error!( target: "supervisor_storage", + chain_id = %self.chain_id, block_number, %err, "Failed to read log entry", @@ -275,6 +331,8 @@ mod tests { use reth_db_api::Database; use tempfile::TempDir; + static CHAIN_ID: ChainId = 1; + fn genesis_block() -> BlockInfo { BlockInfo { hash: B256::from([0u8; 32]), @@ -321,7 +379,7 @@ mod tests { /// Helper to initialize database in a new transaction, committing if successful. fn initialize_db(db: &DatabaseEnv, block: &BlockInfo) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); let res = provider.initialise(*block); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); @@ -338,7 +396,7 @@ mod tests { logs: Vec, ) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); let res = provider.store_block_logs(block, logs); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); @@ -356,7 +414,7 @@ mod tests { // Check that the anchor is present let tx = db.tx().expect("Could not get tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); let stored = provider.get_block(genesis.number).expect("should exist"); assert_eq!(stored.hash, genesis.hash); } @@ -394,7 +452,7 @@ mod tests { let db = setup_db(); let tx = db.tx().expect("Failed to start RO tx"); - let log_reader = LogProvider::new(&tx); + let log_reader = LogProvider::new(&tx, CHAIN_ID); let result = log_reader.get_latest_block(); assert!(matches!(result, Err(StorageError::DatabaseNotInitialised))); @@ -432,7 +490,7 @@ mod tests { assert!(insert_block_logs(&db, &block3, logs3).is_ok()); let tx = db.tx().expect("Failed to start RO tx"); - let log_reader = LogProvider::new(&tx); + let log_reader = LogProvider::new(&tx, CHAIN_ID); // get_block let block = log_reader.get_block(block2.number).expect("Failed to get block"); @@ -458,7 +516,7 @@ mod tests { let db = setup_db(); let tx = db.tx().expect("Failed to start RO tx"); - let log_reader = LogProvider::new(&tx); + let log_reader = LogProvider::new(&tx, CHAIN_ID); let result = log_reader.get_latest_block(); assert!(matches!(result, Err(StorageError::DatabaseNotInitialised))); @@ -524,7 +582,7 @@ mod tests { // Check that the logs are still present and correct let tx = db.tx().expect("Failed to start RO tx"); - let log_reader = LogProvider::new(&tx); + let log_reader = LogProvider::new(&tx, CHAIN_ID); let logs = log_reader.get_logs(block1.number).expect("Should get logs"); assert_eq!(logs, logs1); } @@ -572,12 +630,12 @@ mod tests { // Rewind to block 3, blocks 3, 4, 5 should be removed let tx = db.tx_mut().expect("Could not get mutable tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); provider.rewind_to(&blocks[3].id()).expect("Failed to rewind blocks"); tx.commit().expect("Failed to commit rewind"); let tx = db.tx().expect("Could not get RO tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); // Blocks 0,1,2 should still exist for i in 0..=2 { @@ -616,7 +674,7 @@ mod tests { conflicting_block1.hash = B256::from([0xAB; 32]); // different hash let tx = db.tx_mut().expect("Failed to get tx"); - let provider = LogProvider::new(&tx); + let provider = LogProvider::new(&tx, CHAIN_ID); let result = provider.rewind_to(&conflicting_block1.id()); assert!(