diff --git a/Cargo.lock b/Cargo.lock index db1a89fdd2b..2591b3a351d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7848,8 +7848,6 @@ dependencies = [ "reth-chainspec", "reth-config", "reth-consensus", - "reth-db", - "reth-db-api", "reth-ethereum-primitives", "reth-metrics", "reth-network-p2p", diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 498907bb991..24bdc069f09 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2659,7 +2659,7 @@ where let mut canonical = self.canonical_in_memory_state.header_by_hash(hash); if canonical.is_none() { - canonical = self.provider.header(&hash)?.map(|header| SealedHeader::new(header, hash)); + canonical = self.provider.header(hash)?.map(|header| SealedHeader::new(header, hash)); } Ok(canonical) @@ -2883,7 +2883,7 @@ where } // Check if the block is persisted - if let Some(header) = self.provider.header(&hash)? { + if let Some(header) = self.provider.header(hash)? { debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder"); // For persisted blocks, we create a builder that will fetch state directly from the // database diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index c1cd4c433b8..cd2c37d1e91 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -967,7 +967,7 @@ where } // Check if the block is persisted - if let Some(header) = self.provider.header(&hash)? { + if let Some(header) = self.provider.header(hash)? { debug!(target: "engine::tree", %hash, number = %header.number(), "found canonical state for block in database, creating provider builder"); // For persisted blocks, we create a builder that will fetch state directly from the // database diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 9ebf69457b6..99694f0a51b 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -370,7 +370,7 @@ where .map(|(exex_id, num_hash)| { num_hash.map_or(Ok((exex_id, num_hash, false)), |num_hash| { self.provider - .is_known(&num_hash.hash) + .is_known(num_hash.hash) // Save the ExEx ID, finished height, and whether the hash is canonical .map(|is_canonical| (exex_id, Some(num_hash), is_canonical)) }) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 0def7a510ed..c6a54e647cf 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -308,7 +308,7 @@ where /// we're not on the canonical chain and we need to revert the notification with the ExEx /// head block. fn check_canonical(&mut self) -> eyre::Result>> { - if self.provider.is_known(&self.initial_exex_head.block.hash)? && + if self.provider.is_known(self.initial_exex_head.block.hash)? && self.initial_exex_head.block.number <= self.initial_local_head.number { // we have the targeted block and that block is below the current head diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index 1dd5ec0947b..57094813eee 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -22,9 +22,8 @@ reth-storage-api.workspace = true reth-tasks.workspace = true # optional deps for the test-utils feature -reth-db = { workspace = true, optional = true } -reth-db-api = { workspace = true, optional = true } reth-ethereum-primitives = { workspace = true, optional = true } +reth-provider = { workspace = true, optional = true } reth-testing-utils = { workspace = true, optional = true } # ethereum @@ -58,8 +57,6 @@ itertools.workspace = true async-compression = { workspace = true, features = ["gzip", "tokio"] } reth-ethereum-primitives.workspace = true reth-chainspec.workspace = true -reth-db = { workspace = true, features = ["test-utils"] } -reth-db-api.workspace = true reth-consensus = { workspace = true, features = ["test-utils"] } reth-network-p2p = { workspace = true, features = ["test-utils"] } reth-provider = { workspace = true, features = ["test-utils"] } @@ -76,13 +73,10 @@ default = [] file-client = ["dep:async-compression", "dep:alloy-rlp"] test-utils = [ "tempfile", - "reth-db-api", - "reth-db/test-utils", "reth-consensus/test-utils", "reth-network-p2p/test-utils", "reth-testing-utils", "reth-chainspec/test-utils", - "reth-db-api?/test-utils", "reth-provider/test-utils", "reth-primitives-traits/test-utils", "dep:reth-ethereum-primitives", diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 0c7b1e62012..09eb22854d4 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -618,12 +618,8 @@ mod tests { }; use alloy_primitives::B256; use assert_matches::assert_matches; - use reth_chainspec::MAINNET; use reth_consensus::test_utils::TestConsensus; - use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; - use reth_provider::{ - providers::StaticFileProvider, test_utils::MockNodeTypesWithDB, ProviderFactory, - }; + use reth_provider::test_utils::create_test_provider_factory; use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams}; use std::collections::HashMap; @@ -632,25 +628,20 @@ mod tests { #[tokio::test] async fn streams_bodies_in_order() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=19); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); let mut downloader = BodiesDownloaderBuilder::default() .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -666,7 +657,7 @@ mod tests { #[tokio::test] async fn requests_correct_number_of_times() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let mut rng = generators::rng(); let blocks = random_block_range( &mut rng, @@ -680,22 +671,17 @@ mod tests { .map(|block| (block.hash(), block.into_body())) .collect::>(); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); let request_limit = 10; let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); let mut downloader = BodiesDownloaderBuilder::default() .with_request_limit(request_limit) .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); downloader.set_download_range(0..=199).expect("failed to set download range"); @@ -708,28 +694,23 @@ mod tests { #[tokio::test] async fn streams_bodies_in_order_after_range_reset() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=99); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); let stream_batch_size = 20; let request_limit = 10; let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); let mut downloader = BodiesDownloaderBuilder::default() .with_stream_batch_size(stream_batch_size) .with_request_limit(request_limit) .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); let mut range_start = 0; @@ -750,24 +731,19 @@ mod tests { #[tokio::test] async fn can_download_new_range_after_termination() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=199); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); let mut downloader = BodiesDownloaderBuilder::default() .with_stream_batch_size(100) .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); // Set and download the first range @@ -792,14 +768,13 @@ mod tests { #[tokio::test] async fn can_download_after_exceeding_limit() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=199); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); // Set the max buffered block size to 1 byte, to make sure that every response exceeds the // limit let mut downloader = BodiesDownloaderBuilder::default() @@ -809,11 +784,7 @@ mod tests { .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); // Set and download the entire range @@ -829,16 +800,15 @@ mod tests { #[tokio::test] async fn can_tolerate_empty_responses() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=99); - insert_headers(db.db(), &headers); + insert_headers(&factory, &headers); // respond with empty bodies for every other request. let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2), ); - let (_static_dir, static_dir_path) = create_test_static_files_dir(); let mut downloader = BodiesDownloaderBuilder::default() .with_request_limit(3) @@ -846,11 +816,7 @@ mod tests { .build::( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::::new( - db, - MAINNET.clone(), - StaticFileProvider::read_write(static_dir_path).unwrap(), - ), + factory, ); // Download the requested range diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index df1d5540db3..4da5946fffb 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -190,7 +190,7 @@ mod tests { let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=19); - insert_headers(factory.db_ref().db(), &headers); + insert_headers(&factory, &headers); let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), diff --git a/crates/net/downloaders/src/bodies/test_utils.rs b/crates/net/downloaders/src/bodies/test_utils.rs index aeb4488eb0d..a7172ec1a00 100644 --- a/crates/net/downloaders/src/bodies/test_utils.rs +++ b/crates/net/downloaders/src/bodies/test_utils.rs @@ -3,12 +3,14 @@ #![allow(dead_code)] use alloy_consensus::BlockHeader; -use alloy_primitives::B256; -use reth_db::DatabaseEnv; -use reth_db_api::{database::Database, tables, transaction::DbTxMut}; +use alloy_primitives::{B256, U256}; use reth_ethereum_primitives::BlockBody; use reth_network_p2p::bodies::response::BlockResponse; use reth_primitives_traits::{Block, SealedBlock, SealedHeader}; +use reth_provider::{ + test_utils::MockNodeTypesWithDB, ProviderFactory, StaticFileProviderFactory, StaticFileSegment, + StaticFileWriter, +}; use std::collections::HashMap; pub(crate) fn zip_blocks<'a, B: Block>( @@ -42,12 +44,21 @@ pub(crate) fn create_raw_bodies( } #[inline] -pub(crate) fn insert_headers(db: &DatabaseEnv, headers: &[SealedHeader]) { - db.update(|tx| { - for header in headers { - tx.put::(header.number, header.hash()).unwrap(); - tx.put::(header.number, header.clone_header()).unwrap(); - } - }) - .expect("failed to commit") +pub(crate) fn insert_headers( + factory: &ProviderFactory, + headers: &[SealedHeader], +) { + let provider_rw = factory.provider_rw().expect("failed to create provider"); + let static_file_provider = provider_rw.static_file_provider(); + let mut writer = static_file_provider + .latest_writer(StaticFileSegment::Headers) + .expect("failed to create writer"); + + for header in headers { + writer + .append_header(header.header(), U256::ZERO, &header.hash()) + .expect("failed to append header"); + } + drop(writer); + provider_rw.commit().expect("failed to commit"); } diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index 3f6233615c3..34c2f56b75c 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -675,7 +675,7 @@ mod tests { let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=19); - insert_headers(factory.db_ref().db(), &headers); + insert_headers(&factory, &headers); // create an empty file let file = tempfile::tempfile().unwrap(); @@ -770,7 +770,7 @@ mod tests { Arc::new(FileClient::from_file(file, NoopConsensus::arc()).await.unwrap()); // insert headers in db for the bodies downloader - insert_headers(factory.db_ref().db(), &headers); + insert_headers(&factory, &headers); let mut downloader = BodiesDownloaderBuilder::default().build::( client.clone(), diff --git a/crates/prune/prune/src/segments/user/transaction_lookup.rs b/crates/prune/prune/src/segments/user/transaction_lookup.rs index 2ed08f7d1a7..e218f623ed5 100644 --- a/crates/prune/prune/src/segments/user/transaction_lookup.rs +++ b/crates/prune/prune/src/segments/user/transaction_lookup.rs @@ -157,7 +157,7 @@ mod tests { 1..=10, BlockRangeParams { parent: Some(B256::ZERO), tx_count: 2..3, ..Default::default() }, ); - db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks"); let mut tx_hash_numbers = Vec::new(); for block in &blocks { @@ -170,11 +170,11 @@ mod tests { db.insert_tx_hash_numbers(tx_hash_numbers).expect("insert tx hash numbers"); assert_eq!( - db.table::().unwrap().len(), + db.count_entries::().unwrap(), blocks.iter().map(|block| block.transaction_count()).sum::() ); assert_eq!( - db.table::().unwrap().len(), + db.count_entries::().unwrap(), db.table::().unwrap().len() ); diff --git a/crates/rpc/rpc-eth-types/src/cache/mod.rs b/crates/rpc/rpc-eth-types/src/cache/mod.rs index 6df612261d9..e0bea2cf463 100644 --- a/crates/rpc/rpc-eth-types/src/cache/mod.rs +++ b/crates/rpc/rpc-eth-types/src/cache/mod.rs @@ -539,7 +539,7 @@ where this.action_task_spawner.spawn_blocking(Box::pin(async move { // Acquire permit let _permit = rate_limiter.acquire().await; - let header = provider.header(&block_hash).and_then(|header| { + let header = provider.header(block_hash).and_then(|header| { header.ok_or_else(|| { ProviderError::HeaderNotFound(block_hash.into()) }) diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index e0f5b4eabce..b3715c0e8e0 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -917,7 +917,7 @@ where /// Handler for `debug_getRawHeader` async fn raw_header(&self, block_id: BlockId) -> RpcResult { let header = match block_id { - BlockId::Hash(hash) => self.provider().header(&hash.into()).to_rpc_result()?, + BlockId::Hash(hash) => self.provider().header(hash.into()).to_rpc_result()?, BlockId::Number(number_or_tag) => { let number = self .provider() diff --git a/crates/stages/stages/src/stages/hashing_account.rs b/crates/stages/stages/src/stages/hashing_account.rs index c48381d4fe9..cc86db14d38 100644 --- a/crates/stages/stages/src/stages/hashing_account.rs +++ b/crates/stages/stages/src/stages/hashing_account.rs @@ -344,7 +344,7 @@ mod tests { done: true, }) if block_number == previous_stage && processed == total && - total == runner.db.table::().unwrap().len() as u64 + total == runner.db.count_entries::().unwrap() as u64 ); // Validate the stage execution diff --git a/crates/stages/stages/src/stages/hashing_storage.rs b/crates/stages/stages/src/stages/hashing_storage.rs index e0eb9716537..c52f800a018 100644 --- a/crates/stages/stages/src/stages/hashing_storage.rs +++ b/crates/stages/stages/src/stages/hashing_storage.rs @@ -266,7 +266,7 @@ mod tests { }, .. }) if processed == previous_checkpoint.progress.processed + 1 && - total == runner.db.table::().unwrap().len() as u64); + total == runner.db.count_entries::().unwrap() as u64); // Continue from checkpoint input.checkpoint = Some(checkpoint); @@ -280,7 +280,7 @@ mod tests { }, .. }) if processed == total && - total == runner.db.table::().unwrap().len() as u64); + total == runner.db.count_entries::().unwrap() as u64); // Validate the stage execution assert!( diff --git a/crates/stages/stages/src/stages/merkle.rs b/crates/stages/stages/src/stages/merkle.rs index c5115316243..6cbed3ab20e 100644 --- a/crates/stages/stages/src/stages/merkle.rs +++ b/crates/stages/stages/src/stages/merkle.rs @@ -493,8 +493,8 @@ mod tests { done: true }) if block_number == previous_stage && processed == total && total == ( - runner.db.table::().unwrap().len() + - runner.db.table::().unwrap().len() + runner.db.count_entries::().unwrap() + + runner.db.count_entries::().unwrap() ) as u64 ); @@ -533,8 +533,8 @@ mod tests { done: true }) if block_number == previous_stage && processed == total && total == ( - runner.db.table::().unwrap().len() + - runner.db.table::().unwrap().len() + runner.db.count_entries::().unwrap() + + runner.db.count_entries::().unwrap() ) as u64 ); @@ -575,8 +575,8 @@ mod tests { done: true }) if block_number == previous_stage && processed == total && total == ( - runner.db.table::().unwrap().len() + - runner.db.table::().unwrap().len() + runner.db.count_entries::().unwrap() + + runner.db.count_entries::().unwrap() ) as u64 ); diff --git a/crates/stages/stages/src/stages/sender_recovery.rs b/crates/stages/stages/src/stages/sender_recovery.rs index 2a2870f07ca..947f0620954 100644 --- a/crates/stages/stages/src/stages/sender_recovery.rs +++ b/crates/stages/stages/src/stages/sender_recovery.rs @@ -490,7 +490,7 @@ mod tests { ExecOutput { checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( EntitiesCheckpoint { - processed: runner.db.table::().unwrap().len() + processed: runner.db.count_entries::().unwrap() as u64, total: total_transactions } diff --git a/crates/stages/stages/src/stages/tx_lookup.rs b/crates/stages/stages/src/stages/tx_lookup.rs index 20a0770d8c8..8b1c531736b 100644 --- a/crates/stages/stages/src/stages/tx_lookup.rs +++ b/crates/stages/stages/src/stages/tx_lookup.rs @@ -264,7 +264,6 @@ mod tests { use reth_primitives_traits::SealedBlock; use reth_provider::{ providers::StaticFileWriter, BlockBodyIndicesProvider, DatabaseProviderFactory, - StaticFileProviderFactory, }; use reth_stages_api::StageUnitCheckpoint; use reth_testing_utils::generators::{ @@ -320,7 +319,7 @@ mod tests { total })) }, done: true }) if block_number == previous_stage && processed == total && - total == runner.db.factory.static_file_provider().count_entries::().unwrap() as u64 + total == runner.db.count_entries::().unwrap() as u64 ); // Validate the stage execution @@ -366,7 +365,7 @@ mod tests { total })) }, done: true }) if block_number == previous_stage && processed == total && - total == runner.db.factory.static_file_provider().count_entries::().unwrap() as u64 + total == runner.db.count_entries::().unwrap() as u64 ); // Validate the stage execution diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs index f3e29c1fa66..f38f77b2247 100644 --- a/crates/stages/stages/src/test_utils/test_db.rs +++ b/crates/stages/stages/src/test_utils/test_db.rs @@ -19,7 +19,7 @@ use reth_primitives_traits::{Account, SealedBlock, SealedHeader, StorageEntry}; use reth_provider::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter}, test_utils::MockNodeTypesWithDB, - HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory, + HistoryWriter, ProviderError, ProviderFactory, StaticFileProviderFactory, StatsReader, }; use reth_static_file_types::StaticFileSegment; use reth_storage_errors::provider::ProviderResult; @@ -103,6 +103,11 @@ impl TestStageDB { }) } + /// Return the number of entries in the table or static file segment + pub fn count_entries(&self) -> ProviderResult { + self.factory.provider()?.count_entries::() + } + /// Check that there is no table entry above a given /// number by [`Table::Key`] pub fn ensure_no_entry_above(&self, num: u64, mut selector: F) -> ProviderResult<()> diff --git a/crates/static-file/static-file/src/static_file_producer.rs b/crates/static-file/static-file/src/static_file_producer.rs index b5df8aec08b..b6d205a42e1 100644 --- a/crates/static-file/static-file/src/static_file_producer.rs +++ b/crates/static-file/static-file/src/static_file_producer.rs @@ -255,9 +255,8 @@ mod tests { use crate::static_file_producer::{ StaticFileProducer, StaticFileProducerInner, StaticFileTargets, }; - use alloy_primitives::{B256, U256}; + use alloy_primitives::B256; use assert_matches::assert_matches; - use reth_db_api::{database::Database, transaction::DbTx}; use reth_provider::{ providers::StaticFileWriter, test_utils::MockNodeTypesWithDB, ProviderError, ProviderFactory, StaticFileProviderFactory, @@ -291,12 +290,7 @@ mod tests { static_file_writer.commit().expect("prune headers"); drop(static_file_writer); - let tx = db.factory.db_ref().tx_mut().expect("init tx"); - for block in &blocks { - TestStageDB::insert_header(None, &tx, block.sealed_header(), U256::ZERO) - .expect("insert block header"); - } - tx.commit().expect("commit tx"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let mut receipts = Vec::new(); for block in &blocks { diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index c5ba6890ee8..890b98124a5 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -182,7 +182,7 @@ impl StaticFileProviderFactory for BlockchainProvider { impl HeaderProvider for BlockchainProvider { type Header = HeaderTy; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { self.consistent_provider()?.header(block_hash) } @@ -190,7 +190,7 @@ impl HeaderProvider for BlockchainProvider { self.consistent_provider()?.header_by_number(num) } - fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, hash: BlockHash) -> ProviderResult> { self.consistent_provider()?.header_td(hash) } @@ -342,6 +342,10 @@ impl BlockReader for BlockchainProvider { ) -> ProviderResult>> { self.consistent_provider()?.recovered_block_range(range) } + + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + self.consistent_provider()?.block_by_transaction_id(id) + } } impl TransactionsProvider for BlockchainProvider { @@ -2290,13 +2294,15 @@ mod tests { let test_tx_index = 0; test_non_range!([ - // TODO: header should use B256 like others instead of &B256 - // ( - // ONE, - // header, - // |block: &SealedBlock, tx_num: TxNumber, tx_hash: B256, receipts: &Vec>| (&block.hash(), Some(block.header.header().clone())), - // (&B256::random()) - // ), + ( + ONE, + header, + |block: &SealedBlock, _: TxNumber, _: B256, _: &Vec>| ( + block.hash(), + Some(block.header().clone()) + ), + B256::random() + ), ( ONE, header_by_number, diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index cd74ab36965..03615d5357b 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -646,9 +646,9 @@ impl StaticFileProviderFactory for ConsistentProvider { impl HeaderProvider for ConsistentProvider { type Header = HeaderTy; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { self.get_in_memory_or_storage_by_block( - (*block_hash).into(), + block_hash.into(), |db_provider| db_provider.header(block_hash), |block_state| Ok(Some(block_state.block_ref().recovered_block().clone_header())), ) @@ -662,8 +662,8 @@ impl HeaderProvider for ConsistentProvider { ) } - fn header_td(&self, hash: &BlockHash) -> ProviderResult> { - if let Some(num) = self.block_number(*hash)? { + fn header_td(&self, hash: BlockHash) -> ProviderResult> { + if let Some(num) = self.block_number(hash)? { self.header_td_by_number(num) } else { Ok(None) @@ -917,6 +917,14 @@ impl BlockReader for ConsistentProvider { |_| true, ) } + + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + self.get_in_memory_or_storage_by_tx( + id.into(), + |db_provider| db_provider.block_by_transaction_id(id), + |_, _, block_state| Ok(Some(block_state.number())), + ) + } } impl TransactionsProvider for ConsistentProvider { @@ -1305,14 +1313,14 @@ impl BlockReaderIdExt for ConsistentProvider { ) -> ProviderResult>>> { Ok(match id { BlockId::Number(num) => self.sealed_header_by_number_or_tag(num)?, - BlockId::Hash(hash) => self.header(&hash.block_hash)?.map(SealedHeader::seal_slow), + BlockId::Hash(hash) => self.header(hash.block_hash)?.map(SealedHeader::seal_slow), }) } fn header_by_id(&self, id: BlockId) -> ProviderResult>> { Ok(match id { BlockId::Number(num) => self.header_by_number_or_tag(num)?, - BlockId::Hash(hash) => self.header(&hash.block_hash)?, + BlockId::Hash(hash) => self.header(hash.block_hash)?, }) } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c9a19936af8..54642a94757 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -234,57 +234,41 @@ impl HeaderSyncGapProvider for ProviderFactory { impl HeaderProvider for ProviderFactory { type Header = HeaderTy; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { self.provider()?.header(block_hash) } fn header_by_number(&self, num: BlockNumber) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - num, - |static_file| static_file.header_by_number(num), - || self.provider()?.header_by_number(num), - ) + self.static_file_provider.header_by_number(num) } - fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, hash: BlockHash) -> ProviderResult> { self.provider()?.header_td(hash) } fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult> { - self.provider()?.header_td_by_number(number) + self.static_file_provider.header_td_by_number(number) } fn headers_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - to_range(range), - |static_file, range, _| static_file.headers_range(range), - |range, _| self.provider()?.headers_range(range), - |_| true, - ) + self.static_file_provider.headers_range(range) } fn sealed_header( &self, number: BlockNumber, ) -> ProviderResult>> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - number, - |static_file| static_file.sealed_header(number), - || self.provider()?.sealed_header(number), - ) + self.static_file_provider.sealed_header(number) } fn sealed_headers_range( &self, range: impl RangeBounds, ) -> ProviderResult>> { - self.sealed_headers_while(range, |_| true) + self.static_file_provider.sealed_headers_range(range) } fn sealed_headers_while( @@ -292,24 +276,13 @@ impl HeaderProvider for ProviderFactory { range: impl RangeBounds, predicate: impl FnMut(&SealedHeader) -> bool, ) -> ProviderResult>> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - to_range(range), - |static_file, range, predicate| static_file.sealed_headers_while(range, predicate), - |range, predicate| self.provider()?.sealed_headers_while(range, predicate), - predicate, - ) + self.static_file_provider.sealed_headers_while(range, predicate) } } impl BlockHashReader for ProviderFactory { fn block_hash(&self, number: u64) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - number, - |static_file| static_file.block_hash(number), - || self.provider()?.block_hash(number), - ) + self.static_file_provider.block_hash(number) } fn canonical_hashes_range( @@ -317,13 +290,7 @@ impl BlockHashReader for ProviderFactory { start: BlockNumber, end: BlockNumber, ) -> ProviderResult> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - start..end, - |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end), - |range, _| self.provider()?.canonical_hashes_range(range.start, range.end), - |_| true, - ) + self.static_file_provider.canonical_hashes_range(start, end) } } @@ -337,7 +304,7 @@ impl BlockNumReader for ProviderFactory { } fn last_block_number(&self) -> ProviderResult { - self.provider()?.last_block_number() + self.static_file_provider.last_block_number() } fn earliest_block_number(&self) -> ProviderResult { @@ -409,6 +376,10 @@ impl BlockReader for ProviderFactory { ) -> ProviderResult>> { self.provider()?.recovered_block_range(range) } + + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + self.provider()?.block_by_transaction_id(id) + } } impl TransactionsProvider for ProviderFactory { @@ -419,24 +390,14 @@ impl TransactionsProvider for ProviderFactory { } fn transaction_by_id(&self, id: TxNumber) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Transactions, - id, - |static_file| static_file.transaction_by_id(id), - || self.provider()?.transaction_by_id(id), - ) + self.static_file_provider.transaction_by_id(id) } fn transaction_by_id_unhashed( &self, id: TxNumber, ) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Transactions, - id, - |static_file| static_file.transaction_by_id_unhashed(id), - || self.provider()?.transaction_by_id_unhashed(id), - ) + self.static_file_provider.transaction_by_id_unhashed(id) } fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult> { @@ -472,7 +433,7 @@ impl TransactionsProvider for ProviderFactory { &self, range: impl RangeBounds, ) -> ProviderResult> { - self.provider()?.transactions_by_tx_range(range) + self.static_file_provider.transactions_by_tx_range(range) } fn senders_by_tx_range( @@ -489,6 +450,7 @@ impl TransactionsProvider for ProviderFactory { impl ReceiptProvider for ProviderFactory { type Receipt = ReceiptTy; + fn receipt(&self, id: TxNumber) -> ProviderResult> { self.static_file_provider.get_with_static_file_or_database( StaticFileSegment::Receipts, @@ -674,7 +636,6 @@ mod tests { StaticFileProvider::read_write(static_dir_path).unwrap(), ) .unwrap(); - let provider = factory.provider().unwrap(); provider.block_hash(0).unwrap(); let provider_rw = factory.provider_rw().unwrap(); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 33df75ace5f..16b463be1e8 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -22,7 +22,7 @@ use alloy_consensus::{ transaction::{SignerRecoverable, TransactionMeta, TxHashRef}, BlockHeader, TxReceipt, }; -use alloy_eips::{eip2718::Encodable2718, BlockHashOrNumber}; +use alloy_eips::BlockHashOrNumber; use alloy_primitives::{ keccak256, map::{hash_map, B256Map, HashMap, HashSet}, @@ -42,7 +42,7 @@ use reth_db_api::{ table::Table, tables, transaction::{DbTx, DbTxMut}, - BlockNumberList, DatabaseError, PlainAccountState, PlainStorageState, + BlockNumberList, PlainAccountState, PlainStorageState, }; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_node_types::{BlockTy, BodyTy, HeaderTy, NodeTypes, ReceiptTy, TxTy}; @@ -74,7 +74,7 @@ use std::{ collections::{BTreeMap, BTreeSet}, fmt::Debug, ops::{Deref, DerefMut, Not, Range, RangeBounds, RangeInclusive}, - sync::{mpsc, Arc}, + sync::Arc, }; use tracing::{debug, trace}; @@ -563,23 +563,6 @@ impl DatabaseProvider { } impl DatabaseProvider { - fn transactions_by_tx_range_with_cursor( - &self, - range: impl RangeBounds, - cursor: &mut C, - ) -> ProviderResult>> - where - C: DbCursorRO>>, - { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Transactions, - to_range(range), - |static_file, range, _| static_file.transactions_by_tx_range(range), - |range, _| self.cursor_collect(cursor, range), - |_| true, - ) - } - fn recovered_block( &self, id: BlockHashOrNumber, @@ -649,7 +632,6 @@ impl DatabaseProvider { let mut blocks = Vec::with_capacity(len); let headers = headers_range(range.clone())?; - let mut tx_cursor = self.tx.cursor_read::>>()?; // If the body indices are not found, this means that the transactions either do // not exist in the database yet, or they do exit but are @@ -668,7 +650,7 @@ impl DatabaseProvider { let transactions = if tx_range.is_empty() { Vec::new() } else { - self.transactions_by_tx_range_with_cursor(tx_range.clone(), &mut tx_cursor)? + self.transactions_by_tx_range(tx_range.clone())? }; inputs.push((header.as_ref(), transactions)); @@ -1007,8 +989,8 @@ impl HeaderSyncGapProvider impl HeaderProvider for DatabaseProvider { type Header = HeaderTy; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { - if let Some(num) = self.block_number(*block_hash)? { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { + if let Some(num) = self.block_number(block_hash)? { Ok(self.header_by_number(num)?) } else { Ok(None) @@ -1016,16 +998,11 @@ impl HeaderProvider for DatabasePro } fn header_by_number(&self, num: BlockNumber) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - num, - |static_file| static_file.header_by_number(num), - || Ok(self.tx.get::>(num)?), - ) + self.static_file_provider.header_by_number(num) } - fn header_td(&self, block_hash: &BlockHash) -> ProviderResult> { - if let Some(num) = self.block_number(*block_hash)? { + fn header_td(&self, block_hash: BlockHash) -> ProviderResult> { + if let Some(num) = self.block_number(block_hash)? { self.header_td_by_number(num) } else { Ok(None) @@ -1041,46 +1018,21 @@ impl HeaderProvider for DatabasePro return Ok(Some(td)) } - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - number, - |static_file| static_file.header_td_by_number(number), - || Ok(self.tx.get::(number)?.map(|td| td.0)), - ) + self.static_file_provider.header_td_by_number(number) } fn headers_range( &self, range: impl RangeBounds, ) -> ProviderResult> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - to_range(range), - |static_file, range, _| static_file.headers_range(range), - |range, _| self.cursor_read_collect::>(range), - |_| true, - ) + self.static_file_provider.headers_range(range) } fn sealed_header( &self, number: BlockNumber, ) -> ProviderResult>> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - number, - |static_file| static_file.sealed_header(number), - || { - if let Some(header) = self.header_by_number(number)? { - let hash = self - .block_hash(number)? - .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?; - Ok(Some(SealedHeader::new(header, hash))) - } else { - Ok(None) - } - }, - ) + self.static_file_provider.sealed_header(number) } fn sealed_headers_while( @@ -1088,40 +1040,13 @@ impl HeaderProvider for DatabasePro range: impl RangeBounds, predicate: impl FnMut(&SealedHeader) -> bool, ) -> ProviderResult>> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - to_range(range), - |static_file, range, predicate| static_file.sealed_headers_while(range, predicate), - |range, mut predicate| { - let mut headers = vec![]; - for entry in - self.tx.cursor_read::>()?.walk_range(range)? - { - let (number, header) = entry?; - let hash = self - .block_hash(number)? - .ok_or_else(|| ProviderError::HeaderNotFound(number.into()))?; - let sealed = SealedHeader::new(header, hash); - if !predicate(&sealed) { - break - } - headers.push(sealed); - } - Ok(headers) - }, - predicate, - ) + self.static_file_provider.sealed_headers_while(range, predicate) } } impl BlockHashReader for DatabaseProvider { fn block_hash(&self, number: u64) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Headers, - number, - |static_file| static_file.block_hash(number), - || Ok(self.tx.get::(number)?), - ) + self.static_file_provider.block_hash(number) } fn canonical_hashes_range( @@ -1129,13 +1054,7 @@ impl BlockHashReader for DatabaseProvider ProviderResult> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Headers, - start..end, - |static_file, range, _| static_file.canonical_hashes_range(range.start, range.end), - |range, _| self.cursor_read_collect::(range), - |_| true, - ) + self.static_file_provider.canonical_hashes_range(start, end) } } @@ -1156,15 +1075,7 @@ impl BlockNumReader for DatabaseProvider ProviderResult { - Ok(self - .tx - .cursor_read::()? - .last()? - .map(|(num, _)| num) - .max( - self.static_file_provider.get_highest_static_file_block(StaticFileSegment::Headers), - ) - .unwrap_or_default()) + self.static_file_provider.last_block_number() } fn block_number(&self, hash: B256) -> ProviderResult> { @@ -1216,6 +1127,7 @@ impl BlockReader for DatabaseProvid Ok(None) } + fn pending_block(&self) -> ProviderResult>> { Ok(None) } @@ -1313,6 +1225,14 @@ impl BlockReader for DatabaseProvid }, ) } + + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + Ok(self + .tx + .cursor_read::()? + .seek(id) + .map(|b| b.map(|(_, bn)| bn))?) + } } impl TransactionsProviderExt @@ -1324,66 +1244,7 @@ impl TransactionsProviderExt &self, tx_range: Range, ) -> ProviderResult> { - self.static_file_provider.get_range_with_static_file_or_database( - StaticFileSegment::Transactions, - tx_range, - |static_file, range, _| static_file.transaction_hashes_by_range(range), - |tx_range, _| { - let mut tx_cursor = self.tx.cursor_read::>>()?; - let tx_range_size = tx_range.clone().count(); - let tx_walker = tx_cursor.walk_range(tx_range)?; - - let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1); - let mut channels = Vec::with_capacity(chunk_size); - let mut transaction_count = 0; - - #[inline] - fn calculate_hash( - entry: Result<(TxNumber, T), DatabaseError>, - rlp_buf: &mut Vec, - ) -> Result<(B256, TxNumber), Box> - where - T: Encodable2718, - { - let (tx_id, tx) = entry.map_err(|e| Box::new(e.into()))?; - tx.encode_2718(rlp_buf); - Ok((keccak256(rlp_buf), tx_id)) - } - - for chunk in &tx_walker.chunks(chunk_size) { - let (tx, rx) = mpsc::channel(); - channels.push(rx); - - // Note: Unfortunate side-effect of how chunk is designed in itertools (it is - // not Send) - let chunk: Vec<_> = chunk.collect(); - transaction_count += chunk.len(); - - // Spawn the task onto the global rayon pool - // This task will send the results through the channel after it has calculated - // the hash. - rayon::spawn(move || { - let mut rlp_buf = Vec::with_capacity(128); - for entry in chunk { - rlp_buf.clear(); - let _ = tx.send(calculate_hash(entry, &mut rlp_buf)); - } - }); - } - let mut tx_list = Vec::with_capacity(transaction_count); - - // Iterate over channels and append the tx hashes unsorted - for channel in channels { - while let Ok(tx) = channel.recv() { - let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?; - tx_list.push((tx_hash, tx_id)); - } - } - - Ok(tx_list) - }, - |_| true, - ) + self.static_file_provider.transaction_hashes_by_range(tx_range) } } @@ -1396,24 +1257,14 @@ impl TransactionsProvider for Datab } fn transaction_by_id(&self, id: TxNumber) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Transactions, - id, - |static_file| static_file.transaction_by_id(id), - || Ok(self.tx.get::>(id)?), - ) + self.static_file_provider.transaction_by_id(id) } fn transaction_by_id_unhashed( &self, id: TxNumber, ) -> ProviderResult> { - self.static_file_provider.get_with_static_file_or_database( - StaticFileSegment::Transactions, - id, - |static_file| static_file.transaction_by_id_unhashed(id), - || Ok(self.tx.get::>(id)?), - ) + self.static_file_provider.transaction_by_id_unhashed(id) } fn transaction_by_hash(&self, hash: TxHash) -> ProviderResult> { @@ -1428,11 +1279,9 @@ impl TransactionsProvider for Datab &self, tx_hash: TxHash, ) -> ProviderResult> { - let mut transaction_cursor = self.tx.cursor_read::()?; if let Some(transaction_id) = self.transaction_id(tx_hash)? && let Some(transaction) = self.transaction_by_id_unhashed(transaction_id)? && - let Some(block_number) = - transaction_cursor.seek(transaction_id).map(|b| b.map(|(_, bn)| bn))? && + let Some(block_number) = self.block_by_transaction_id(transaction_id)? && let Some(sealed_header) = self.sealed_header(block_number)? { let (header, block_hash) = sealed_header.split(); @@ -1469,8 +1318,6 @@ impl TransactionsProvider for Datab &self, id: BlockHashOrNumber, ) -> ProviderResult>> { - let mut tx_cursor = self.tx.cursor_read::>()?; - if let Some(block_number) = self.convert_hash_or_number(id)? && let Some(body) = self.block_body_indices(block_number)? { @@ -1478,7 +1325,7 @@ impl TransactionsProvider for Datab return if tx_range.is_empty() { Ok(Some(Vec::new())) } else { - Ok(Some(self.transactions_by_tx_range_with_cursor(tx_range, &mut tx_cursor)?)) + self.transactions_by_tx_range(tx_range).map(Some) } } Ok(None) @@ -1489,7 +1336,6 @@ impl TransactionsProvider for Datab range: impl RangeBounds, ) -> ProviderResult>> { let range = to_range(range); - let mut tx_cursor = self.tx.cursor_read::>()?; self.block_body_indices_range(range.start..=range.end.saturating_sub(1))? .into_iter() @@ -1498,10 +1344,7 @@ impl TransactionsProvider for Datab if tx_num_range.is_empty() { Ok(Vec::new()) } else { - Ok(self - .transactions_by_tx_range_with_cursor(tx_num_range, &mut tx_cursor)? - .into_iter() - .collect()) + self.transactions_by_tx_range(tx_num_range) } }) .collect() @@ -1511,10 +1354,7 @@ impl TransactionsProvider for Datab &self, range: impl RangeBounds, ) -> ProviderResult> { - self.transactions_by_tx_range_with_cursor( - range, - &mut self.tx.cursor_read::>()?, - ) + self.static_file_provider.transactions_by_tx_range(range) } fn senders_by_tx_range( @@ -2698,16 +2538,17 @@ impl BlockWrite type Block = BlockTy; type Receipt = ReceiptTy; - /// Inserts the block into the database, always modifying the following tables: - /// * [`CanonicalHeaders`](tables::CanonicalHeaders) - /// * [`Headers`](tables::Headers) - /// * [`HeaderNumbers`](tables::HeaderNumbers) - /// * [`HeaderTerminalDifficulties`](tables::HeaderTerminalDifficulties) - /// * [`BlockBodyIndices`](tables::BlockBodyIndices) + /// Inserts the block into the database, always modifying the following static file segments and + /// tables: + /// * [`StaticFileSegment::Headers`] + /// * [`tables::HeaderNumbers`] + /// * [`tables::HeaderTerminalDifficulties`] + /// * [`tables::BlockBodyIndices`] /// - /// If there are transactions in the block, the following tables will be modified: - /// * [`Transactions`](tables::Transactions) - /// * [`TransactionBlocks`](tables::TransactionBlocks) + /// If there are transactions in the block, the following static file segments and tables will + /// be modified: + /// * [`StaticFileSegment::Transactions`] + /// * [`tables::TransactionBlocks`] /// /// If ommers are not empty, this will modify [`BlockOmmers`](tables::BlockOmmers). /// If withdrawals are not empty, this will modify diff --git a/crates/storage/provider/src/providers/static_file/jar.rs b/crates/storage/provider/src/providers/static_file/jar.rs index ab19fbf732c..9906583f900 100644 --- a/crates/storage/provider/src/providers/static_file/jar.rs +++ b/crates/storage/provider/src/providers/static_file/jar.rs @@ -89,11 +89,11 @@ impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> { impl> HeaderProvider for StaticFileJarProvider<'_, N> { type Header = N::BlockHeader; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { Ok(self .cursor()? - .get_two::>(block_hash.into())? - .filter(|(_, hash)| hash == block_hash) + .get_two::>((&block_hash).into())? + .filter(|(_, hash)| hash == &block_hash) .map(|(header, _)| header)) } @@ -101,11 +101,11 @@ impl> HeaderProvider for StaticFileJarProv self.cursor()?.get_one::>(num.into()) } - fn header_td(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, block_hash: BlockHash) -> ProviderResult> { Ok(self .cursor()? - .get_two::(block_hash.into())? - .filter(|(_, hash)| hash == block_hash) + .get_two::((&block_hash).into())? + .filter(|(_, hash)| hash == &block_hash) .map(|(td, _)| td.into())) } diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 6af6313f6a2..434d3836fb2 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -1109,16 +1109,31 @@ impl StaticFileProvider { F: FnMut(&mut StaticFileCursor<'_>, u64) -> ProviderResult>, P: FnMut(&T) -> bool, { - let get_provider = |start: u64| { - if segment.is_block_based() { - self.get_segment_provider_from_block(segment, start, None) - } else { - self.get_segment_provider_from_transaction(segment, start, None) - } - }; - let mut result = Vec::with_capacity((range.end - range.start).min(100) as usize); - let mut provider = get_provider(range.start)?; + + /// Resolves to the provider for the given block or transaction number. + /// + /// If the static file is missing, the `result` is returned. + macro_rules! get_provider { + ($number:expr) => {{ + let provider = if segment.is_block_based() { + self.get_segment_provider_from_block(segment, $number, None) + } else { + self.get_segment_provider_from_transaction(segment, $number, None) + }; + + match provider { + Ok(provider) => provider, + Err( + ProviderError::MissingStaticFileBlock(_, _) | + ProviderError::MissingStaticFileTx(_, _), + ) => return Ok(result), + Err(err) => return Err(err), + } + }}; + } + + let mut provider = get_provider!(range.start); let mut cursor = provider.cursor()?; // advances number in range @@ -1140,19 +1155,7 @@ impl StaticFileProvider { } None => { if retrying { - warn!( - target: "provider::static_file", - ?segment, - ?number, - "Could not find block or tx number on a range request" - ); - - let err = if segment.is_block_based() { - ProviderError::MissingStaticFileBlock(segment, number) - } else { - ProviderError::MissingStaticFileTx(segment, number) - }; - return Err(err) + return Ok(result) } // There is a very small chance of hitting a deadlock if two consecutive // static files share the same bucket in the @@ -1160,7 +1163,7 @@ impl StaticFileProvider { // before requesting the next one. drop(cursor); drop(provider); - provider = get_provider(number)?; + provider = get_provider!(number); cursor = provider.cursor()?; retrying = true; } @@ -1374,13 +1377,13 @@ impl StaticFileWriter for StaticFileProvider { impl> HeaderProvider for StaticFileProvider { type Header = N::BlockHeader; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { self.find_static_file(StaticFileSegment::Headers, |jar_provider| { Ok(jar_provider .cursor()? - .get_two::>(block_hash.into())? + .get_two::>((&block_hash).into())? .and_then(|(header, hash)| { - if &hash == block_hash { + if hash == block_hash { return Some(header) } None @@ -1400,12 +1403,12 @@ impl> HeaderProvider for StaticFileProvide }) } - fn header_td(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, block_hash: BlockHash) -> ProviderResult> { self.find_static_file(StaticFileSegment::Headers, |jar_provider| { Ok(jar_provider .cursor()? - .get_two::(block_hash.into())? - .and_then(|(td, hash)| (&hash == block_hash).then_some(td.0))) + .get_two::((&block_hash).into())? + .and_then(|(td, hash)| (hash == block_hash).then_some(td.0))) }) } @@ -1468,7 +1471,15 @@ impl> HeaderProvider for StaticFileProvide impl BlockHashReader for StaticFileProvider { fn block_hash(&self, num: u64) -> ProviderResult> { - self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num) + self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None) + .and_then(|provider| provider.block_hash(num)) + .or_else(|err| { + if let ProviderError::MissingStaticFileBlock(_, _) = err { + Ok(None) + } else { + Err(err) + } + }) } fn canonical_hashes_range( @@ -1712,8 +1723,6 @@ impl> TransactionsPr } } -/* Cannot be successfully implemented but must exist for trait requirements */ - impl BlockNumReader for StaticFileProvider { fn chain_info(&self) -> ProviderResult { // Required data not present in static_files @@ -1726,8 +1735,7 @@ impl BlockNumReader for StaticFileProvider { } fn last_block_number(&self) -> ProviderResult { - // Required data not present in static_files - Err(ProviderError::UnsupportedProvider) + Ok(self.get_highest_static_file_block(StaticFileSegment::Headers).unwrap_or_default()) } fn block_number(&self, _hash: B256) -> ProviderResult> { @@ -1736,6 +1744,8 @@ impl BlockNumReader for StaticFileProvider { } } +/* Cannot be successfully implemented but must exist for trait requirements */ + impl> BlockReader for StaticFileProvider { @@ -1803,6 +1813,10 @@ impl> ) -> ProviderResult>> { Err(ProviderError::UnsupportedProvider) } + + fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult> { + Err(ProviderError::UnsupportedProvider) + } } impl BlockBodyIndicesProvider for StaticFileProvider { diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index 97a8ea95433..1c3bfd58a79 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -146,12 +146,12 @@ mod tests { let header = header.unseal(); // Compare Header - assert_eq!(header, db_provider.header(&header_hash).unwrap().unwrap()); + assert_eq!(header, db_provider.header(header_hash).unwrap().unwrap()); assert_eq!(header, jar_provider.header_by_number(header.number).unwrap().unwrap()); // Compare HeaderTerminalDifficulties assert_eq!( - db_provider.header_td(&header_hash).unwrap().unwrap(), + db_provider.header_td(header_hash).unwrap().unwrap(), jar_provider.header_td_by_number(header.number).unwrap().unwrap() ); } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index 220cff07ce1..d5e3fe4da7b 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -281,9 +281,9 @@ impl HeaderP { type Header = ::Header; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { let lock = self.headers.lock(); - Ok(lock.get(block_hash).cloned()) + Ok(lock.get(&block_hash).cloned()) } fn header_by_number(&self, num: u64) -> ProviderResult> { @@ -291,9 +291,9 @@ impl HeaderP Ok(lock.values().find(|h| h.number() == num).cloned()) } - fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, hash: BlockHash) -> ProviderResult> { let lock = self.headers.lock(); - Ok(lock.get(hash).map(|target| { + Ok(lock.get(&hash).map(|target| { lock.values() .filter(|h| h.number() < target.number()) .fold(target.difficulty(), |td, h| td + h.difficulty()) @@ -718,6 +718,10 @@ impl BlockRe ) -> ProviderResult>> { Ok(vec![]) } + + fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult> { + Ok(None) + } } impl BlockReaderIdExt for MockEthProvider diff --git a/crates/storage/rpc-provider/src/lib.rs b/crates/storage/rpc-provider/src/lib.rs index 00b19df49dc..76e511d52d4 100644 --- a/crates/storage/rpc-provider/src/lib.rs +++ b/crates/storage/rpc-provider/src/lib.rs @@ -338,9 +338,9 @@ where { type Header = HeaderTy; - fn header(&self, block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, block_hash: BlockHash) -> ProviderResult> { let block_response = self.block_on_async(async { - self.provider.get_block_by_hash(*block_hash).await.map_err(ProviderError::other) + self.provider.get_block_by_hash(block_hash).await.map_err(ProviderError::other) })?; let Some(block_response) = block_response else { @@ -364,7 +364,7 @@ where Ok(Some(sealed_header.into_header())) } - fn header_td(&self, hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, hash: BlockHash) -> ProviderResult> { let header = self.header(hash).map_err(ProviderError::other)?; Ok(header.map(|b| b.difficulty())) @@ -510,6 +510,10 @@ where ) -> ProviderResult>> { Err(ProviderError::UnsupportedProvider) } + + fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult> { + Err(ProviderError::UnsupportedProvider) + } } impl BlockReaderIdExt for RpcBlockchainProvider @@ -1539,6 +1543,10 @@ where ) -> Result>, ProviderError> { Err(ProviderError::UnsupportedProvider) } + + fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult> { + Err(ProviderError::UnsupportedProvider) + } } impl TransactionsProvider for RpcBlockchainStateProvider @@ -1658,7 +1666,7 @@ where { type Header = HeaderTy; - fn header(&self, _block_hash: &BlockHash) -> Result, ProviderError> { + fn header(&self, _block_hash: BlockHash) -> Result, ProviderError> { Err(ProviderError::UnsupportedProvider) } @@ -1666,7 +1674,7 @@ where Err(ProviderError::UnsupportedProvider) } - fn header_td(&self, _hash: &BlockHash) -> Result, ProviderError> { + fn header_td(&self, _hash: BlockHash) -> Result, ProviderError> { Err(ProviderError::UnsupportedProvider) } diff --git a/crates/storage/storage-api/src/block.rs b/crates/storage/storage-api/src/block.rs index 40a009935ca..b9ab206a6b8 100644 --- a/crates/storage/storage-api/src/block.rs +++ b/crates/storage/storage-api/src/block.rs @@ -4,7 +4,7 @@ use crate::{ }; use alloc::{sync::Arc, vec::Vec}; use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumberOrTag}; -use alloy_primitives::{BlockNumber, B256}; +use alloy_primitives::{BlockNumber, TxNumber, B256}; use core::ops::RangeInclusive; use reth_primitives_traits::{RecoveredBlock, SealedHeader}; use reth_storage_errors::provider::ProviderResult; @@ -144,6 +144,9 @@ pub trait BlockReader: &self, range: RangeInclusive, ) -> ProviderResult>>; + + /// Returns the block number that contains the given transaction. + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult>; } impl BlockReader for Arc { @@ -202,6 +205,9 @@ impl BlockReader for Arc { ) -> ProviderResult>> { T::recovered_block_range(self, range) } + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + T::block_by_transaction_id(self, id) + } } impl BlockReader for &T { @@ -260,6 +266,9 @@ impl BlockReader for &T { ) -> ProviderResult>> { T::recovered_block_range(self, range) } + fn block_by_transaction_id(&self, id: TxNumber) -> ProviderResult> { + T::block_by_transaction_id(self, id) + } } /// Trait extension for `BlockReader`, for types that implement `BlockId` conversion. diff --git a/crates/storage/storage-api/src/header.rs b/crates/storage/storage-api/src/header.rs index a4c9b215f82..7e3133ec712 100644 --- a/crates/storage/storage-api/src/header.rs +++ b/crates/storage/storage-api/src/header.rs @@ -15,19 +15,19 @@ pub trait HeaderProvider: Send + Sync { type Header: BlockHeader; /// Check if block is known - fn is_known(&self, block_hash: &BlockHash) -> ProviderResult { + fn is_known(&self, block_hash: BlockHash) -> ProviderResult { self.header(block_hash).map(|header| header.is_some()) } /// Get header by block hash - fn header(&self, block_hash: &BlockHash) -> ProviderResult>; + fn header(&self, block_hash: BlockHash) -> ProviderResult>; /// Retrieves the header sealed by the given block hash. fn sealed_header_by_hash( &self, block_hash: BlockHash, ) -> ProviderResult>> { - Ok(self.header(&block_hash)?.map(|header| SealedHeader::new(header, block_hash))) + Ok(self.header(block_hash)?.map(|header| SealedHeader::new(header, block_hash))) } /// Get header by block number @@ -39,13 +39,13 @@ pub trait HeaderProvider: Send + Sync { hash_or_num: BlockHashOrNumber, ) -> ProviderResult> { match hash_or_num { - BlockHashOrNumber::Hash(hash) => self.header(&hash), + BlockHashOrNumber::Hash(hash) => self.header(hash), BlockHashOrNumber::Number(num) => self.header_by_number(num), } } /// Get total difficulty by block hash. - fn header_td(&self, hash: &BlockHash) -> ProviderResult>; + fn header_td(&self, hash: BlockHash) -> ProviderResult>; /// Get total difficulty by block number. fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult>; diff --git a/crates/storage/storage-api/src/noop.rs b/crates/storage/storage-api/src/noop.rs index 4c0117fe54f..44e499ae006 100644 --- a/crates/storage/storage-api/src/noop.rs +++ b/crates/storage/storage-api/src/noop.rs @@ -237,6 +237,10 @@ impl BlockReader for NoopProvider { ) -> ProviderResult>> { Ok(Vec::new()) } + + fn block_by_transaction_id(&self, _id: TxNumber) -> ProviderResult> { + Ok(None) + } } impl TransactionsProvider for NoopProvider { @@ -343,7 +347,7 @@ impl ReceiptProviderIdExt for NoopProvider HeaderProvider for NoopProvider { type Header = N::BlockHeader; - fn header(&self, _block_hash: &BlockHash) -> ProviderResult> { + fn header(&self, _block_hash: BlockHash) -> ProviderResult> { Ok(None) } @@ -351,7 +355,7 @@ impl HeaderProvider for NoopProvider { Ok(None) } - fn header_td(&self, _hash: &BlockHash) -> ProviderResult> { + fn header_td(&self, _hash: BlockHash) -> ProviderResult> { Ok(None) } diff --git a/examples/db-access/src/main.rs b/examples/db-access/src/main.rs index 4027beb70cb..93896accbbc 100644 --- a/examples/db-access/src/main.rs +++ b/examples/db-access/src/main.rs @@ -63,7 +63,7 @@ fn header_provider_example(provider: T, number: u64) -> eyre: // Can also query the header by hash! let header_by_hash = - provider.header(&sealed_header.hash())?.ok_or(eyre::eyre!("header by hash not found"))?; + provider.header(sealed_header.hash())?.ok_or(eyre::eyre!("header by hash not found"))?; assert_eq!(sealed_header.header(), &header_by_hash); // The header's total difficulty is stored in a separate table, so we have a separate call for