diff --git a/crates/supervisor/core/src/chain_processor/chain.rs b/crates/supervisor/core/src/chain_processor/chain.rs index ce46151d44..a94f9e7a5c 100644 --- a/crates/supervisor/core/src/chain_processor/chain.rs +++ b/crates/supervisor/core/src/chain_processor/chain.rs @@ -1,5 +1,5 @@ use super::{ChainProcessorError, ChainProcessorTask}; -use crate::{event::ChainEvent, syncnode::ManagedNodeProvider}; +use crate::{config::RollupConfig, event::ChainEvent, syncnode::ManagedNodeProvider}; use alloy_primitives::ChainId; use kona_supervisor_storage::{ DerivationStorageWriter, HeadRefStorageWriter, LogStorageReader, LogStorageWriter, @@ -18,6 +18,9 @@ use tracing::warn; // chain processor will support multiple managed nodes in the future. #[derive(Debug)] pub struct ChainProcessor { + // The rollup configuration for the chain + rollup_config: RollupConfig, + // The chainId that this processor is associated with chain_id: ChainId, @@ -51,6 +54,7 @@ where { /// Creates a new instance of [`ChainProcessor`]. pub fn new( + rollup_config: RollupConfig, chain_id: ChainId, managed_node: Arc

, state_manager: Arc, @@ -58,6 +62,7 @@ where ) -> Self { // todo: validate chain_id against managed_node Self { + rollup_config, chain_id, event_tx: None, metrics_enabled: None, @@ -94,11 +99,12 @@ where } // todo: figure out value for buffer size - let (event_tx, event_rx) = mpsc::channel::(100); + let (event_tx, event_rx) = mpsc::channel::(1000); self.event_tx = Some(event_tx.clone()); self.managed_node.start_subscription(event_tx.clone()).await?; let mut task = ChainProcessorTask::new( + self.rollup_config.clone(), self.chain_id, self.managed_node.clone(), self.state_manager.clone(), @@ -235,6 +241,11 @@ mod tests { pub Db {} impl LogStorageWriter for Db { + fn initialise_log_storage( + &self, + block: BlockInfo, + ) -> Result<(), StorageError>; + fn store_block_logs( &self, block: &BlockInfo, @@ -250,6 +261,11 @@ mod tests { } impl DerivationStorageWriter for Db { + fn initialise_derivation_storage( + &self, + incoming_pair: DerivedRefPair, + ) -> Result<(), StorageError>; + fn save_derived_block( &self, incoming_pair: DerivedRefPair, @@ -285,8 +301,14 @@ mod tests { let storage = Arc::new(MockDb::new()); let cancel_token = CancellationToken::new(); - let mut processor = - ChainProcessor::new(1, Arc::clone(&mock_node), Arc::clone(&storage), cancel_token); + let rollup_config = RollupConfig::default(); + let mut processor = ChainProcessor::new( + rollup_config, + 1, + Arc::clone(&mock_node), + Arc::clone(&storage), + cancel_token, + ); assert!(processor.start().await.is_ok()); diff --git a/crates/supervisor/core/src/chain_processor/task.rs b/crates/supervisor/core/src/chain_processor/task.rs index d4471cd7f1..792eca941e 100644 --- a/crates/supervisor/core/src/chain_processor/task.rs +++ b/crates/supervisor/core/src/chain_processor/task.rs @@ -1,5 +1,8 @@ use super::Metrics; -use crate::{ChainProcessorError, LogIndexer, event::ChainEvent, syncnode::ManagedNodeProvider}; +use crate::{ + ChainProcessorError, LogIndexer, config::RollupConfig, event::ChainEvent, + syncnode::ManagedNodeProvider, +}; use alloy_primitives::ChainId; use kona_interop::{BlockReplacement, DerivedRefPair}; use kona_protocol::BlockInfo; @@ -15,6 +18,7 @@ use tracing::{debug, error, info}; /// It listens for events emitted by the managed node and handles them accordingly. #[derive(Debug)] pub struct ChainProcessorTask { + _rollup_config: RollupConfig, chain_id: ChainId, metrics_enabled: Option, @@ -41,6 +45,7 @@ where { /// Creates a new [`ChainProcessorTask`]. pub fn new( + rollup_config: RollupConfig, chain_id: u64, managed_node: Arc

, state_manager: Arc, @@ -49,6 +54,7 @@ where ) -> Self { let log_indexer = LogIndexer::new(managed_node.clone(), state_manager.clone()); Self { + _rollup_config: rollup_config, chain_id, metrics_enabled: None, cancel_token, @@ -288,7 +294,7 @@ where ); match self.state_manager.save_source_block(origin) { Ok(_) => Ok(()), - Err(StorageError::BlockOutOfOrder) => { + Err(StorageError::BlockOutOfOrder | StorageError::ConflictError(_)) => { error!( target: "chain_processor", chain_id = self.chain_id, @@ -322,7 +328,7 @@ where ); match self.state_manager.save_derived_block(derived_ref_pair) { Ok(_) => Ok(derived_ref_pair.derived), - Err(StorageError::BlockOutOfOrder) => { + Err(StorageError::BlockOutOfOrder | StorageError::ConflictError(_)) => { error!( target: "chain_processor", chain_id = self.chain_id, @@ -488,6 +494,11 @@ mod tests { pub Db {} impl LogStorageWriter for Db { + fn initialise_log_storage( + &self, + block: BlockInfo, + ) -> Result<(), StorageError>; + fn store_block_logs( &self, block: &BlockInfo, @@ -503,6 +514,11 @@ mod tests { } impl DerivationStorageWriter for Db { + fn initialise_derivation_storage( + &self, + incoming_pair: DerivedRefPair, + ) -> Result<(), StorageError>; + fn save_derived_block( &self, incoming_pair: DerivedRefPair, @@ -552,7 +568,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); tx.send(ChainEvent::UnsafeBlock { block }).await.unwrap(); @@ -597,7 +621,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send unsafe block event tx.send(ChainEvent::DerivedBlock { derived_ref_pair: block_pair }).await.unwrap(); @@ -632,7 +664,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send derivation origin update event tx.send(ChainEvent::DerivationOriginUpdate { origin }).await.unwrap(); @@ -678,7 +718,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send FinalizedSourceUpdate event tx.send(ChainEvent::FinalizedSourceUpdate { finalized_source_block }).await.unwrap(); @@ -715,7 +763,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send FinalizedSourceUpdate event tx.send(ChainEvent::FinalizedSourceUpdate { finalized_source_block }).await.unwrap(); @@ -749,7 +805,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send derivation origin update event tx.send(ChainEvent::CrossUnsafeUpdate { block }).await.unwrap(); @@ -786,7 +850,15 @@ mod tests { let cancel_token = CancellationToken::new(); let (tx, rx) = mpsc::channel(10); - let task = ChainProcessorTask::new(1, managed_node, writer, cancel_token.clone(), rx); + let rollup_config = RollupConfig::default(); + let task = ChainProcessorTask::new( + rollup_config, + 1, + managed_node, + writer, + cancel_token.clone(), + rx, + ); // Send derivation origin update event tx.send(ChainEvent::CrossSafeUpdate { diff --git a/crates/supervisor/core/src/config/rollup_config_set.rs b/crates/supervisor/core/src/config/rollup_config_set.rs index 3483840f8a..3ce6437484 100644 --- a/crates/supervisor/core/src/config/rollup_config_set.rs +++ b/crates/supervisor/core/src/config/rollup_config_set.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use crate::SupervisorError; /// Genesis provides the genesis information relevant for Interop. -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone)] pub struct Genesis { /// The L1 [`BlockInfo`] that the rollup starts after. pub l1: BlockInfo, @@ -36,7 +36,7 @@ impl Genesis { } /// RollupConfig contains the configuration for the Optimism rollup. -#[derive(Debug, Clone)] +#[derive(Debug, Default, Clone)] pub struct RollupConfig { /// Genesis anchor information for the rollup. pub genesis: Genesis, diff --git a/crates/supervisor/core/src/logindexer/indexer.rs b/crates/supervisor/core/src/logindexer/indexer.rs index b158ce0ba8..2cfd30d6c0 100644 --- a/crates/supervisor/core/src/logindexer/indexer.rs +++ b/crates/supervisor/core/src/logindexer/indexer.rs @@ -167,6 +167,7 @@ mod tests { pub Db {} impl LogStorageWriter for Db { + fn initialise_log_storage(&self, _block: BlockInfo) -> Result<(), StorageError>; fn store_block_logs(&self, block: &BlockInfo, logs: Vec) -> Result<(), StorageError>; } diff --git a/crates/supervisor/core/src/supervisor.rs b/crates/supervisor/core/src/supervisor.rs index 2be40314eb..038683573c 100644 --- a/crates/supervisor/core/src/supervisor.rs +++ b/crates/supervisor/core/src/supervisor.rs @@ -11,8 +11,8 @@ use kona_interop::{ }; use kona_protocol::BlockInfo; use kona_supervisor_storage::{ - ChainDb, ChainDbFactory, DerivationStorageReader, FinalizedL1Storage, HeadRefStorageReader, - LogStorageReader, + ChainDb, ChainDbFactory, DerivationStorageReader, DerivationStorageWriter, FinalizedL1Storage, + HeadRefStorageReader, LogStorageReader, LogStorageWriter, }; use kona_supervisor_types::{SuperHead, parse_access_list}; use op_alloy_rpc_types::SuperchainDAError; @@ -142,7 +142,9 @@ impl Supervisor { for (chain_id, config) in self.config.rollup_config_set.rollups.iter() { // Initialise the database for each chain. let db = self.database_factory.get_or_create_db(*chain_id)?; - db.initialise(config.genesis.get_anchor())?; + let anchor = config.genesis.get_anchor(); + db.initialise_log_storage(anchor.derived)?; + db.initialise_derivation_storage(anchor)?; info!(target: "supervisor_service", chain_id, "Database initialized successfully"); } Ok(()) @@ -159,9 +161,19 @@ impl Supervisor { chain_id )))?; + let rollup_config = + self.config.rollup_config_set.get(*chain_id).ok_or(SupervisorError::Initialise( + format!("no rollup config found for chain {}", chain_id), + ))?; + // initialise chain processor for the chain. - let mut processor = - ChainProcessor::new(*chain_id, managed_node.clone(), db, self.cancel_token.clone()); + let mut processor = ChainProcessor::new( + rollup_config.clone(), + *chain_id, + managed_node.clone(), + db, + self.cancel_token.clone(), + ); // todo: enable metrics only if configured processor = processor.with_metrics(); diff --git a/crates/supervisor/storage/src/chaindb.rs b/crates/supervisor/storage/src/chaindb.rs index d0255fdc38..0ffa0f5a48 100644 --- a/crates/supervisor/storage/src/chaindb.rs +++ b/crates/supervisor/storage/src/chaindb.rs @@ -67,17 +67,9 @@ impl ChainDb { f() } } - - /// initialises the database with a given anchor derived block pair. - pub fn initialise(&self, anchor: DerivedRefPair) -> Result<(), StorageError> { - self.env.update(|tx| { - DerivationProvider::new(tx).initialise(anchor)?; - LogProvider::new(tx).initialise(anchor.derived)?; - SafetyHeadRefProvider::new(tx).initialise(anchor.derived) - })? - } } +// todo: make sure all get method return DatabaseNotInitialised error if db is not initialised impl DerivationStorageReader for ChainDb { fn derived_to_source(&self, derived_block_id: BlockNumHash) -> Result { self.observe_call("derived_to_source", || { @@ -97,16 +89,30 @@ impl DerivationStorageReader for ChainDb { } fn latest_derivation_state(&self) -> Result { - self.observe_call("latest_derived_block_pair", || { + self.observe_call("latest_derivation_state", || { self.env.view(|tx| DerivationProvider::new(tx).latest_derivation_state()) })? } } impl DerivationStorageWriter for ChainDb { - // Todo: better name save_derived_block_pair + fn initialise_derivation_storage( + &self, + incoming_pair: DerivedRefPair, + ) -> Result<(), StorageError> { + self.observe_call("initialise_derivation_storage", || { + self.env.update(|ctx| { + DerivationProvider::new(ctx).initialise(incoming_pair)?; + SafetyHeadRefProvider::new(ctx) + .update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived)?; + SafetyHeadRefProvider::new(ctx) + .update_safety_head_ref(SafetyLevel::CrossSafe, &incoming_pair.derived) + }) + })? + } + fn save_derived_block(&self, incoming_pair: DerivedRefPair) -> Result<(), StorageError> { - self.observe_call("save_derived_block_pair", || { + self.observe_call("save_derived_block", || { self.env.update(|ctx| { let derived_block = incoming_pair.derived; let block = LogProvider::new(ctx).get_block(derived_block.number).map_err( @@ -123,7 +129,7 @@ impl DerivationStorageWriter for ChainDb { "conflict between unsafe block and derived block".to_string(), )); } - DerivationProvider::new(ctx).save_derived_block_pair(incoming_pair)?; + DerivationProvider::new(ctx).save_derived_block(incoming_pair)?; SafetyHeadRefProvider::new(ctx) .update_safety_head_ref(SafetyLevel::LocalSafe, &incoming_pair.derived) }) @@ -131,12 +137,13 @@ impl DerivationStorageWriter for ChainDb { } fn save_source_block(&self, incoming_source: BlockInfo) -> Result<(), StorageError> { - self.observe_call("save_block_traversal", || { + self.observe_call("save_source_block", || { self.env.update(|ctx| DerivationProvider::new(ctx).save_source_block(incoming_source)) })? } } +// todo: make sure all get method return DatabaseNotInitialised error if db is not initialised impl LogStorageReader for ChainDb { fn get_latest_block(&self) -> Result { self.observe_call("get_latest_block", || { @@ -164,6 +171,18 @@ impl LogStorageReader for ChainDb { } impl LogStorageWriter for ChainDb { + fn initialise_log_storage(&self, block: BlockInfo) -> Result<(), StorageError> { + self.observe_call("initialise_log_storage", || { + self.env.update(|ctx| { + LogProvider::new(ctx).initialise(block)?; + SafetyHeadRefProvider::new(ctx) + .update_safety_head_ref(SafetyLevel::LocalUnsafe, &block)?; + SafetyHeadRefProvider::new(ctx) + .update_safety_head_ref(SafetyLevel::CrossUnsafe, &block) + }) + })? + } + fn store_block_logs(&self, block: &BlockInfo, logs: Vec) -> Result<(), StorageError> { self.observe_call("store_block_logs", || { self.env.update(|ctx| { @@ -214,28 +233,31 @@ impl HeadRefStorageWriter for ChainDb { &self, finalized_source_block: BlockInfo, ) -> Result { - self.env.update(|tx| { - let sp = SafetyHeadRefProvider::new(tx); - let safe = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?; - - let dp = DerivationProvider::new(tx); - let safe_block_pair = dp.get_derived_block_pair(safe.id())?; - - if finalized_source_block.number >= safe_block_pair.source.number { - // this could happen during initial sync - warn!( - target: "supervisor_storage", - l1_finalized_block_number = finalized_source_block.number, - safe_source_block_number = safe_block_pair.source.number, - "L1 finalized block is greater than safe block", - ); - sp.update_safety_head_ref(SafetyLevel::Finalized, &safe)?; - return Ok(safe); - } - - let latest_derived = dp.latest_derived_block_at_source(finalized_source_block.id())?; - sp.update_safety_head_ref(SafetyLevel::Finalized, &latest_derived)?; - Ok(latest_derived) + self.observe_call("update_finalized_using_source", || { + self.env.update(|tx| { + let sp = SafetyHeadRefProvider::new(tx); + let safe = sp.get_safety_head_ref(SafetyLevel::CrossSafe)?; + + let dp = DerivationProvider::new(tx); + let safe_block_pair = dp.get_derived_block_pair(safe.id())?; + + if finalized_source_block.number >= safe_block_pair.source.number { + // this could happen during initial sync + warn!( + target: "supervisor_storage", + l1_finalized_block_number = finalized_source_block.number, + safe_source_block_number = safe_block_pair.source.number, + "L1 finalized block is greater than safe block", + ); + sp.update_safety_head_ref(SafetyLevel::Finalized, &safe)?; + return Ok(safe); + } + + let latest_derived = + dp.latest_derived_block_at_source(finalized_source_block.id())?; + sp.update_safety_head_ref(SafetyLevel::Finalized, &latest_derived)?; + Ok(latest_derived) + }) })? } @@ -414,7 +436,8 @@ mod tests { }, }; - db.initialise(anchor).expect("initialise db"); + db.initialise_log_storage(anchor.derived).expect("initialise log storage"); + db.initialise_derivation_storage(anchor).expect("initialise derivation storage"); let block = BlockInfo { hash: B256::from([4u8; 32]), @@ -441,6 +464,39 @@ mod tests { assert_eq!(log, logs[1], "Block by log should match stored block"); } + #[test] + fn test_super_head_empty() { + let tmp_dir = TempDir::new().expect("create temp dir"); + let db_path = tmp_dir.path().join("chaindb_super_head_empty"); + let db = ChainDb::new(1, &db_path).expect("create db"); + + // Get super head when no blocks are stored + let err = db.get_super_head().unwrap_err(); + assert!(matches!(err, StorageError::DatabaseNotInitialised)); + } + + #[test] + fn test_latest_derivation_state_empty() { + let tmp_dir = TempDir::new().expect("create temp dir"); + let db_path = tmp_dir.path().join("chaindb_latest_derivation_empty"); + let db = ChainDb::new(1, &db_path).expect("create db"); + + // Get latest derivation state when no blocks are stored + let err = db.latest_derivation_state().unwrap_err(); + assert!(matches!(err, StorageError::DatabaseNotInitialised)); + } + + #[test] + fn test_get_latest_block_empty() { + let tmp_dir = TempDir::new().expect("create temp dir"); + let db_path = tmp_dir.path().join("chaindb_latest_block_empty"); + let db = ChainDb::new(1, &db_path).expect("create db"); + + // Get latest block when no blocks are stored + let err = db.get_latest_block().unwrap_err(); + assert!(matches!(err, StorageError::DatabaseNotInitialised)); + } + #[test] fn test_derivation_storage() { let tmp_dir = TempDir::new().expect("create temp dir"); @@ -479,7 +535,8 @@ mod tests { }; // Initialise the database with the anchor derived block pair - db.initialise(anchor).expect("initialise db with anchor"); + db.initialise_log_storage(anchor.derived).expect("initialise log storage"); + db.initialise_derivation_storage(anchor).expect("initialise derivation storage"); // Save derived block pair - should error conflict let err = db.save_derived_block(derived_pair).unwrap_err(); @@ -545,7 +602,9 @@ mod tests { timestamp: 1, }; - db.initialise(DerivedRefPair { source, derived: block1 }).unwrap(); + db.initialise_log_storage(block1).expect("initialise log storage"); + db.initialise_derivation_storage(DerivedRefPair { source, derived: block1 }) + .expect("initialise derivation storage"); // should error as block2 must be child of block1 let err = db.update_current_cross_unsafe(&block2).expect_err("should return an error"); @@ -585,7 +644,9 @@ mod tests { timestamp: 1, }; - db.initialise(DerivedRefPair { source, derived: block1 }).unwrap(); + db.initialise_log_storage(block1).expect("initialise log storage"); + db.initialise_derivation_storage(DerivedRefPair { source, derived: block1 }) + .expect("initialise derivation storage"); // should error as block2 must be child of block1 let err = db.update_current_cross_safe(&block2).expect_err("should return an error"); @@ -634,7 +695,10 @@ mod tests { timestamp: 9101, }; - assert!(db.initialise(DerivedRefPair { source: source1, derived: derived1 }).is_ok()); + db.initialise_log_storage(derived1).expect("initialise log storage"); + db.initialise_derivation_storage(DerivedRefPair { source: source1, derived: derived1 }) + .expect("initialise derivation storage"); + assert!(db.save_source_block(source2).is_ok()); // Retrieve latest source block @@ -662,7 +726,9 @@ mod tests { timestamp: 1234, }, }; - assert!(db.initialise(anchor).is_ok()); + + db.initialise_log_storage(anchor.derived).expect("initialise log storage"); + db.initialise_derivation_storage(anchor).expect("initialise derivation storage"); let source1 = BlockInfo { hash: B256::from([2u8; 32]), diff --git a/crates/supervisor/storage/src/metrics.rs b/crates/supervisor/storage/src/metrics.rs index f2f4d5c7fc..06f7759f81 100644 --- a/crates/supervisor/storage/src/metrics.rs +++ b/crates/supervisor/storage/src/metrics.rs @@ -14,25 +14,26 @@ impl Metrics { "kona_supervisor_storage_duration_seconds"; // List all your ChainDb method names here - const METHODS: [&'static str; 18] = [ + const METHODS: [&'static str; 19] = [ "derived_to_source", "latest_derived_block_at_source", - "latest_derived_block_pair", - "save_derived_block_pair", + "latest_derivation_state", + "initialise_derivation_storage", + "save_derived_block", + "save_source_block", "get_latest_block", "get_block", "get_log", "get_logs", + "initialise_log_storage", "store_block_logs", - "get_current_l1", "get_safety_head_ref", "get_super_head", - "update_current_l1", - "update_safety_head_ref", - "update_finalized_l1", - "get_finalized_l1", + "update_finalized_using_source", "update_current_cross_unsafe", "update_current_cross_safe", + "update_finalized_l1", + "get_finalized_l1", // Add more as needed ]; diff --git a/crates/supervisor/storage/src/providers/derivation_provider.rs b/crates/supervisor/storage/src/providers/derivation_provider.rs index 01c32aa32a..115e8296c1 100644 --- a/crates/supervisor/storage/src/providers/derivation_provider.rs +++ b/crates/supervisor/storage/src/providers/derivation_provider.rs @@ -189,7 +189,7 @@ where target: "supervisor_storage", "No blocks found in storage" ); - StorageError::EntryNotFound("no blocks found".to_string()) + StorageError::DatabaseNotInitialised })?; let latest_source_block = self.latest_source_block().inspect_err(|err| { @@ -252,7 +252,7 @@ where Ok(_) => Err(StorageError::InvalidAnchor), Err(StorageError::EntryNotFound(_)) => { self.save_source_block_internal(anchor.source)?; - self.save_derived_block_pair_internal(anchor)?; + self.save_derived_block_internal(anchor)?; Ok(()) } Err(err) => Err(err), @@ -262,7 +262,7 @@ where /// Saves a [`StoredDerivedBlockPair`] to [`DerivedBlocks`](`crate::models::DerivedBlocks`) /// table and [`SourceBlockTraversal`] to [`BlockTraversal`](`crate::models::BlockTraversal`) /// table in the database. - pub(crate) fn save_derived_block_pair( + pub(crate) fn save_derived_block( &self, incoming_pair: DerivedRefPair, ) -> Result<(), StorageError> { @@ -293,7 +293,7 @@ where } else { error!( target: "supervisor_storage", - latest_derived_block_pair = %latest_derivation_state, + %latest_derivation_state, incoming_derived_block_pair = %incoming_pair, "Incoming derived block is not consistent with the latest stored derived block" ); @@ -318,20 +318,20 @@ where if !latest_derivation_state.derived.is_parent_of(&incoming_pair.derived) { warn!( target: "supervisor_storage", - latest_derived_block_pair = %latest_derivation_state, + %latest_derivation_state, incoming_derived_block_pair = %incoming_pair, "Latest stored derived block is not parent of the incoming derived block" ); return Err(StorageError::DerivedBlockOutOfOrder); } - self.save_derived_block_pair_internal(incoming_pair) + self.save_derived_block_internal(incoming_pair) } /// Internal function to save a derived block pair. /// This function does not perform checks on the incoming derived pair, /// it assumes that the pair is valid and the latest derived block is its parent. - fn save_derived_block_pair_internal( + fn save_derived_block_internal( &self, incoming_pair: DerivedRefPair, ) -> Result<(), StorageError> { @@ -514,7 +514,7 @@ mod tests { fn insert_pair(db: &DatabaseEnv, pair: &DerivedRefPair) -> Result<(), StorageError> { let tx = db.tx_mut().expect("Could not get mutable tx"); let provider = DerivationProvider::new(&tx); - let res = provider.save_derived_block_pair(*pair); + let res = provider.save_derived_block(*pair); if res.is_ok() { tx.commit().expect("Failed to commit transaction"); } @@ -561,7 +561,7 @@ mod tests { // First initialise assert!(initialize_db(&db, &anchor).is_ok()); // Second initialise with the same anchor should succeed (idempotent) - assert!(initialize_db(&db, &anchor).is_ok()); + assert!(insert_pair(&db, &anchor).is_ok()); } #[test] @@ -575,15 +575,15 @@ mod tests { assert!(initialize_db(&db, &anchor).is_ok()); // Try to initialise with a different anchor (different hash) - let wrong_derived = block_info(1, B256::from([42u8; 32]), 200); + let wrong_derived = block_info(0, B256::from([42u8; 32]), 200); let wrong_anchor = derived_pair(source, wrong_derived); - let result = initialize_db(&db, &wrong_anchor); - assert!(matches!(result, Err(StorageError::InvalidAnchor))); + let result = insert_pair(&db, &wrong_anchor); + assert!(matches!(result, Err(StorageError::ConflictError(_)))); } #[test] - fn save_derived_block_pair_positive() { + fn save_derived_block_positive() { let db = setup_db(); let source1 = block_info(100, B256::from([100u8; 32]), 200); @@ -603,7 +603,7 @@ mod tests { } #[test] - fn save_derived_block_pair_wrong_parent_should_fail() { + fn save_derived_block_wrong_parent_should_fail() { let db = setup_db(); let source1 = block_info(100, B256::from([100u8; 32]), 200); @@ -619,7 +619,7 @@ mod tests { } #[test] - fn save_derived_block_pair_gap_in_number_should_fail() { + fn save_derived_block_gap_in_number_should_fail() { let db = setup_db(); let source1 = block_info(100, B256::from([100u8; 32]), 200); @@ -768,6 +768,21 @@ mod tests { assert_eq!(latest, pair2); } + #[test] + fn test_latest_derivation_state_empty_storage() { + let db = setup_db(); + + let tx = db.tx().expect("Could not get tx"); + let provider = DerivationProvider::new(&tx); + + let result = provider.latest_derivation_state(); + print!("{:?}", result); + assert!( + matches!(result, Err(StorageError::DatabaseNotInitialised)), + "Should return DatabaseNotInitialised error when no derivation state exists" + ); + } + #[test] fn test_latest_derivation_state_empty_source() { let db = setup_db(); @@ -801,7 +816,10 @@ mod tests { let tx = db.tx().expect("Could not get tx"); let provider = DerivationProvider::new(&tx); - assert!(matches!(provider.latest_derivation_state(), Err(StorageError::EntryNotFound(_)))); + assert!(matches!( + provider.latest_derivation_state(), + Err(StorageError::DatabaseNotInitialised) + )); } #[test] diff --git a/crates/supervisor/storage/src/providers/head_ref_provider.rs b/crates/supervisor/storage/src/providers/head_ref_provider.rs index 2a7f2a7896..8e80dc7d3b 100644 --- a/crates/supervisor/storage/src/providers/head_ref_provider.rs +++ b/crates/supervisor/storage/src/providers/head_ref_provider.rs @@ -45,33 +45,40 @@ impl SafetyHeadRefProvider<'_, Tx> where Tx: DbTxMut + DbTx, { - pub(crate) fn initialise(&self, anchor: BlockInfo) -> Result<(), StorageError> { - match self.get_safety_head_ref(SafetyLevel::LocalUnsafe) { - Ok(_) => Ok(()), // if it is set already, skip. - Err(StorageError::EntryNotFound(_)) => { - self.update_safety_head_ref(SafetyLevel::LocalUnsafe, &anchor)?; - self.update_safety_head_ref(SafetyLevel::CrossUnsafe, &anchor)?; - self.update_safety_head_ref(SafetyLevel::LocalSafe, &anchor)?; - self.update_safety_head_ref(SafetyLevel::CrossSafe, &anchor) - } - Err(err) => Err(err), - } - } + /// Updates the safety head reference with the provided block info. + /// If the block info's number is less than the current head reference's number, + /// it will not update the head reference and will log a warning. pub(crate) fn update_safety_head_ref( &self, safety_level: SafetyLevel, - block_info: &BlockInfo, + incoming_head_ref: &BlockInfo, ) -> Result<(), StorageError> { - self.tx.put::(safety_level.into(), (*block_info).into()).inspect_err( - |err| { + // Ensure the block_info.number is greater than the stored head reference + // If the head reference is not set, this check will be skipped. + if let Ok(current_head_ref) = self.get_safety_head_ref(safety_level) { + if current_head_ref.number > incoming_head_ref.number { + warn!( + target: "supervisor_storage", + %current_head_ref, + %incoming_head_ref, + %safety_level, + "Attempting to update head reference with a block that has a lower number than the current head reference", + ); + return Ok(()); + } + } + + self.tx + .put::(safety_level.into(), (*incoming_head_ref).into()) + .inspect_err(|err| { error!( target: "supervisor_storage", + %incoming_head_ref, %safety_level, %err, "Failed to store head reference" ) - }, - )?; + })?; Ok(()) } } diff --git a/crates/supervisor/storage/src/providers/log_provider.rs b/crates/supervisor/storage/src/providers/log_provider.rs index a3b165786e..92e81d51c1 100644 --- a/crates/supervisor/storage/src/providers/log_provider.rs +++ b/crates/supervisor/storage/src/providers/log_provider.rs @@ -68,6 +68,24 @@ where Err(e) => return Err(e), }; + if latest_block.number >= block.number { + // If the latest block is ahead of the incoming block, it means + // the incoming block is old block, check if it is same as the stored block. + let stored_block = self.get_block(block.number)?; + if stored_block == *block { + return Ok(()); + } + error!( + target: "supervisor_storage", + %stored_block, + incoming_block = %block, + "Incoming log block is not consistent with the stored log block", + ); + return Err(StorageError::ConflictError( + "incoming log block is not consistent with the stored log block".to_string(), + )) + } + if !latest_block.is_parent_of(block) { warn!( target: "supervisor_storage", @@ -144,7 +162,7 @@ where let (_, block) = result.ok_or_else(|| { warn!(target: "supervisor_storage", "No blocks found in storage"); - StorageError::EntryNotFound("no blocks found".to_string()) + StorageError::DatabaseNotInitialised })?; Ok(block.into()) } @@ -350,6 +368,17 @@ mod tests { assert!(matches!(result, Err(StorageError::InvalidAnchor))); } + #[test] + fn test_get_latest_block_empty() { + let db = setup_db(); + + let tx = db.tx().expect("Failed to start RO tx"); + let log_reader = LogProvider::new(&tx); + + let result = log_reader.get_latest_block(); + assert!(matches!(result, Err(StorageError::DatabaseNotInitialised))); + } + #[test] fn test_storage_read_write_success() { let db = setup_db(); @@ -411,7 +440,7 @@ mod tests { let log_reader = LogProvider::new(&tx); let result = log_reader.get_latest_block(); - assert!(matches!(result, Err(StorageError::EntryNotFound(_)))); + assert!(matches!(result, Err(StorageError::DatabaseNotInitialised))); // Initialize with genesis block let genesis = genesis_block(); @@ -453,4 +482,54 @@ mod tests { let result = insert_block_logs(&db, &block2, logs2); assert!(matches!(result, Err(StorageError::BlockOutOfOrder))); } + + #[test] + fn store_block_logs_skips_if_block_already_exists() { + let db = setup_db(); + let genesis = genesis_block(); + initialize_db(&db, &genesis).expect("Failed to initialize DB with genesis block"); + + let block1 = sample_block_info(1, genesis.hash); + let logs1 = vec![sample_log(0, false)]; + + // Store block1 for the first time + assert!(insert_block_logs(&db, &block1, logs1.clone()).is_ok()); + + // Try storing the same block again (should skip and succeed) + assert!(insert_block_logs(&db, &block1, logs1.clone()).is_ok()); + + // Try storing genesis block again (should skip and succeed) + assert!(insert_block_logs(&db, &genesis, Vec::new()).is_ok()); + + // Check that the logs are still present and correct + let tx = db.tx().expect("Failed to start RO tx"); + let log_reader = LogProvider::new(&tx); + let logs = log_reader.get_logs(block1.number).expect("Should get logs"); + assert_eq!(logs, logs1); + } + + #[test] + fn store_block_logs_returns_conflict_if_block_exists_with_different_data() { + let db = setup_db(); + let genesis = genesis_block(); + initialize_db(&db, &genesis).expect("Failed to initialize DB with genesis block"); + + let block1 = sample_block_info(1, genesis.hash); + let logs1 = vec![sample_log(0, false)]; + assert!(insert_block_logs(&db, &block1, logs1).is_ok()); + + // Try storing block1 again with a different hash (simulate conflict) + let mut block1_conflict = block1; + block1_conflict.hash = B256::from([0x22; 32]); + let logs1_conflict = vec![sample_log(0, false)]; + + let result = insert_block_logs(&db, &block1_conflict, logs1_conflict); + assert!(matches!(result, Err(StorageError::ConflictError(_)))); + + // Try storing genesis block again with a different hash (simulate conflict) + let mut genesis_conflict = genesis; + genesis_conflict.hash = B256::from([0x33; 32]); + let result = insert_block_logs(&db, &genesis_conflict, Vec::new()); + assert!(matches!(result, Err(StorageError::ConflictError(_)))); + } } diff --git a/crates/supervisor/storage/src/traits.rs b/crates/supervisor/storage/src/traits.rs index 38f2adf4fe..106380f197 100644 --- a/crates/supervisor/storage/src/traits.rs +++ b/crates/supervisor/storage/src/traits.rs @@ -60,6 +60,20 @@ pub trait DerivationStorageReader: Debug { /// /// Implementations are expected to provide persistent and thread-safe access to block data. pub trait DerivationStorageWriter: Debug { + /// Initializes the derivation storage with a given [`DerivedRefPair`]. + /// This method is typically called once to set up the storage with the initial pair. + /// + /// # Arguments + /// * `incoming_pair` - The derived block pair to initialize the storage with. + /// + /// # Returns + /// * `Ok(())` if the storage was successfully initialized. + /// * `Err(StorageError)` if there is an issue initializing the storage. + fn initialise_derivation_storage( + &self, + incoming_pair: DerivedRefPair, + ) -> Result<(), StorageError>; + /// Saves a [`DerivedRefPair`] to the storage. /// /// This method is **append-only**: it does not overwrite existing pairs. @@ -155,6 +169,17 @@ pub trait LogStorageReader: Debug { /// /// Implementations are expected to provide persistent and thread-safe access to block logs. pub trait LogStorageWriter: Send + Sync + Debug { + /// Initializes the log storage with a given [`BlockInfo`]. + /// This method is typically called once to set up the storage with the initial block. + /// + /// # Arguments + /// * `block` - The [`BlockInfo`] to initialize the storage with. + /// + /// # Returns + /// * `Ok(())` if the storage was successfully initialized. + /// * `Err(StorageError)` if there is an issue initializing the storage. + fn initialise_log_storage(&self, block: BlockInfo) -> Result<(), StorageError>; + /// Stores [`BlockInfo`] and [`Log`]s in the storage. /// This method is append-only and does not overwrite existing logs. /// Ensures that the latest stored block is the parent of the incoming block before saving. diff --git a/docker/recipes/kona-supervisor/grafana/dashboard/kona-supervisor.json b/docker/recipes/kona-supervisor/grafana/dashboard/kona-supervisor.json index 694b1e693e..afed19e13a 100644 --- a/docker/recipes/kona-supervisor/grafana/dashboard/kona-supervisor.json +++ b/docker/recipes/kona-supervisor/grafana/dashboard/kona-supervisor.json @@ -188,7 +188,7 @@ "datasource": "Prometheus", "targets": [ { - "expr": "kona_supervisor_storage_duration_seconds{quantile=\"0.95\",chain_id=~\"$chain_id\",method=~\"derived_to_source|latest_derived_block_at_source|latest_derived_block_pair|save_derived_block_pair\"}", + "expr": "kona_supervisor_storage_duration_seconds{quantile=\"0.95\",chain_id=~\"$chain_id\",method=~\"derived_to_source|latest_derived_block_at_source|latest_derivation_state|save_derived_block\"}", "legendFormat": "{{method}}" } ],