diff --git a/crates/cli/commands/src/init_state/mod.rs b/crates/cli/commands/src/init_state/mod.rs index fcf8adf11e2..68618361e7f 100644 --- a/crates/cli/commands/src/init_state/mod.rs +++ b/crates/cli/commands/src/init_state/mod.rs @@ -10,7 +10,8 @@ use reth_db_common::init::init_from_state_dump; use reth_node_api::NodePrimitives; use reth_primitives_traits::{BlockHeader, SealedHeader}; use reth_provider::{ - BlockNumReader, DatabaseProviderFactory, StaticFileProviderFactory, StaticFileWriter, + BlockNumReader, DBProvider, DatabaseProviderFactory, StaticFileProviderFactory, + StaticFileWriter, }; use std::{io::BufReader, path::PathBuf, str::FromStr, sync::Arc}; use tracing::info; diff --git a/crates/cli/commands/src/init_state/without_evm.rs b/crates/cli/commands/src/init_state/without_evm.rs index 3a85b175eb4..09711d45880 100644 --- a/crates/cli/commands/src/init_state/without_evm.rs +++ b/crates/cli/commands/src/init_state/without_evm.rs @@ -6,7 +6,7 @@ use reth_node_builder::NodePrimitives; use reth_primitives_traits::{SealedBlock, SealedHeader, SealedHeaderFor}; use reth_provider::{ providers::StaticFileProvider, BlockWriter, ProviderResult, StageCheckpointWriter, - StaticFileProviderFactory, StaticFileWriter, StorageLocation, + StaticFileProviderFactory, StaticFileWriter, }; use reth_stages::{StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; @@ -81,7 +81,6 @@ where ) .try_recover() .expect("no senders or txes"), - StorageLocation::Database, )?; let sf_provider = provider_rw.static_file_provider(); diff --git a/crates/cli/commands/src/stage/drop.rs b/crates/cli/commands/src/stage/drop.rs index 1684264213d..66227e10271 100644 --- a/crates/cli/commands/src/stage/drop.rs +++ b/crates/cli/commands/src/stage/drop.rs @@ -15,9 +15,7 @@ use reth_db_common::{ }; use reth_node_api::{HeaderTy, ReceiptTy, TxTy}; use reth_node_core::args::StageEnum; -use reth_provider::{ - writer::UnifiedStorageWriter, DatabaseProviderFactory, 
StaticFileProviderFactory, -}; +use reth_provider::{DBProvider, DatabaseProviderFactory, StaticFileProviderFactory}; use reth_prune::PruneSegment; use reth_stages::StageId; use reth_static_file_types::StaticFileSegment; @@ -160,7 +158,7 @@ impl Command { tx.put::(StageId::Finish.to_string(), Default::default())?; - UnifiedStorageWriter::commit_unwind(provider_rw)?; + provider_rw.commit()?; Ok(()) } diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index aad1ac8f3ad..4e577af06be 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -30,8 +30,8 @@ use reth_node_metrics::{ version::VersionInfo, }; use reth_provider::{ - writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory, - StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, + ChainSpecProvider, DBProvider, DatabaseProviderFactory, StageCheckpointReader, + StageCheckpointWriter, StaticFileProviderFactory, }; use reth_stages::{ stages::{ @@ -342,7 +342,7 @@ impl } if self.commit { - UnifiedStorageWriter::commit_unwind(provider_rw)?; + provider_rw.commit()?; provider_rw = provider_factory.database_provider_rw()?; } } @@ -365,7 +365,7 @@ impl provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?; } if self.commit { - UnifiedStorageWriter::commit(provider_rw)?; + provider_rw.commit()?; provider_rw = provider_factory.database_provider_rw()?; } diff --git a/crates/cli/commands/src/stage/unwind.rs b/crates/cli/commands/src/stage/unwind.rs index f10aea99497..9ef2085a065 100644 --- a/crates/cli/commands/src/stage/unwind.rs +++ b/crates/cli/commands/src/stage/unwind.rs @@ -17,7 +17,7 @@ use reth_evm::ConfigureEvm; use reth_exex::ExExManagerHandle; use reth_provider::{ providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainStateBlockReader, - ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory, StorageLocation, + ChainStateBlockWriter, ProviderFactory, 
StaticFileProviderFactory, }; use reth_stages::{ sets::{DefaultStages, OfflineStages}, @@ -97,7 +97,7 @@ impl> Command let provider = provider_factory.provider_rw()?; provider - .remove_block_and_execution_above(target, StorageLocation::Both) + .remove_block_and_execution_above(target) .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?; // update finalized block if needed diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 1c417e357fb..a9e6d653936 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -6,8 +6,8 @@ use reth_errors::ProviderError; use reth_ethereum_primitives::EthPrimitives; use reth_primitives_traits::NodePrimitives; use reth_provider::{ - providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, - ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory, + providers::ProviderNodeTypes, BlockExecutionWriter, BlockHashReader, ChainStateBlockWriter, + DBProvider, DatabaseProviderFactory, ProviderFactory, }; use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory}; use reth_stages_api::{MetricEvent, MetricEventsSender}; @@ -128,11 +128,10 @@ where debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks"); let start_time = Instant::now(); let provider_rw = self.provider.database_provider_rw()?; - let sf_provider = self.provider.static_file_provider(); let new_tip_hash = provider_rw.block_hash(new_tip_num)?; - UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?; - UnifiedStorageWriter::commit_unwind(provider_rw)?; + provider_rw.remove_block_and_execution_above(new_tip_num)?; + provider_rw.commit()?; debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk"); self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed()); @@ -152,10 +151,9 @@ where if last_block_hash_num.is_some() { let 
provider_rw = self.provider.database_provider_rw()?; - let static_file_provider = self.provider.static_file_provider(); - UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(blocks)?; - UnifiedStorageWriter::commit(provider_rw)?; + provider_rw.save_blocks(blocks)?; + provider_rw.commit()?; } self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); Ok(last_block_hash_num) diff --git a/crates/era-utils/src/history.rs b/crates/era-utils/src/history.rs index 31ac8825dc2..12bafed6113 100644 --- a/crates/era-utils/src/history.rs +++ b/crates/era-utils/src/history.rs @@ -19,15 +19,15 @@ use reth_etl::Collector; use reth_fs_util as fs; use reth_primitives_traits::{Block, FullBlockBody, FullBlockHeader, NodePrimitives}; use reth_provider::{ - providers::StaticFileProviderRWRefMut, writer::UnifiedStorageWriter, BlockWriter, - ProviderError, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter, + providers::StaticFileProviderRWRefMut, BlockWriter, ProviderError, StaticFileProviderFactory, + StaticFileSegment, StaticFileWriter, }; use reth_stages_types::{ CheckpointBlockRange, EntitiesCheckpoint, HeadersCheckpoint, StageCheckpoint, StageId, }; use reth_storage_api::{ errors::ProviderResult, DBProvider, DatabaseProviderFactory, HeaderProvider, - NodePrimitivesProvider, StageCheckpointWriter, StorageLocation, + NodePrimitivesProvider, StageCheckpointWriter, }; use std::{ collections::Bound, @@ -102,14 +102,14 @@ where save_stage_checkpoints(&provider, from, height, height, height)?; - UnifiedStorageWriter::commit(provider)?; + provider.commit()?; } let provider = provider_factory.database_provider_rw()?; build_index(&provider, hash_collector)?; - UnifiedStorageWriter::commit(provider)?; + provider.commit()?; Ok(height) } @@ -318,11 +318,7 @@ where writer.append_header(&header, *total_difficulty, &hash)?; // Write bodies to database. 
- provider.append_block_bodies( - vec![(header.number(), Some(body))], - // We are writing transactions directly to static files. - StorageLocation::StaticFiles, - )?; + provider.append_block_bodies(vec![(header.number(), Some(body))])?; hash_collector.insert(hash, number)?; } diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index d69d04c0e39..9ebf69457b6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -667,7 +667,7 @@ mod tests { use reth_primitives_traits::RecoveredBlock; use reth_provider::{ providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader, - BlockWriter, Chain, DatabaseProviderFactory, StorageLocation, TransactionVariant, + BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; @@ -1303,7 +1303,7 @@ mod tests { .try_recover() .unwrap(); let provider_rw = provider_factory.database_provider_rw().unwrap(); - provider_rw.insert_block(block.clone(), StorageLocation::Database).unwrap(); + provider_rw.insert_block(block.clone()).unwrap(); provider_rw.commit().unwrap(); let provider = BlockchainProvider::new(provider_factory).unwrap(); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index c624fd4ff4e..0def7a510ed 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -457,7 +457,7 @@ mod tests { use reth_primitives_traits::Block as _; use reth_provider::{ providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter, - Chain, DatabaseProviderFactory, StorageLocation, + Chain, DBProvider, DatabaseProviderFactory, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; use tokio::sync::mpsc; @@ -483,8 +483,7 @@ mod tests { BlockParams { parent: Some(genesis_hash), tx_count: Some(0), ..Default::default() }, ); let provider_rw = 
provider_factory.provider_rw()?; - provider_rw - .insert_block(node_head_block.clone().try_recover()?, StorageLocation::Database)?; + provider_rw.insert_block(node_head_block.clone().try_recover()?)?; provider_rw.commit()?; let node_head = node_head_block.num_hash(); @@ -614,7 +613,7 @@ mod tests { .try_recover()?; let node_head = node_head_block.num_hash(); let provider_rw = provider.database_provider_rw()?; - provider_rw.insert_block(node_head_block, StorageLocation::Database)?; + provider_rw.insert_block(node_head_block)?; provider_rw.commit()?; let node_head_notification = ExExNotification::ChainCommitted { new: Arc::new( diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs index 0bf9d44b056..db25afe9099 100644 --- a/crates/optimism/cli/src/commands/import_receipts.rs +++ b/crates/optimism/cli/src/commands/import_receipts.rs @@ -17,9 +17,9 @@ use reth_optimism_chainspec::OpChainSpec; use reth_optimism_primitives::{bedrock::is_dup_tx, OpPrimitives, OpReceipt}; use reth_primitives_traits::NodePrimitives; use reth_provider::{ - providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory, - OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, - StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation, + providers::ProviderNodeTypes, DBProvider, DatabaseProviderFactory, OriginalValuesKnown, + ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StateWriter, + StaticFileProviderFactory, StatsReader, }; use reth_stages::{StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; @@ -224,18 +224,11 @@ where // Update total_receipts after all filtering total_receipts += receipts.iter().map(|v| v.len()).sum::(); - // We're reusing receipt writing code internal to - // `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty - // `BundleState`. 
let execution_outcome = ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default()); // finally, write the receipts - provider.write_state( - &execution_outcome, - OriginalValuesKnown::Yes, - StorageLocation::StaticFiles, - )?; + provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?; } // Only commit if we have imported as many receipts as the number of transactions. @@ -260,7 +253,7 @@ where provider .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?; - UnifiedStorageWriter::commit(provider)?; + provider.commit()?; Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns }) } diff --git a/crates/optimism/cli/src/commands/init_state.rs b/crates/optimism/cli/src/commands/init_state.rs index 92cd92de0a3..0d065c29442 100644 --- a/crates/optimism/cli/src/commands/init_state.rs +++ b/crates/optimism/cli/src/commands/init_state.rs @@ -12,8 +12,8 @@ use reth_optimism_primitives::{ }; use reth_primitives_traits::SealedHeader; use reth_provider::{ - BlockNumReader, ChainSpecProvider, DatabaseProviderFactory, StaticFileProviderFactory, - StaticFileWriter, + BlockNumReader, ChainSpecProvider, DBProvider, DatabaseProviderFactory, + StaticFileProviderFactory, StaticFileWriter, }; use std::{io::BufReader, sync::Arc}; use tracing::info; diff --git a/crates/prune/prune/src/segments/mod.rs b/crates/prune/prune/src/segments/mod.rs index c34e3a322aa..1daade01358 100644 --- a/crates/prune/prune/src/segments/mod.rs +++ b/crates/prune/prune/src/segments/mod.rs @@ -149,6 +149,7 @@ mod tests { use reth_provider::{ providers::BlockchainProvider, test_utils::{create_test_provider_factory, MockEthProvider}, + BlockWriter, }; use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams}; @@ -190,7 +191,7 @@ mod tests { let provider_rw = factory.provider_rw().expect("failed to get provider_rw"); for block in &blocks { provider_rw - .insert_historical_block( + .insert_block( 
block.clone().try_recover().expect("failed to seal block with senders"), ) .expect("failed to insert block"); @@ -228,7 +229,7 @@ mod tests { let provider_rw = factory.provider_rw().expect("failed to get provider_rw"); for block in &blocks { provider_rw - .insert_historical_block( + .insert_block( block.clone().try_recover().expect("failed to seal block with senders"), ) .expect("failed to insert block"); @@ -274,7 +275,7 @@ mod tests { let provider_rw = factory.provider_rw().expect("failed to get provider_rw"); for block in &blocks { provider_rw - .insert_historical_block( + .insert_block( block.clone().try_recover().expect("failed to seal block with senders"), ) .expect("failed to insert block"); @@ -310,7 +311,7 @@ mod tests { let provider_rw = factory.provider_rw().expect("failed to get provider_rw"); for block in &blocks { provider_rw - .insert_historical_block( + .insert_block( block.clone().try_recover().expect("failed to seal block with senders"), ) .expect("failed to insert block"); diff --git a/crates/prune/prune/src/segments/receipts.rs b/crates/prune/prune/src/segments/receipts.rs index 393ca638b89..12ad6e2c203 100644 --- a/crates/prune/prune/src/segments/receipts.rs +++ b/crates/prune/prune/src/segments/receipts.rs @@ -89,7 +89,7 @@ mod tests { Itertools, }; use reth_db_api::tables; - use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; + use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment, }; diff --git a/crates/prune/prune/src/segments/static_file/headers.rs b/crates/prune/prune/src/segments/static_file/headers.rs index d8b7e6a5398..9f3c291bf44 100644 --- a/crates/prune/prune/src/segments/static_file/headers.rs +++ b/crates/prune/prune/src/segments/static_file/headers.rs @@ -220,7 +220,7 @@ mod tests { use assert_matches::assert_matches; use reth_db_api::{tables, transaction::DbTx}; use reth_provider::{ - 
DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, + DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, }; use reth_prune_types::{ diff --git a/crates/prune/prune/src/segments/static_file/transactions.rs b/crates/prune/prune/src/segments/static_file/transactions.rs index 409e7f9b3d3..115ee2ca39a 100644 --- a/crates/prune/prune/src/segments/static_file/transactions.rs +++ b/crates/prune/prune/src/segments/static_file/transactions.rs @@ -101,7 +101,7 @@ mod tests { }; use reth_db_api::tables; use reth_provider::{ - DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, + DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, StaticFileProviderFactory, }; use reth_prune_types::{ diff --git a/crates/prune/prune/src/segments/user/account_history.rs b/crates/prune/prune/src/segments/user/account_history.rs index 7780a9e07e6..3c18cd1befc 100644 --- a/crates/prune/prune/src/segments/user/account_history.rs +++ b/crates/prune/prune/src/segments/user/account_history.rs @@ -133,7 +133,7 @@ mod tests { use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; use reth_db_api::{tables, BlockNumberList}; - use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; + use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment, }; diff --git a/crates/prune/prune/src/segments/user/receipts_by_logs.rs b/crates/prune/prune/src/segments/user/receipts_by_logs.rs index bb214ea1679..0849db52518 100644 --- a/crates/prune/prune/src/segments/user/receipts_by_logs.rs +++ b/crates/prune/prune/src/segments/user/receipts_by_logs.rs @@ -232,7 +232,9 @@ mod tests { use assert_matches::assert_matches; use reth_db_api::{cursor::DbCursorRO, tables, transaction::DbTx}; use reth_primitives_traits::InMemorySize; - use 
reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider}; + use reth_provider::{ + DBProvider, DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider, + }; use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{ diff --git a/crates/prune/prune/src/segments/user/sender_recovery.rs b/crates/prune/prune/src/segments/user/sender_recovery.rs index f379fb99519..35ee487203a 100644 --- a/crates/prune/prune/src/segments/user/sender_recovery.rs +++ b/crates/prune/prune/src/segments/user/sender_recovery.rs @@ -91,7 +91,7 @@ mod tests { }; use reth_db_api::tables; use reth_primitives_traits::SignerRecoverable; - use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; + use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams}; diff --git a/crates/prune/prune/src/segments/user/storage_history.rs b/crates/prune/prune/src/segments/user/storage_history.rs index aa9cb846448..ee7447c37da 100644 --- a/crates/prune/prune/src/segments/user/storage_history.rs +++ b/crates/prune/prune/src/segments/user/storage_history.rs @@ -140,7 +140,7 @@ mod tests { use alloy_primitives::{BlockNumber, B256}; use assert_matches::assert_matches; use reth_db_api::{tables, BlockNumberList}; - use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; + use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use reth_testing_utils::generators::{ diff --git a/crates/prune/prune/src/segments/user/transaction_lookup.rs 
b/crates/prune/prune/src/segments/user/transaction_lookup.rs index dcf7c195d9e..2ed08f7d1a7 100644 --- a/crates/prune/prune/src/segments/user/transaction_lookup.rs +++ b/crates/prune/prune/src/segments/user/transaction_lookup.rs @@ -139,7 +139,7 @@ mod tests { Itertools, }; use reth_db_api::tables; - use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader}; + use reth_provider::{DBProvider, DatabaseProviderFactory, PruneCheckpointReader}; use reth_prune_types::{ PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment, }; diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 22e00abcf2e..0a9aaef73de 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -7,8 +7,8 @@ pub use event::*; use futures_util::Future; use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH; use reth_provider::{ - providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, BlockNumReader, - ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, + providers::ProviderNodeTypes, BlockHashReader, BlockNumReader, ChainStateBlockReader, + ChainStateBlockWriter, DBProvider, DatabaseProviderFactory, ProviderFactory, PruneCheckpointReader, StageCheckpointReader, StageCheckpointWriter, }; use reth_prune::PrunerBuilder; @@ -391,7 +391,7 @@ impl Pipeline { ))?; } - UnifiedStorageWriter::commit_unwind(provider_rw)?; + provider_rw.commit()?; stage.post_unwind_commit()?; @@ -481,7 +481,7 @@ impl Pipeline { provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; // Commit processed data to the database. - UnifiedStorageWriter::commit(provider_rw)?; + provider_rw.commit()?; // Invoke stage post commit hook. 
self.stage(stage_index).post_execute_commit()?; @@ -579,7 +579,7 @@ impl Pipeline { prev_checkpoint.unwrap_or_default(), )?; - UnifiedStorageWriter::commit(provider_rw)?; + provider_rw.commit()?; // We unwind because of a validation error. If the unwind itself // fails, we bail entirely, diff --git a/crates/stages/stages/benches/criterion.rs b/crates/stages/stages/benches/criterion.rs index c804d582363..655b990f254 100644 --- a/crates/stages/stages/benches/criterion.rs +++ b/crates/stages/stages/benches/criterion.rs @@ -5,7 +5,9 @@ use alloy_primitives::BlockNumber; use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion}; use reth_config::config::{EtlConfig, TransactionLookupConfig}; use reth_db::{test_utils::TempDatabase, Database, DatabaseEnv}; -use reth_provider::{test_utils::MockNodeTypesWithDB, DatabaseProvider, DatabaseProviderFactory}; +use reth_provider::{ + test_utils::MockNodeTypesWithDB, DBProvider, DatabaseProvider, DatabaseProviderFactory, +}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage}, test_utils::TestStageDB, diff --git a/crates/stages/stages/benches/setup/mod.rs b/crates/stages/stages/benches/setup/mod.rs index d5ea62ba4e0..bd1fb59ebe9 100644 --- a/crates/stages/stages/benches/setup/mod.rs +++ b/crates/stages/stages/benches/setup/mod.rs @@ -9,7 +9,8 @@ use reth_db_api::{ }; use reth_primitives_traits::{Account, SealedBlock, SealedHeader}; use reth_provider::{ - test_utils::MockNodeTypesWithDB, DatabaseProvider, DatabaseProviderFactory, TrieWriter, + test_utils::MockNodeTypesWithDB, DBProvider, DatabaseProvider, DatabaseProviderFactory, + TrieWriter, }; use reth_stages::{ stages::{AccountHashingStage, StorageHashingStage}, diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 4eca51d00a7..d1386dded4b 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -8,7 +8,7 @@ use 
reth_db_api::{ use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}; use reth_provider::{ providers::StaticFileWriter, BlockReader, BlockWriter, DBProvider, ProviderError, - StaticFileProviderFactory, StatsReader, StorageLocation, + StaticFileProviderFactory, StatsReader, }; use reth_stages_api::{ EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, @@ -206,8 +206,6 @@ where .into_iter() .map(|response| (response.block_number(), response.into_body())) .collect(), - // We are writing transactions directly to static files. - StorageLocation::StaticFiles, )?; // The stage is "done" if: @@ -230,7 +228,7 @@ where self.buffer.take(); ensure_consistency(provider, Some(input.unwind_to))?; - provider.remove_bodies_above(input.unwind_to, StorageLocation::Both)?; + provider.remove_bodies_above(input.unwind_to)?; Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 1270033b885..3736fa523cb 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -13,7 +13,7 @@ use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockHashReader, BlockReader, DBProvider, ExecutionOutcome, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StateWriter, - StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant, + StaticFileProviderFactory, StatsReader, TransactionVariant, }; use reth_revm::database::StateProviderDatabase; use reth_stages_api::{ @@ -452,7 +452,7 @@ where } // write output - provider.write_state(&state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?; + provider.write_state(&state, OriginalValuesKnown::Yes)?; let db_write_duration = time.elapsed(); debug!( @@ -504,8 +504,7 @@ where // Unwind account and storage changesets, as well as receipts. 
// // This also updates `PlainStorageState` and `PlainAccountState`. - let bundle_state_with_receipts = - provider.take_state_above(unwind_to, StorageLocation::Both)?; + let bundle_state_with_receipts = provider.take_state_above(unwind_to)?; // Prepare the input for post unwind commit hook, where an `ExExNotification` will be sent. if self.exex_manager_handle.has_exexs() { @@ -675,8 +674,8 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::{Account, Bytecode, SealedBlock, StorageEntry}; use reth_provider::{ - test_utils::create_test_provider_factory, AccountReader, DatabaseProviderFactory, - ReceiptProvider, StaticFileProviderFactory, + test_utils::create_test_provider_factory, AccountReader, BlockWriter, + DatabaseProviderFactory, ReceiptProvider, StaticFileProviderFactory, }; use reth_prune::PruneModes; use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig}; @@ -737,8 +736,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) @@ -778,8 +777,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) @@ -819,8 +818,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) @@ -852,8 +851,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) @@ -994,8 +993,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) @@ -1066,6 +1065,8 @@ mod tests { ) .unwrap(); + provider.static_file_provider().commit().unwrap(); + assert_matches!(result, UnwindOutput { checkpoint: StageCheckpoint { block_number: 0, @@ -1102,8 +1103,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = 
hex!("f9025ff901f7a0c86e8cc0310ae7c531c758678ddbfd16fc51c8cef8cec650b032de9869e8b94fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa050554882fbbda2c2fd93fdc466db9946ea262a67f7a76cc169e714f105ab583da00967f09ef1dfed20c0eacfaa94d5cd4002eda3242ac47eae68972d07b106d192a0e3c8b47fbfc94667ef4cceb17e5cc21e3b1eebd442cebb27f07562b33836290db90100000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000008302000001830f42408238108203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f862f860800a83061a8094095e7baea6a6c7c4c2dfeb977efac326af552d8780801ba072ed817487b84ba367d15d2f039b5fc5f087d0a8882fbdf73e8cb49357e1ce30a0403d800545b8fc544f92ce8124e2255f8c3c6af93f28243a120585d4c4c6a2a3c0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider.insert_block(block.clone().try_recover().unwrap()).unwrap(); provider .static_file_provider() .latest_writer(StaticFileSegment::Headers) diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index b45f3a519a7..c48381d4fe9 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -71,7 +71,7 @@ impl AccountHashingStage { { use 
alloy_primitives::U256; use reth_db_api::models::AccountBeforeTx; - use reth_provider::{StaticFileProviderFactory, StaticFileWriter}; + use reth_provider::{BlockWriter, StaticFileProviderFactory, StaticFileWriter}; use reth_testing_utils::{ generators, generators::{random_block_range, random_eoa_accounts, BlockRangeParams}, @@ -86,7 +86,7 @@ impl AccountHashingStage { ); for block in blocks { - provider.insert_historical_block(block.try_recover().unwrap()).unwrap(); + provider.insert_block(block.try_recover().unwrap()).unwrap(); } provider .static_file_provider() @@ -453,7 +453,7 @@ mod tests { let provider = self.db.factory.database_provider_rw()?; let res = Ok(AccountHashingStage::seed( &provider, - SeedOpts { blocks: 1..=input.target(), accounts: 10, txs: 0..3 }, + SeedOpts { blocks: 0..=input.target(), accounts: 10, txs: 0..3 }, ) .unwrap()); provider.commit().expect("failed to commit"); diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 9147ebb844c..d3e690dc516 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -401,13 +401,9 @@ mod tests { }; use alloy_primitives::B256; use assert_matches::assert_matches; - use reth_ethereum_primitives::BlockBody; - use reth_execution_types::ExecutionOutcome; - use reth_primitives_traits::{RecoveredBlock, SealedBlock}; - use reth_provider::{BlockWriter, ProviderFactory, StaticFileProviderFactory}; + use reth_provider::{DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory}; use reth_stages_api::StageUnitCheckpoint; use reth_testing_utils::generators::{self, random_header, random_header_range}; - use reth_trie::HashedPostStateSorted; use std::sync::Arc; use test_runner::HeadersTestRunner; @@ -629,29 +625,29 @@ mod tests { assert!(runner.stage().header_collector.is_empty()); // let's insert some blocks using append_blocks_with_state - let sealed_headers = - random_header_range(&mut generators::rng(), 
tip.number..tip.number + 10, tip.hash()); - - // make them sealed blocks with senders by converting them to empty blocks - let sealed_blocks = sealed_headers - .iter() - .map(|header| { - RecoveredBlock::new_sealed( - SealedBlock::from_sealed_parts(header.clone(), BlockBody::default()), - vec![], - ) - }) - .collect(); + let sealed_headers = random_header_range( + &mut generators::rng(), + tip.number + 1..tip.number + 10, + tip.hash(), + ); + + let provider = runner.db().factory.database_provider_rw().unwrap(); + let static_file_provider = provider.static_file_provider(); + let mut writer = static_file_provider.latest_writer(StaticFileSegment::Headers).unwrap(); + for header in sealed_headers { + let ttd = if header.number() == 0 { + header.difficulty() + } else { + let parent_block_number = header.number() - 1; + let parent_ttd = + provider.header_td_by_number(parent_block_number).unwrap().unwrap_or_default(); + parent_ttd + header.difficulty() + }; + + writer.append_header(header.header(), ttd, &header.hash()).unwrap(); + } + drop(writer); - // append the blocks - let provider = runner.db().factory.provider_rw().unwrap(); - provider - .append_blocks_with_state( - sealed_blocks, - &ExecutionOutcome::default(), - HashedPostStateSorted::default(), - ) - .unwrap(); provider.commit().unwrap(); // now we can unwind 10 blocks diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 785b9be2eac..f9b2312f5ab 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -67,9 +67,9 @@ mod tests { use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, test_utils::MockNodeTypesWithDB, - AccountExtReader, BlockBodyIndicesProvider, DatabaseProviderFactory, ProviderFactory, - ProviderResult, ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory, - StorageReader, + AccountExtReader, BlockBodyIndicesProvider, BlockWriter, DatabaseProviderFactory, + ProviderFactory, 
ProviderResult, ReceiptProvider, StageCheckpointWriter, + StaticFileProviderFactory, StorageReader, }; use reth_prune_types::{PruneMode, PruneModes}; use reth_stages_api::{ @@ -93,8 +93,8 @@ mod tests { let genesis = SealedBlock::::decode(&mut genesis_rlp).unwrap(); let mut block_rlp = hex!("f90262f901f9a075c371ba45999d87f4542326910a11af515897aebce5265d3f6acd1f1161f82fa01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa098f2dcd87c8ae4083e7017a05456c14eea4b1db2032126e27b3b1563d57d7cc0a08151d548273f6683169524b66ca9fe338b9ce42bc3540046c828fd939ae23bcba03f4e5c2ec5b2170b711d97ee755c160457bb58d8daa338e835ec02ae6860bbabb901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000018502540be40082a8798203e800a00000000000000000000000000000000000000000000000000000000000000000880000000000000000f863f861800a8405f5e10094100000000000000000000000000000000000000080801ba07e09e26678ed4fac08a249ebe8ed680bf9051a5e14ad223e4b2b9d26e0208f37a05f6e3f188e3e6eab7d7d3b6568f5eac7d687b08d307d3154ccd8c87b4630509bc0").as_slice(); let block = SealedBlock::::decode(&mut block_rlp).unwrap(); - provider_rw.insert_historical_block(genesis.try_recover().unwrap()).unwrap(); - provider_rw.insert_historical_block(block.clone().try_recover().unwrap()).unwrap(); + provider_rw.insert_block(genesis.try_recover().unwrap()).unwrap(); + provider_rw.insert_block(block.clone().try_recover().unwrap()).unwrap(); // Fill with bogus blocks to respect PruneMode distance. 
let mut head = block.hash(); @@ -106,7 +106,7 @@ mod tests { generators::BlockParams { parent: Some(head), ..Default::default() }, ); head = nblock.hash(); - provider_rw.insert_historical_block(nblock.try_recover().unwrap()).unwrap(); + provider_rw.insert_block(nblock.try_recover().unwrap()).unwrap(); } provider_rw .static_file_provider() diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index 244f023ef33..9b75e5683d9 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -289,6 +289,7 @@ mod tests { .expect("get static file writer for headers"); static_file_writer.prune_headers(blocks.len() as u64).unwrap(); static_file_writer.commit().expect("prune headers"); + drop(static_file_writer); let tx = db.factory.db_ref().tx_mut().expect("init tx"); for block in &blocks { diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index 87bb2ce98a0..48442aab381 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -11,11 +11,11 @@ use reth_etl::Collector; use reth_execution_errors::StateRootError; use reth_primitives_traits::{Account, Bytecode, GotExpected, NodePrimitives, StorageEntry}; use reth_provider::{ - errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter, - BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider, - DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, - OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointReader, StageCheckpointWriter, - StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter, + errors::provider::ProviderResult, providers::StaticFileWriter, BlockHashReader, BlockNumReader, + BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome, + 
HashingWriter, HeaderProvider, HistoryWriter, OriginalValuesKnown, ProviderError, RevertsInit, + StageCheckpointReader, StageCheckpointWriter, StateWriter, StaticFileProviderFactory, + TrieWriter, }; use reth_stages_types::{StageCheckpoint, StageId}; use reth_static_file_types::StaticFileSegment; @@ -154,17 +154,14 @@ where provider_rw.save_stage_checkpoint(stage, Default::default())?; } - let static_file_provider = provider_rw.static_file_provider(); // Static file segments start empty, so we need to initialize the genesis block. - let segment = StaticFileSegment::Receipts; - static_file_provider.latest_writer(segment)?.increment_block(0)?; - - let segment = StaticFileSegment::Transactions; - static_file_provider.latest_writer(segment)?.increment_block(0)?; + let static_file_provider = provider_rw.static_file_provider(); + static_file_provider.latest_writer(StaticFileSegment::Receipts)?.increment_block(0)?; + static_file_provider.latest_writer(StaticFileSegment::Transactions)?.increment_block(0)?; // `commit_unwind`` will first commit the DB and then the static file provider, which is // necessary on `init_genesis`. 
- UnifiedStorageWriter::commit_unwind(provider_rw)?; + provider_rw.commit()?; Ok(hash) } @@ -264,11 +261,7 @@ where Vec::new(), ); - provider.write_state( - &execution_outcome, - OriginalValuesKnown::Yes, - StorageLocation::Database, - )?; + provider.write_state(&execution_outcome, OriginalValuesKnown::Yes)?; trace!(target: "reth::cli", "Inserted state"); diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index f6c3d30150e..c5ba6890ee8 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -749,7 +749,6 @@ mod tests { create_test_provider_factory, create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB, }, - writer::UnifiedStorageWriter, BlockWriter, CanonChainTracker, ProviderFactory, StaticFileProviderFactory, StaticFileWriter, }; @@ -780,15 +779,15 @@ mod tests { use reth_static_file_types::StaticFileSegment; use reth_storage_api::{ BlockBodyIndicesProvider, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, - BlockReaderIdExt, BlockSource, ChangeSetReader, DatabaseProviderFactory, HeaderProvider, - ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, TransactionVariant, - TransactionsProvider, + BlockReaderIdExt, BlockSource, ChangeSetReader, DBProvider, DatabaseProviderFactory, + HeaderProvider, ReceiptProvider, ReceiptProviderIdExt, StateProviderFactory, StateWriter, + TransactionVariant, TransactionsProvider, }; use reth_testing_utils::generators::{ self, random_block, random_block_range, random_changeset_range, random_eoa_accounts, random_receipt, BlockParams, BlockRangeParams, }; - use revm_database::BundleState; + use revm_database::{BundleState, OriginalValuesKnown}; use std::{ ops::{Bound, Deref, Range, RangeBounds}, sync::Arc, @@ -863,37 +862,27 @@ mod tests { let factory = create_test_provider_factory_with_chain_spec(chain_spec); let provider_rw = 
factory.database_provider_rw()?; - let static_file_provider = factory.static_file_provider(); - - // Write transactions to static files with the right `tx_num`` - let mut tx_num = provider_rw - .block_body_indices(database_blocks.first().as_ref().unwrap().number.saturating_sub(1))? - .map(|indices| indices.next_tx_num()) - .unwrap_or_default(); // Insert blocks into the database - for (block, receipts) in database_blocks.iter().zip(&receipts) { - // TODO: this should be moved inside `insert_historical_block`: - let mut transactions_writer = - static_file_provider.latest_writer(StaticFileSegment::Transactions)?; - let mut receipts_writer = - static_file_provider.latest_writer(StaticFileSegment::Receipts)?; - transactions_writer.increment_block(block.number)?; - receipts_writer.increment_block(block.number)?; - - for (tx, receipt) in block.body().transactions().zip(receipts) { - transactions_writer.append_transaction(tx_num, tx)?; - receipts_writer.append_receipt(tx_num, receipt)?; - tx_num += 1; - } - - provider_rw.insert_historical_block( + for block in &database_blocks { + provider_rw.insert_block( block.clone().try_recover().expect("failed to seal block with senders"), )?; } - // Commit to both storages: database and static files - UnifiedStorageWriter::commit(provider_rw)?; + // Insert receipts into the database + if let Some(first_block) = database_blocks.first() { + provider_rw.write_state( + &ExecutionOutcome { + first_block: first_block.number, + receipts: receipts.iter().take(database_blocks.len()).cloned().collect(), + ..Default::default() + }, + OriginalValuesKnown::No, + )?; + } + + provider_rw.commit()?; let provider = BlockchainProvider::new(factory)?; @@ -978,10 +967,8 @@ mod tests { // Push to disk let provider_rw = hook_provider.database_provider_rw().unwrap(); - UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider()) - .save_blocks(vec![lowest_memory_block]) - .unwrap(); - UnifiedStorageWriter::commit(provider_rw).unwrap(); 
+ provider_rw.save_blocks(vec![lowest_memory_block]).unwrap(); + provider_rw.commit().unwrap(); // Remove from memory hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash); @@ -1006,7 +993,7 @@ mod tests { // Insert first 5 blocks into the database let provider_rw = factory.provider_rw()?; for block in database_blocks { - provider_rw.insert_historical_block( + provider_rw.insert_block( block.clone().try_recover().expect("failed to seal block with senders"), )?; } @@ -1110,7 +1097,7 @@ mod tests { // Insert first 5 blocks into the database let provider_rw = factory.provider_rw()?; for block in database_blocks { - provider_rw.insert_historical_block( + provider_rw.insert_block( block.clone().try_recover().expect("failed to seal block with senders"), )?; } @@ -1348,7 +1335,7 @@ mod tests { // Insert and commit the block. let provider_rw = factory.provider_rw()?; - provider_rw.insert_historical_block(block_1)?; + provider_rw.insert_block(block_1)?; provider_rw.commit()?; let provider = BlockchainProvider::new(factory)?; diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index 4a4a2df2779..cd74ab36965 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -1529,7 +1529,7 @@ mod tests { // Insert first 5 blocks into the database let provider_rw = factory.provider_rw()?; for block in database_blocks { - provider_rw.insert_historical_block( + provider_rw.insert_block( block.clone().try_recover().expect("failed to seal block with senders"), )?; } @@ -1646,7 +1646,7 @@ mod tests { // Insert first 5 blocks into the database let provider_rw = factory.provider_rw()?; for block in database_blocks { - provider_rw.insert_historical_block( + provider_rw.insert_block( block.clone().try_recover().expect("failed to seal block with senders"), )?; } diff --git a/crates/storage/provider/src/providers/consistent_view.rs 
b/crates/storage/provider/src/providers/consistent_view.rs index 8edf062d269..d8404af5416 100644 --- a/crates/storage/provider/src/providers/consistent_view.rs +++ b/crates/storage/provider/src/providers/consistent_view.rs @@ -83,31 +83,27 @@ mod tests { use std::str::FromStr; use super::*; - use crate::{ - test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, - StaticFileProviderFactory, StaticFileWriter, - }; + use crate::{test_utils::create_test_provider_factory, BlockWriter}; use alloy_primitives::Bytes; use assert_matches::assert_matches; - use reth_chainspec::{EthChainSpec, MAINNET}; + use reth_chainspec::{ChainSpecProvider, EthChainSpec}; use reth_ethereum_primitives::{Block, BlockBody}; use reth_primitives_traits::{block::TestBlock, RecoveredBlock, SealedBlock}; - use reth_static_file_types::StaticFileSegment; - use reth_storage_api::StorageLocation; #[test] fn test_consistent_view_extend() { - let provider_factory = create_test_provider_factory_with_chain_spec(MAINNET.clone()); + let provider_factory = create_test_provider_factory(); - let genesis_header = MAINNET.genesis_header(); - let genesis_block = - SealedBlock::::seal_parts(genesis_header.clone(), BlockBody::default()); + let genesis_block = SealedBlock::::seal_parts( + provider_factory.chain_spec().genesis_header().clone(), + BlockBody::default(), + ); let genesis_hash: B256 = genesis_block.hash(); let genesis_block = RecoveredBlock::new_sealed(genesis_block, vec![]); // insert the block let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(genesis_block, StorageLocation::StaticFiles).unwrap(); + provider_rw.insert_block(genesis_block).unwrap(); provider_rw.commit().unwrap(); // create a consistent view provider and check that a ro provider can be made @@ -125,7 +121,7 @@ mod tests { // insert the block let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(recovered_block, StorageLocation::StaticFiles).unwrap(); + 
provider_rw.insert_block(recovered_block).unwrap(); provider_rw.commit().unwrap(); // ensure successful creation of a read-only provider, based on this new db state. @@ -140,7 +136,7 @@ mod tests { // insert the block let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(recovered_block, StorageLocation::StaticFiles).unwrap(); + provider_rw.insert_block(recovered_block).unwrap(); provider_rw.commit().unwrap(); // check that creation of a read-only provider still works @@ -149,18 +145,18 @@ mod tests { #[test] fn test_consistent_view_remove() { - let provider_factory = create_test_provider_factory_with_chain_spec(MAINNET.clone()); + let provider_factory = create_test_provider_factory(); - let genesis_header = MAINNET.genesis_header(); - let genesis_block = - SealedBlock::::seal_parts(genesis_header.clone(), BlockBody::default()); + let genesis_block = SealedBlock::::seal_parts( + provider_factory.chain_spec().genesis_header().clone(), + BlockBody::default(), + ); let genesis_hash: B256 = genesis_block.hash(); let genesis_block = RecoveredBlock::new_sealed(genesis_block, vec![]); // insert the block let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(genesis_block, StorageLocation::Both).unwrap(); - provider_rw.0.static_file_provider().commit().unwrap(); + provider_rw.insert_block(genesis_block).unwrap(); provider_rw.commit().unwrap(); // create a consistent view provider and check that a ro provider can be made @@ -178,8 +174,7 @@ mod tests { // insert the block let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(recovered_block, StorageLocation::Both).unwrap(); - provider_rw.0.static_file_provider().commit().unwrap(); + provider_rw.insert_block(recovered_block).unwrap(); provider_rw.commit().unwrap(); // create a second consistent view provider and check that a ro provider can be made @@ -191,10 +186,7 @@ mod tests { // remove the block above the genesis block let 
provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.remove_blocks_above(0, StorageLocation::Both).unwrap(); - let sf_provider = provider_rw.0.static_file_provider(); - sf_provider.get_writer(1, StaticFileSegment::Headers).unwrap().prune_headers(1).unwrap(); - sf_provider.commit().unwrap(); + provider_rw.remove_blocks_above(0).unwrap(); provider_rw.commit().unwrap(); // ensure unsuccessful creation of a read-only provider, based on this new db state. @@ -216,8 +208,7 @@ mod tests { // reinsert the block at the same height, but with a different hash let provider_rw = provider_factory.provider_rw().unwrap(); - provider_rw.insert_block(recovered_block, StorageLocation::Both).unwrap(); - provider_rw.0.static_file_provider().commit().unwrap(); + provider_rw.insert_block(recovered_block).unwrap(); provider_rw.commit().unwrap(); // ensure unsuccessful creation of a read-only provider, based on this new db state. diff --git a/crates/storage/provider/src/providers/database/metrics.rs b/crates/storage/provider/src/providers/database/metrics.rs index 1d14ecc4bf0..4923b51db37 100644 --- a/crates/storage/provider/src/providers/database/metrics.rs +++ b/crates/storage/provider/src/providers/database/metrics.rs @@ -41,10 +41,7 @@ pub(crate) enum Action { InsertHashes, InsertHistoryIndices, UpdatePipelineStages, - InsertCanonicalHeaders, - InsertHeaders, InsertHeaderNumbers, - InsertHeaderTerminalDifficulties, InsertBlockBodyIndices, InsertTransactionBlocks, GetNextTxNum, @@ -66,13 +63,8 @@ struct DatabaseProviderMetrics { /// Duration of update pipeline stages update_pipeline_stages: Histogram, /// Duration of insert canonical headers - insert_canonical_headers: Histogram, - /// Duration of insert headers - insert_headers: Histogram, /// Duration of insert header numbers insert_header_numbers: Histogram, - /// Duration of insert header TD - insert_header_td: Histogram, /// Duration of insert block body indices insert_block_body_indices: Histogram, /// Duration of 
insert transaction blocks @@ -92,10 +84,7 @@ impl DatabaseProviderMetrics { Action::InsertHashes => self.insert_hashes.record(duration), Action::InsertHistoryIndices => self.insert_history_indices.record(duration), Action::UpdatePipelineStages => self.update_pipeline_stages.record(duration), - Action::InsertCanonicalHeaders => self.insert_canonical_headers.record(duration), - Action::InsertHeaders => self.insert_headers.record(duration), Action::InsertHeaderNumbers => self.insert_header_numbers.record(duration), - Action::InsertHeaderTerminalDifficulties => self.insert_header_td.record(duration), Action::InsertBlockBodyIndices => self.insert_block_body_indices.record(duration), Action::InsertTransactionBlocks => self.insert_tx_blocks.record(duration), Action::GetNextTxNum => self.get_next_tx_num.record(duration), diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 6a5b26ca6e6..c9a19936af8 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -621,7 +621,7 @@ mod tests { providers::{StaticFileProvider, StaticFileWriter}, test_utils::{blocks::TEST_BLOCK, create_test_provider_factory, MockNodeTypesWithDB}, BlockHashReader, BlockNumReader, BlockWriter, DBProvider, HeaderSyncGapProvider, - StorageLocation, TransactionsProvider, + TransactionsProvider, }; use alloy_primitives::{TxNumber, B256, U256}; use assert_matches::assert_matches; @@ -684,16 +684,12 @@ mod tests { #[test] fn insert_block_with_prune_modes() { - let factory = create_test_provider_factory(); - let block = TEST_BLOCK.clone(); + { + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); - assert_matches!( - provider - .insert_block(block.clone().try_recover().unwrap(), StorageLocation::Database), - Ok(_) - ); + assert_matches!(provider.insert_block(block.clone().try_recover().unwrap()), Ok(_)); assert_matches!( 
provider.transaction_sender(0), Ok(Some(sender)) if sender == block.body().transactions[0].recover_signer().unwrap() @@ -710,12 +706,9 @@ mod tests { transaction_lookup: Some(PruneMode::Full), ..PruneModes::none() }; + let factory = create_test_provider_factory(); let provider = factory.with_prune_modes(prune_modes).provider_rw().unwrap(); - assert_matches!( - provider - .insert_block(block.clone().try_recover().unwrap(), StorageLocation::Database), - Ok(_) - ); + assert_matches!(provider.insert_block(block.clone().try_recover().unwrap()), Ok(_)); assert_matches!(provider.transaction_sender(0), Ok(None)); assert_matches!( provider.transaction_id(*block.body().transactions[0].tx_hash()), @@ -726,21 +719,16 @@ mod tests { #[test] fn take_block_transaction_range_recover_senders() { - let factory = create_test_provider_factory(); - let mut rng = generators::rng(); let block = random_block(&mut rng, 0, BlockParams { tx_count: Some(3), ..Default::default() }); let tx_ranges: Vec> = vec![0..=0, 1..=1, 2..=2, 0..=1, 1..=2]; for range in tx_ranges { + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); - assert_matches!( - provider - .insert_block(block.clone().try_recover().unwrap(), StorageLocation::Database), - Ok(_) - ); + assert_matches!(provider.insert_block(block.clone().try_recover().unwrap()), Ok(_)); let senders = provider.take::(range.clone()); assert_eq!( diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 6a16dbcbf5f..bdde71b2ce1 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -15,12 +15,12 @@ use crate::{ HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader, StateProviderBox, StateWriter, 
StaticFileProviderFactory, StatsReader, - StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, + StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter, }; use alloy_consensus::{ transaction::{SignerRecoverable, TransactionMeta, TxHashRef}, - BlockHeader, Header, TxReceipt, + BlockHeader, TxReceipt, }; use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber}; use alloy_primitives::{ @@ -30,6 +30,7 @@ use alloy_primitives::{ }; use itertools::Itertools; use rayon::slice::ParallelSliceMut; +use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates}; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, EthereumHardforks}; use reth_db_api::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, @@ -46,8 +47,8 @@ use reth_db_api::{ use reth_execution_types::{Chain, ExecutionOutcome}; use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy}; use reth_primitives_traits::{ - Account, Block as _, BlockBody as _, Bytecode, GotExpected, NodePrimitives, RecoveredBlock, - SealedHeader, StorageEntry, + Account, Block as _, BlockBody as _, Bytecode, GotExpected, RecoveredBlock, SealedHeader, + StorageEntry, }; use reth_prune_types::{ PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE, @@ -72,7 +73,7 @@ use std::{ cmp::Ordering, collections::{BTreeMap, BTreeSet}, fmt::Debug, - ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive}, + ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeInclusive}, sync::{mpsc, Arc}, }; use tracing::{debug, trace}; @@ -252,6 +253,64 @@ impl AsRef for DatabaseProvider { } impl DatabaseProvider { + /// Writes executed blocks and state to storage. 
+ pub fn save_blocks( + &self, + blocks: Vec>, + ) -> ProviderResult<()> { + if blocks.is_empty() { + debug!(target: "providers::db", "Attempted to write empty block range"); + return Ok(()) + } + + // NOTE: checked non-empty above + let first_block = blocks.first().unwrap().recovered_block(); + + let last_block = blocks.last().unwrap().recovered_block(); + let first_number = first_block.number(); + let last_block_number = last_block.number(); + + debug!(target: "providers::db", block_count = %blocks.len(), "Writing blocks and execution data to storage"); + + // TODO: Do performant / batched writes for each type of object + // instead of a loop over all blocks, + // meaning: + // * blocks + // * state + // * hashed state + // * trie updates (cannot naively extend, need helper) + // * indices (already done basically) + // Insert the blocks + for ExecutedBlockWithTrieUpdates { + block: ExecutedBlock { recovered_block, execution_output, hashed_state }, + trie, + } in blocks + { + let block_hash = recovered_block.hash(); + self.insert_block(Arc::unwrap_or_clone(recovered_block))?; + + // Write state and changesets to the database. + // Must be written after blocks because of the receipt lookup. + self.write_state(&execution_output, OriginalValuesKnown::No)?; + + // insert hashes and intermediate merkle nodes + self.write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?; + self.write_trie_updates( + trie.as_ref().ok_or(ProviderError::MissingTrieUpdates(block_hash))?, + )?; + } + + // update history indices + self.update_history_indices(first_number..=last_block_number)?; + + // Update pipeline progress + self.update_pipeline_stages(last_block_number, false)?; + + debug!(target: "providers::db", range = ?first_number..=last_block_number, "Appended block data"); + + Ok(()) + } + /// Unwinds trie state for the given range. 
/// /// This includes calculating the resulted state root and comparing it with the parent block @@ -344,14 +403,11 @@ impl DatabaseProvider ProviderResult<()> { - if remove_from.database() { - // iterate over block body and remove receipts - self.remove::>>(from_tx..)?; - } + // iterate over block body and remove receipts + self.remove::>>(from_tx..)?; - if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() { + if !self.prune_modes.has_receipts_pruning() { let static_file_receipt_num = self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Receipts); @@ -410,44 +466,6 @@ impl TryIntoHistoricalStateProvider for Databa } } -impl< - Tx: DbTx + DbTxMut + 'static, - N: NodeTypesForProvider>, - > DatabaseProvider -{ - // TODO: uncomment below, once `reth debug_cmd` has been feature gated with dev. - // #[cfg(any(test, feature = "test-utils"))] - /// Inserts an historical block. **Used for setting up test environments** - pub fn insert_historical_block( - &self, - block: RecoveredBlock<::Block>, - ) -> ProviderResult { - let ttd = if block.number() == 0 { - block.header().difficulty() - } else { - let parent_block_number = block.number() - 1; - let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default(); - parent_ttd + block.header().difficulty() - }; - - let mut writer = self.static_file_provider.latest_writer(StaticFileSegment::Headers)?; - - // Backfill: some tests start at a forward block number, but static files require no gaps. 
- let segment_header = writer.user_header(); - if segment_header.block_end().is_none() && segment_header.expected_block_start() == 0 { - for block_number in 0..block.number() { - let mut prev = block.clone_header(); - prev.number = block_number; - writer.append_header(&prev, U256::ZERO, &B256::ZERO)?; - } - } - - writer.append_header(block.header(), ttd, &block.hash())?; - - self.insert_block(block, StorageLocation::Database) - } -} - /// For a given key, unwind all history shards that contain block numbers at or above the given /// block number. /// @@ -800,11 +818,6 @@ impl DatabaseProvider { } impl DatabaseProvider { - /// Commit database transaction. - pub fn commit(self) -> ProviderResult { - Ok(self.tx.commit()?) - } - /// Load shard and remove it. If list is empty, last shard was full or /// there are no shards at all. fn take_shard( @@ -1773,7 +1786,6 @@ impl StateWriter &self, execution_outcome: &ExecutionOutcome, is_value_known: OriginalValuesKnown, - write_receipts_to: StorageLocation, ) -> ProviderResult<()> { let first_block = execution_outcome.first_block(); let block_count = execution_outcome.len() as u64; @@ -1809,15 +1821,13 @@ impl StateWriter // // We are writing to database if requested or if there's any kind of receipt pruning // configured - let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning) - .then(|| self.tx.cursor_write::>()) - .transpose()?; + let mut receipts_cursor = self.tx.cursor_write::>()?; // Prepare receipts static writer if we are going to write receipts to static files // // We are writing to static files if requested and if there's no receipt pruning configured - let mut receipts_static_writer = (write_receipts_to.static_files() && - !has_receipts_pruning) + let mut receipts_static_writer = has_receipts_pruning + .not() .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)) .transpose()?; @@ -1874,9 +1884,7 @@ impl StateWriter writer.append_receipt(receipt_idx, 
receipt)?; } - if let Some(cursor) = &mut receipts_cursor { - cursor.append(receipt_idx, receipt)?; - } + receipts_cursor.append(receipt_idx, receipt)?; } } @@ -2071,11 +2079,7 @@ impl StateWriter /// 1. Take the old value from the changeset /// 2. Take the new value from the local state /// 3. Set the local state to the value in the changeset - fn remove_state_above( - &self, - block: BlockNumber, - remove_receipts_from: StorageLocation, - ) -> ProviderResult<()> { + fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()> { let range = block + 1..=self.last_block_number()?; if range.is_empty() { @@ -2140,7 +2144,7 @@ impl StateWriter } } - self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?; + self.remove_receipts_from(from_transaction_num, block)?; Ok(()) } @@ -2169,7 +2173,6 @@ impl StateWriter fn take_state_above( &self, block: BlockNumber, - remove_receipts_from: StorageLocation, ) -> ProviderResult> { let range = block + 1..=self.last_block_number()?; @@ -2275,7 +2278,7 @@ impl StateWriter receipts.push(block_receipts); } - self.remove_receipts_from(from_transaction_num, block, remove_receipts_from)?; + self.remove_receipts_from(from_transaction_num, block)?; Ok(ExecutionOutcome::new_init( state, @@ -2650,20 +2653,19 @@ impl BlockExecu fn take_block_and_execution_above( &self, block: BlockNumber, - remove_from: StorageLocation, ) -> ProviderResult> { let range = block + 1..=self.last_block_number()?; self.unwind_trie_state_range(range.clone())?; // get execution res - let execution_state = self.take_state_above(block, remove_from)?; + let execution_state = self.take_state_above(block)?; let blocks = self.recovered_block_range(range)?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. 
- self.remove_blocks_above(block, remove_from)?; + self.remove_blocks_above(block)?; // Update pipeline progress self.update_pipeline_stages(block, true)?; @@ -2671,21 +2673,17 @@ impl BlockExecu Ok(Chain::new(blocks, execution_state, None)) } - fn remove_block_and_execution_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()> { + fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> { let range = block + 1..=self.last_block_number()?; self.unwind_trie_state_range(range)?; // remove execution res - self.remove_state_above(block, remove_from)?; + self.remove_state_above(block)?; // remove block bodies it is needed for both get block range and get block execution results // that is why it is deleted afterwards. - self.remove_blocks_above(block, remove_from)?; + self.remove_blocks_above(block)?; // Update pipeline progress self.update_pipeline_stages(block, true)?; @@ -2723,7 +2721,6 @@ impl BlockWrite fn insert_block( &self, block: RecoveredBlock, - write_to: StorageLocation, ) -> ProviderResult { let block_number = block.number(); @@ -2739,23 +2736,9 @@ impl BlockWrite parent_ttd + block.header().difficulty() }; - if write_to.database() { - self.tx.put::(block_number, block.hash())?; - durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders); - - // Put header with canonical hashes. - self.tx.put::>>(block_number, block.header().clone())?; - durations_recorder.record_relative(metrics::Action::InsertHeaders); - - self.tx.put::(block_number, ttd.into())?; - durations_recorder.record_relative(metrics::Action::InsertHeaderTerminalDifficulties); - } - - if write_to.static_files() { - let mut writer = - self.static_file_provider.get_writer(block_number, StaticFileSegment::Headers)?; - writer.append_header(block.header(), ttd, &block.hash())?; - } + self.static_file_provider + .get_writer(block_number, StaticFileSegment::Headers)? 
+ .append_header(block.header(), ttd, &block.hash())?; self.tx.put::(block.hash(), block_number)?; durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers); @@ -2785,7 +2768,7 @@ impl BlockWrite next_tx_num += 1; } - self.append_block_bodies(vec![(block_number, Some(block.into_body()))], write_to)?; + self.append_block_bodies(vec![(block_number, Some(block.into_body()))])?; debug!( target: "providers::db", @@ -2800,35 +2783,22 @@ impl BlockWrite fn append_block_bodies( &self, bodies: Vec<(BlockNumber, Option>)>, - write_to: StorageLocation, ) -> ProviderResult<()> { let Some(from_block) = bodies.first().map(|(block, _)| *block) else { return Ok(()) }; // Initialize writer if we will be writing transactions to staticfiles - let mut tx_static_writer = write_to - .static_files() - .then(|| { - self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions) - }) - .transpose()?; + let mut tx_writer = + self.static_file_provider.get_writer(from_block, StaticFileSegment::Transactions)?; let mut block_indices_cursor = self.tx.cursor_write::()?; let mut tx_block_cursor = self.tx.cursor_write::()?; - // Initialize cursor if we will be writing transactions to database - let mut tx_cursor = write_to - .database() - .then(|| self.tx.cursor_write::>>()) - .transpose()?; - // Get id for the next tx_num or zero if there are no transactions. let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); for (block_number, body) in &bodies { // Increment block on static file header. 
- if let Some(writer) = tx_static_writer.as_mut() { - writer.increment_block(*block_number)?; - } + tx_writer.increment_block(*block_number)?; let tx_count = body.as_ref().map(|b| b.transactions().len() as u64).unwrap_or_default(); let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count }; @@ -2850,37 +2820,34 @@ impl BlockWrite // write transactions for transaction in body.transactions() { - if let Some(writer) = tx_static_writer.as_mut() { - writer.append_transaction(next_tx_num, transaction)?; - } - if let Some(cursor) = tx_cursor.as_mut() { - cursor.append(next_tx_num, transaction)?; - } + tx_writer.append_transaction(next_tx_num, transaction)?; // Increment transaction id for each transaction. next_tx_num += 1; } } - self.storage.writer().write_block_bodies(self, bodies, write_to)?; + self.storage.writer().write_block_bodies(self, bodies)?; Ok(()) } - fn remove_blocks_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()> { - for hash in self.canonical_hashes_range(block + 1, self.last_block_number()? + 1)? { - self.tx.delete::(hash, None)?; - } + fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()> { + // Get highest static file block for the total block range + let highest_static_file_block = self + .static_file_provider() + .get_highest_static_file_block(StaticFileSegment::Headers) + .expect("todo: error handling, headers should exist"); - // Only prune canonical headers after we've removed the block hashes as we rely on data from - // this table in `canonical_hashes_range`. - self.remove::(block + 1..)?; - self.remove::>>(block + 1..)?; - self.remove::(block + 1..)?; + // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure + // we remove only what is ABOVE the block. + // + // i.e., if the highest static file block is 8, we want to remove above block 5 only, we + // will have three blocks to remove, which will be block 8, 7, and 6. 
+ debug!(target: "providers::db", ?block, "Removing static file blocks above block_number"); + self.static_file_provider() + .get_writer(block, StaticFileSegment::Headers)? + .prune_headers(highest_static_file_block.saturating_sub(block))?; // First transaction to be removed let unwind_tx_from = self @@ -2906,17 +2873,13 @@ impl BlockWrite self.remove::(unwind_tx_from..)?; - self.remove_bodies_above(block, remove_from)?; + self.remove_bodies_above(block)?; Ok(()) } - fn remove_bodies_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()> { - self.storage.writer().remove_block_bodies_above(self, block, remove_from)?; + fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()> { + self.storage.writer().remove_block_bodies_above(self, block)?; // First transaction to be removed let unwind_tx_from = self @@ -2927,23 +2890,16 @@ impl BlockWrite self.remove::(block + 1..)?; self.remove::(unwind_tx_from..)?; - if remove_from.database() { - self.remove::>>(unwind_tx_from..)?; - } + let static_file_tx_num = + self.static_file_provider.get_highest_static_file_tx(StaticFileSegment::Transactions); - if remove_from.static_files() { - let static_file_tx_num = self - .static_file_provider - .get_highest_static_file_tx(StaticFileSegment::Transactions); + let to_delete = static_file_tx_num + .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from)) + .unwrap_or_default(); - let to_delete = static_file_tx_num - .map(|static_tx| (static_tx + 1).saturating_sub(unwind_tx_from)) - .unwrap_or_default(); - - self.static_file_provider - .latest_writer(StaticFileSegment::Transactions)? - .prune_transactions(to_delete, block)?; - } + self.static_file_provider + .latest_writer(StaticFileSegment::Transactions)? 
+ .prune_transactions(to_delete, block)?; Ok(()) } @@ -2972,11 +2928,11 @@ impl BlockWrite // Insert the blocks for block in blocks { - self.insert_block(block, StorageLocation::Database)?; + self.insert_block(block)?; durations_recorder.record_relative(metrics::Action::InsertBlock); } - self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?; + self.write_state(execution_outcome, OriginalValuesKnown::No)?; durations_recorder.record_relative(metrics::Action::InsertState); // insert hashes and intermediate merkle nodes @@ -3094,6 +3050,23 @@ impl DBProvider for DatabaseProvider fn prune_modes_ref(&self) -> &PruneModes { self.prune_modes_ref() } + + /// Commit database transaction and static files. + fn commit(self) -> ProviderResult { + // For unwinding it makes more sense to commit the database first, since if + // it is interrupted before the static files commit, we can just + // truncate the static files according to the + // checkpoints on the next start-up. 
+ if self.static_file_provider.has_unwind_queued() { + self.tx.commit()?; + self.static_file_provider.commit()?; + } else { + self.static_file_provider.commit()?; + self.tx.commit()?; + } + + Ok(true) + } } #[cfg(test)] @@ -3133,22 +3106,15 @@ mod tests { let data = BlockchainTestData::default(); let provider_rw = factory.provider_rw().unwrap(); - provider_rw - .insert_block( - data.genesis.clone().try_recover().unwrap(), - crate::StorageLocation::Database, - ) - .unwrap(); - provider_rw - .insert_block(data.blocks[0].0.clone(), crate::StorageLocation::Database) - .unwrap(); + provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap(); provider_rw .write_state( - &data.blocks[0].1, + &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, crate::OriginalValuesKnown::No, - crate::StorageLocation::Database, ) .unwrap(); + provider_rw.insert_block(data.blocks[0].0.clone()).unwrap(); + provider_rw.write_state(&data.blocks[0].1, crate::OriginalValuesKnown::No).unwrap(); provider_rw.commit().unwrap(); let provider = factory.provider().unwrap(); @@ -3166,23 +3132,16 @@ mod tests { let data = BlockchainTestData::default(); let provider_rw = factory.provider_rw().unwrap(); + provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap(); provider_rw - .insert_block( - data.genesis.clone().try_recover().unwrap(), - crate::StorageLocation::Database, + .write_state( + &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, + crate::OriginalValuesKnown::No, ) .unwrap(); for i in 0..3 { - provider_rw - .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database) - .unwrap(); - provider_rw - .write_state( - &data.blocks[i].1, - crate::OriginalValuesKnown::No, - crate::StorageLocation::Database, - ) - .unwrap(); + provider_rw.insert_block(data.blocks[i].0.clone()).unwrap(); + provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); } 
provider_rw.commit().unwrap(); @@ -3203,25 +3162,18 @@ mod tests { let data = BlockchainTestData::default(); let provider_rw = factory.provider_rw().unwrap(); + provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap(); provider_rw - .insert_block( - data.genesis.clone().try_recover().unwrap(), - crate::StorageLocation::Database, + .write_state( + &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, + crate::OriginalValuesKnown::No, ) .unwrap(); // insert blocks 1-3 with receipts for i in 0..3 { - provider_rw - .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database) - .unwrap(); - provider_rw - .write_state( - &data.blocks[i].1, - crate::OriginalValuesKnown::No, - crate::StorageLocation::Database, - ) - .unwrap(); + provider_rw.insert_block(data.blocks[i].0.clone()).unwrap(); + provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); } provider_rw.commit().unwrap(); @@ -3241,23 +3193,16 @@ mod tests { let data = BlockchainTestData::default(); let provider_rw = factory.provider_rw().unwrap(); + provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap(); provider_rw - .insert_block( - data.genesis.clone().try_recover().unwrap(), - crate::StorageLocation::Database, + .write_state( + &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, + crate::OriginalValuesKnown::No, ) .unwrap(); for i in 0..3 { - provider_rw - .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database) - .unwrap(); - provider_rw - .write_state( - &data.blocks[i].1, - crate::OriginalValuesKnown::No, - crate::StorageLocation::Database, - ) - .unwrap(); + provider_rw.insert_block(data.blocks[i].0.clone()).unwrap(); + provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); } provider_rw.commit().unwrap(); @@ -3284,7 +3229,7 @@ mod tests { // create blocks with no transactions let mut blocks = Vec::new(); - for i in 
1..=3 { + for i in 0..3 { let block = random_block(&mut rng, i, BlockParams { tx_count: Some(0), ..Default::default() }); blocks.push(block); @@ -3292,9 +3237,7 @@ mod tests { let provider_rw = factory.provider_rw().unwrap(); for block in blocks { - provider_rw - .insert_block(block.try_recover().unwrap(), crate::StorageLocation::Database) - .unwrap(); + provider_rw.insert_block(block.try_recover().unwrap()).unwrap(); } provider_rw.commit().unwrap(); @@ -3313,23 +3256,16 @@ mod tests { let data = BlockchainTestData::default(); let provider_rw = factory.provider_rw().unwrap(); + provider_rw.insert_block(data.genesis.clone().try_recover().unwrap()).unwrap(); provider_rw - .insert_block( - data.genesis.clone().try_recover().unwrap(), - crate::StorageLocation::Database, + .write_state( + &ExecutionOutcome { first_block: 0, receipts: vec![vec![]], ..Default::default() }, + crate::OriginalValuesKnown::No, ) .unwrap(); for i in 0..3 { - provider_rw - .insert_block(data.blocks[i].0.clone(), crate::StorageLocation::Database) - .unwrap(); - provider_rw - .write_state( - &data.blocks[i].1, - crate::OriginalValuesKnown::No, - crate::StorageLocation::Database, - ) - .unwrap(); + provider_rw.insert_block(data.blocks[i].0.clone()).unwrap(); + provider_rw.write_state(&data.blocks[i].1, crate::OriginalValuesKnown::No).unwrap(); } provider_rw.commit().unwrap(); diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index fc2c94a1ba1..6af6313f6a2 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1332,6 +1332,9 @@ pub trait StaticFileWriter { /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`]. fn commit(&self) -> ProviderResult<()>; + + /// Returns `true` if the static file provider has unwind queued. 
+ fn has_unwind_queued(&self) -> bool; } impl StaticFileWriter for StaticFileProvider { @@ -1362,6 +1365,10 @@ impl StaticFileWriter for StaticFileProvider { fn commit(&self) -> ProviderResult<()> { self.writers.commit() } + + fn has_unwind_queued(&self) -> bool { + self.writers.has_unwind_queued() + } } impl> HeaderProvider for StaticFileProvider { diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index a8e2e603c5b..b9c17f82920 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -69,6 +69,18 @@ impl StaticFileWriters { } Ok(()) } + + pub(crate) fn has_unwind_queued(&self) -> bool { + for writer_lock in [&self.headers, &self.transactions, &self.receipts] { + let writer = writer_lock.read(); + if let Some(writer) = writer.as_ref() && + writer.will_prune_on_commit() + { + return true + } + } + false + } } /// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`]. @@ -214,6 +226,11 @@ impl StaticFileProviderRW { Ok(()) } + /// Returns `true` if the writer will prune on commit. + pub const fn will_prune_on_commit(&self) -> bool { + self.prune_on_commit.is_some() + } + /// Commits configuration changes to disk and updates the reader index with the new changes. pub fn commit(&mut self) -> ProviderResult<()> { let start = Instant::now(); diff --git a/crates/storage/provider/src/test_utils/blocks.rs b/crates/storage/provider/src/test_utils/blocks.rs index 13a05e7921f..a0886ca6286 100644 --- a/crates/storage/provider/src/test_utils/blocks.rs +++ b/crates/storage/provider/src/test_utils/blocks.rs @@ -85,7 +85,7 @@ pub(crate) static TEST_BLOCK: LazyLock DBProvider self.tx } + fn commit(self) -> ProviderResult { + Ok(self.tx.commit()?) 
+ } + fn prune_modes_ref(&self) -> &PruneModes { &self.prune_modes } diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 02f5bdabd76..1151990f97b 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,232 +1,5 @@ -use crate::{ - providers::{StaticFileProvider, StaticFileWriter as SfWriter}, - BlockExecutionWriter, BlockWriter, HistoryWriter, StateWriter, StaticFileProviderFactory, - StorageLocation, TrieWriter, -}; -use alloy_consensus::BlockHeader; -use reth_chain_state::{ExecutedBlock, ExecutedBlockWithTrieUpdates}; -use reth_db_api::transaction::{DbTx, DbTxMut}; -use reth_errors::{ProviderError, ProviderResult}; -use reth_primitives_traits::{NodePrimitives, SignedTransaction}; -use reth_static_file_types::StaticFileSegment; -use reth_storage_api::{DBProvider, StageCheckpointWriter, TransactionsProviderExt}; -use reth_storage_errors::writer::UnifiedStorageWriterError; -use revm_database::OriginalValuesKnown; -use std::sync::Arc; -use tracing::debug; - -/// [`UnifiedStorageWriter`] is responsible for managing the writing to storage with both database -/// and static file providers. -#[derive(Debug)] -pub struct UnifiedStorageWriter<'a, ProviderDB, ProviderSF> { - database: &'a ProviderDB, - static_file: Option, -} - -impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF> { - /// Creates a new instance of [`UnifiedStorageWriter`]. - /// - /// # Parameters - /// - `database`: An optional reference to a database provider. - /// - `static_file`: An optional mutable reference to a static file instance. - pub const fn new(database: &'a ProviderDB, static_file: Option) -> Self { - Self { database, static_file } - } - - /// Creates a new instance of [`UnifiedStorageWriter`] from a database provider and a static - /// file instance. - pub fn from

(database: &'a P, static_file: ProviderSF) -> Self - where - P: AsRef, - { - Self::new(database.as_ref(), Some(static_file)) - } - - /// Creates a new instance of [`UnifiedStorageWriter`] from a database provider. - pub fn from_database

(database: &'a P) -> Self - where - P: AsRef, - { - Self::new(database.as_ref(), None) - } - - /// Returns a reference to the database writer. - /// - /// # Panics - /// If the database provider is not set. - const fn database(&self) -> &ProviderDB { - self.database - } - - /// Returns a reference to the static file instance. - /// - /// # Panics - /// If the static file instance is not set. - const fn static_file(&self) -> &ProviderSF { - self.static_file.as_ref().expect("should exist") - } - - /// Ensures that the static file instance is set. - /// - /// # Returns - /// - `Ok(())` if the static file instance is set. - /// - `Err(StorageWriterError::MissingStaticFileWriter)` if the static file instance is not set. - #[expect(unused)] - const fn ensure_static_file(&self) -> Result<(), UnifiedStorageWriterError> { - if self.static_file.is_none() { - return Err(UnifiedStorageWriterError::MissingStaticFileWriter) - } - Ok(()) - } -} - -impl UnifiedStorageWriter<'_, (), ()> { - /// Commits both storage types in the right order. - /// - /// For non-unwinding operations it makes more sense to commit the static files first, since if - /// it is interrupted before the database commit, we can just truncate - /// the static files according to the checkpoints on the next - /// start-up. - /// - /// NOTE: If unwinding data from storage, use `commit_unwind` instead! - pub fn commit

(provider: P) -> ProviderResult<()> - where - P: DBProvider + StaticFileProviderFactory, - { - let static_file = provider.static_file_provider(); - static_file.commit()?; - provider.commit()?; - Ok(()) - } - - /// Commits both storage types in the right order for an unwind operation. - /// - /// For unwinding it makes more sense to commit the database first, since if - /// it is interrupted before the static files commit, we can just - /// truncate the static files according to the - /// checkpoints on the next start-up. - /// - /// NOTE: Should only be used after unwinding data from storage! - pub fn commit_unwind

(provider: P) -> ProviderResult<()> - where - P: DBProvider + StaticFileProviderFactory, - { - let static_file = provider.static_file_provider(); - provider.commit()?; - static_file.commit()?; - Ok(()) - } -} - -impl UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider> -where - ProviderDB: DBProvider - + BlockWriter - + TransactionsProviderExt - + TrieWriter - + StateWriter - + HistoryWriter - + StageCheckpointWriter - + BlockExecutionWriter - + AsRef - + StaticFileProviderFactory, -{ - /// Writes executed blocks and receipts to storage. - pub fn save_blocks(&self, blocks: Vec>) -> ProviderResult<()> - where - N: NodePrimitives, - ProviderDB: BlockWriter + StateWriter, - { - if blocks.is_empty() { - debug!(target: "provider::storage_writer", "Attempted to write empty block range"); - return Ok(()) - } - - // NOTE: checked non-empty above - let first_block = blocks.first().unwrap().recovered_block(); - - let last_block = blocks.last().unwrap().recovered_block(); - let first_number = first_block.number(); - let last_block_number = last_block.number(); - - debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage"); - - // TODO: Do performant / batched writes for each type of object - // instead of a loop over all blocks, - // meaning: - // * blocks - // * state - // * hashed state - // * trie updates (cannot naively extend, need helper) - // * indices (already done basically) - // Insert the blocks - for ExecutedBlockWithTrieUpdates { - block: ExecutedBlock { recovered_block, execution_output, hashed_state }, - trie, - } in blocks - { - let block_hash = recovered_block.hash(); - self.database() - .insert_block(Arc::unwrap_or_clone(recovered_block), StorageLocation::Both)?; - - // Write state and changesets to the database. - // Must be written after blocks because of the receipt lookup. 
- self.database().write_state( - &execution_output, - OriginalValuesKnown::No, - StorageLocation::StaticFiles, - )?; - - // insert hashes and intermediate merkle nodes - self.database() - .write_hashed_state(&Arc::unwrap_or_clone(hashed_state).into_sorted())?; - self.database().write_trie_updates( - trie.as_ref().ok_or(ProviderError::MissingTrieUpdates(block_hash))?, - )?; - } - - // update history indices - self.database().update_history_indices(first_number..=last_block_number)?; - - // Update pipeline progress - self.database().update_pipeline_stages(last_block_number, false)?; - - debug!(target: "provider::storage_writer", range = ?first_number..=last_block_number, "Appended block data"); - - Ok(()) - } - - /// Removes all block, transaction and receipt data above the given block number from the - /// database and static files. This is exclusive, i.e., it only removes blocks above - /// `block_number`, and does not remove `block_number`. - pub fn remove_blocks_above(&self, block_number: u64) -> ProviderResult<()> { - // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block - debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number"); - self.database().remove_block_and_execution_above(block_number, StorageLocation::Both)?; - - // Get highest static file block for the total block range - let highest_static_file_block = self - .static_file() - .get_highest_static_file_block(StaticFileSegment::Headers) - .expect("todo: error handling, headers should exist"); - - // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure - // we remove only what is ABOVE the block. - // - // i.e., if the highest static file block is 8, we want to remove above block 5 only, we - // will have three blocks to remove, which will be block 8, 7, and 6. 
- debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number"); - self.static_file() - .get_writer(block_number, StaticFileSegment::Headers)? - .prune_headers(highest_static_file_block.saturating_sub(block_number))?; - - Ok(()) - } -} - #[cfg(test)] mod tests { - use super::*; use crate::{ test_utils::create_test_provider_factory, AccountReader, StorageTrieWriter, TrieWriter, }; @@ -240,7 +13,7 @@ mod tests { use reth_ethereum_primitives::Receipt; use reth_execution_types::ExecutionOutcome; use reth_primitives_traits::{Account, StorageEntry}; - use reth_storage_api::{DatabaseProviderFactory, HashedPostStateProvider}; + use reth_storage_api::{DatabaseProviderFactory, HashedPostStateProvider, StateWriter}; use reth_trie::{ test_utils::{state_root, storage_root_prehashed}, HashedPostState, HashedStorage, StateRoot, StorageRoot, StorageRootProgress, @@ -250,7 +23,7 @@ mod tests { states::{ bundle_state::BundleRetention, changes::PlainStorageRevert, PlainStorageChangeset, }, - BundleState, State, + BundleState, OriginalValuesKnown, State, }; use revm_database_interface::{DatabaseCommit, EmptyDB}; use revm_state::{ @@ -507,7 +280,7 @@ mod tests { let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); // Check plain storage state @@ -607,7 +380,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 2, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); assert_eq!( @@ -675,7 +448,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 
0, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -834,7 +607,7 @@ mod tests { let outcome: ExecutionOutcome = ExecutionOutcome::new(bundle, Default::default(), 1, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider @@ -1000,7 +773,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Default::default(), 0, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -1049,7 +822,7 @@ mod tests { state.merge_transitions(BundleRetention::Reverts); let outcome = ExecutionOutcome::new(state.take_bundle(), Default::default(), 1, Vec::new()); provider - .write_state(&outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(&outcome, OriginalValuesKnown::Yes) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider diff --git a/crates/storage/rpc-provider/src/lib.rs b/crates/storage/rpc-provider/src/lib.rs index ba0ba66724c..00b19df49dc 100644 --- a/crates/storage/rpc-provider/src/lib.rs +++ b/crates/storage/rpc-provider/src/lib.rs @@ -1369,6 +1369,10 @@ where self } + fn commit(self) -> ProviderResult { + unimplemented!("commit not supported for RPC provider") + } + fn prune_modes_ref(&self) -> &reth_prune_types::PruneModes { unimplemented!("prune modes not supported for RPC provider") } diff --git a/crates/storage/storage-api/src/block_writer.rs b/crates/storage/storage-api/src/block_writer.rs 
index 476b0bd8dbc..3bbde88d3ed 100644 --- a/crates/storage/storage-api/src/block_writer.rs +++ b/crates/storage/storage-api/src/block_writer.rs @@ -1,4 +1,4 @@ -use crate::{NodePrimitivesProvider, StorageLocation}; +use crate::NodePrimitivesProvider; use alloc::vec::Vec; use alloy_primitives::BlockNumber; use reth_db_models::StoredBlockBodyIndices; @@ -14,43 +14,27 @@ pub trait BlockExecutionWriter: /// Take all of the blocks above the provided number and their execution result /// /// The passed block number will stay in the database. - /// - /// Accepts [`StorageLocation`] specifying from where should transactions and receipts be - /// removed. fn take_block_and_execution_above( &self, block: BlockNumber, - remove_from: StorageLocation, ) -> ProviderResult>; /// Remove all of the blocks above the provided number and their execution result /// /// The passed block number will stay in the database. - /// - /// Accepts [`StorageLocation`] specifying from where should transactions and receipts be - /// removed. 
- fn remove_block_and_execution_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()>; + fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()>; } impl BlockExecutionWriter for &T { fn take_block_and_execution_above( &self, block: BlockNumber, - remove_from: StorageLocation, ) -> ProviderResult> { - (*self).take_block_and_execution_above(block, remove_from) + (*self).take_block_and_execution_above(block) } - fn remove_block_and_execution_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()> { - (*self).remove_block_and_execution_above(block, remove_from) + fn remove_block_and_execution_above(&self, block: BlockNumber) -> ProviderResult<()> { + (*self).remove_block_and_execution_above(block) } } @@ -67,13 +51,9 @@ pub trait BlockWriter: Send + Sync { /// /// Return [`StoredBlockBodyIndices`] that contains indices of the first and last transactions /// and transition in the block. - /// - /// Accepts [`StorageLocation`] value which specifies where transactions and headers should be - /// written. fn insert_block( &self, block: RecoveredBlock, - write_to: StorageLocation, ) -> ProviderResult; /// Appends a batch of block bodies extending the canonical chain. This is invoked during @@ -84,24 +64,15 @@ pub trait BlockWriter: Send + Sync { fn append_block_bodies( &self, bodies: Vec<(BlockNumber, Option<::Body>)>, - write_to: StorageLocation, ) -> ProviderResult<()>; /// Removes all blocks above the given block number from the database. /// /// Note: This does not remove state or execution data. - fn remove_blocks_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()>; + fn remove_blocks_above(&self, block: BlockNumber) -> ProviderResult<()>; /// Removes all block bodies above the given block number from the database. 
- fn remove_bodies_above( - &self, - block: BlockNumber, - remove_from: StorageLocation, - ) -> ProviderResult<()>; + fn remove_bodies_above(&self, block: BlockNumber) -> ProviderResult<()>; /// Appends a batch of sealed blocks to the blockchain, including sender information, and /// updates the post-state. diff --git a/crates/storage/storage-api/src/chain.rs b/crates/storage/storage-api/src/chain.rs index e78a59afefc..f093ed99af6 100644 --- a/crates/storage/storage-api/src/chain.rs +++ b/crates/storage/storage-api/src/chain.rs @@ -1,4 +1,4 @@ -use crate::{DBProvider, StorageLocation}; +use crate::DBProvider; use alloc::vec::Vec; use alloy_consensus::Header; use alloy_primitives::BlockNumber; @@ -29,7 +29,6 @@ pub trait BlockBodyWriter { &self, provider: &Provider, bodies: Vec<(BlockNumber, Option)>, - write_to: StorageLocation, ) -> ProviderResult<()>; /// Removes all block bodies above the given block number from the database. @@ -37,7 +36,6 @@ pub trait BlockBodyWriter { &self, provider: &Provider, block: BlockNumber, - remove_from: StorageLocation, ) -> ProviderResult<()>; } @@ -105,7 +103,6 @@ where &self, provider: &Provider, bodies: Vec<(u64, Option>)>, - _write_to: StorageLocation, ) -> ProviderResult<()> { let mut ommers_cursor = provider.tx_ref().cursor_write::>()?; let mut withdrawals_cursor = @@ -134,7 +131,6 @@ where &self, provider: &Provider, block: BlockNumber, - _remove_from: StorageLocation, ) -> ProviderResult<()> { provider.tx_ref().unwind_table_by_num::(block)?; provider.tx_ref().unwind_table_by_num::>(block)?; @@ -218,7 +214,6 @@ where &self, _provider: &Provider, _bodies: Vec<(u64, Option>)>, - _write_to: StorageLocation, ) -> ProviderResult<()> { // noop Ok(()) @@ -228,7 +223,6 @@ where &self, _provider: &Provider, _block: BlockNumber, - _remove_from: StorageLocation, ) -> ProviderResult<()> { // noop Ok(()) diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index 
0d736c00e15..c0e94a044bf 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -37,9 +37,7 @@ pub trait DBProvider: Sized { } /// Commit database transaction - fn commit(self) -> ProviderResult<bool> { - Ok(self.into_tx().commit()?) - } + fn commit(self) -> ProviderResult<bool>; /// Returns a reference to prune modes. fn prune_modes_ref(&self) -> &PruneModes; diff --git a/crates/storage/storage-api/src/noop.rs b/crates/storage/storage-api/src/noop.rs index ca66ac6931c..4c0117fe54f 100644 --- a/crates/storage/storage-api/src/noop.rs +++ b/crates/storage/storage-api/src/noop.rs @@ -626,6 +626,12 @@ impl DBProvider for NoopProvider &PruneModes { &self.prune_modes } + + fn commit(self) -> ProviderResult<bool> { + use reth_db_api::transaction::DbTx; + + Ok(self.tx.commit()?) + } } #[cfg(feature = "db-api")] diff --git a/crates/storage/storage-api/src/state_writer.rs b/crates/storage/storage-api/src/state_writer.rs index 0710d849778..711b9e569f5 100644 --- a/crates/storage/storage-api/src/state_writer.rs +++ b/crates/storage/storage-api/src/state_writer.rs @@ -7,8 +7,6 @@ use revm_database::{ OriginalValuesKnown, }; -use super::StorageLocation; - /// A trait specifically for writing state changes or reverts pub trait StateWriter { /// Receipt type included into [`ExecutionOutcome`]. @@ -20,7 +18,6 @@ pub trait StateWriter { &self, execution_outcome: &ExecutionOutcome, is_value_known: OriginalValuesKnown, - write_receipts_to: StorageLocation, ) -> ProviderResult<()>; /// Write state reverts to the database. @@ -40,17 +37,12 @@ pub trait StateWriter { /// Remove the block range of state above the given block. The state of the passed block is not /// removed.
- fn remove_state_above( - &self, - block: BlockNumber, - remove_receipts_from: StorageLocation, - ) -> ProviderResult<()>; + fn remove_state_above(&self, block: BlockNumber) -> ProviderResult<()>; /// Take the block range of state, recreating the [`ExecutionOutcome`]. The state of the passed /// block is not removed. fn take_state_above( &self, block: BlockNumber, - remove_receipts_from: StorageLocation, ) -> ProviderResult>; } diff --git a/crates/storage/storage-api/src/storage.rs b/crates/storage/storage-api/src/storage.rs index 56f42ca5878..8f560d8cfb7 100644 --- a/crates/storage/storage-api/src/storage.rs +++ b/crates/storage/storage-api/src/storage.rs @@ -42,26 +42,3 @@ pub trait StorageChangeSetReader: Send + Sync { block_number: BlockNumber, ) -> ProviderResult>; } - -/// An enum that represents the storage location for a piece of data. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum StorageLocation { - /// Write only to static files. - StaticFiles, - /// Write only to the database. - Database, - /// Write to both the database and static files. - Both, -} - -impl StorageLocation { - /// Returns true if the storage location includes static files. - pub const fn static_files(&self) -> bool { - matches!(self, Self::StaticFiles | Self::Both) - } - - /// Returns true if the storage location includes the database. 
- pub const fn database(&self) -> bool { - matches!(self, Self::Database | Self::Both) - } -} diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 75ff73101d2..3c601924dea 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -17,7 +17,7 @@ use reth_primitives_traits::{RecoveredBlock, SealedBlock}; use reth_provider::{ test_utils::create_test_provider_factory_with_chain_spec, BlockWriter, DatabaseProviderFactory, ExecutionOutcome, HeaderProvider, HistoryWriter, OriginalValuesKnown, StateProofProvider, - StateWriter, StorageLocation, + StateWriter, StaticFileProviderFactory, StaticFileSegment, StaticFileWriter, }; use reth_revm::{database::StateProviderDatabase, witness::ExecutionWitnessRecord, State}; use reth_stateless::{validation::stateless_validation, ExecutionWitness}; @@ -202,8 +202,13 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> { .try_recover() .unwrap(); + provider.insert_block(genesis_block.clone()).map_err(|err| Error::block_failed(0, err))?; + + // Increment block number for receipts static file provider - .insert_block(genesis_block.clone(), StorageLocation::Database) + .static_file_provider() + .latest_writer(StaticFileSegment::Receipts) + .and_then(|mut writer| writer.increment_block(0)) .map_err(|err| Error::block_failed(0, err))?; let genesis_state = case.pre.clone().into_genesis_state(); @@ -227,7 +232,12 @@ fn run_case(case: &BlockchainTest) -> Result<(), Error> { // Insert the block into the database provider - .insert_block(block.clone(), StorageLocation::Database) + .insert_block(block.clone()) + .map_err(|err| Error::block_failed(block_number, err))?; + // Commit static files, so we can query the headers for stateless execution below + provider + .static_file_provider() + .commit() .map_err(|err| Error::block_failed(block_number, err))?; // Consensus checks before block execution @@ -293,11 +303,7 @@ fn 
run_case(case: &BlockchainTest) -> Result<(), Error> { // Commit the post state/state diff to the database provider - .write_state( - &ExecutionOutcome::single(block.number, output), - OriginalValuesKnown::Yes, - StorageLocation::Database, - ) + .write_state(&ExecutionOutcome::single(block.number, output), OriginalValuesKnown::Yes) .map_err(|err| Error::block_failed(block_number, err))?; provider