diff --git a/.github/scripts/hive/expected_failures.yaml b/.github/scripts/hive/expected_failures.yaml index 7046f5011e2..49b1c8c4a34 100644 --- a/.github/scripts/hive/expected_failures.yaml +++ b/.github/scripts/hive/expected_failures.yaml @@ -21,10 +21,10 @@ engine-withdrawals: [] engine-api: [] engine-cancun: - - Invalid PayloadAttributes, Missing BeaconRoot, Syncing=True (Cancun) (reth) + # - Invalid PayloadAttributes, Missing BeaconRoot, Syncing=True (Cancun) (reth) # the test fails with older versions of the code for which it passed before, probably related to changes # in hive or its dependencies - - Blob Transaction Ordering, Multiple Clients (Cancun) (reth) + # - Blob Transaction Ordering, Multiple Clients (Cancun) (reth) sync: [] diff --git a/Cargo.lock b/Cargo.lock index acbe4955d42..356448e9e0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7587,6 +7587,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-bal-store" +version = "1.11.0" +dependencies = [ + "alloy-primitives", + "parking_lot", + "tempfile", + "thiserror 2.0.18", + "tracing", +] + [[package]] name = "reth-basic-payload-builder" version = "1.11.0" @@ -8356,6 +8367,7 @@ dependencies = [ "rand 0.8.5", "rand 0.9.2", "rayon", + "reth-bal-store", "reth-chain-state", "reth-chainspec", "reth-consensus", @@ -9052,6 +9064,7 @@ dependencies = [ "rand 0.8.5", "rand 0.9.2", "rayon", + "reth-bal-store", "reth-chainspec", "reth-consensus", "reth-discv4", @@ -9225,6 +9238,7 @@ dependencies = [ "jsonrpsee", "parking_lot", "rayon", + "reth-bal-store", "reth-basic-payload-builder", "reth-chain-state", "reth-chainspec", @@ -9612,6 +9626,7 @@ dependencies = [ "parking_lot", "rand 0.9.2", "rayon", + "reth-bal-store", "reth-chain-state", "reth-chainspec", "reth-codecs", @@ -9752,6 +9767,7 @@ dependencies = [ "parking_lot", "pin-project", "rand 0.9.2", + "reth-bal-store", "reth-chain-state", "reth-chainspec", "reth-consensus", @@ -9957,6 +9973,7 @@ dependencies = [ "jsonrpsee-core", "jsonrpsee-types", "metrics", + "reth-bal-store", "reth-chainspec", "reth-engine-primitives", "reth-ethereum-engine-primitives", @@ -9975,6 +9992,7 @@ dependencies = [ "reth-testing-utils", "reth-transaction-pool", "serde", + "tempfile", "thiserror 2.0.18", "tokio", "tracing", @@ -10002,6 +10020,7 @@ dependencies = [ "jsonrpsee", "jsonrpsee-types", "parking_lot", + "reth-bal-store", "reth-chain-state", "reth-chainspec", "reth-errors", @@ -10045,6 +10064,7 @@ dependencies = [ "metrics", "rand 0.9.2", "reqwest", + "reth-bal-store", "reth-chain-state", "reth-chainspec", "reth-errors", @@ -10266,6 +10286,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "auto_impl", + "reth-bal-store", "reth-chainspec", "reth-db-api", "reth-db-models", @@ -11171,9 +11192,9 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.28" +version = "0.1.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +checksum = "91c1b7e4904c873ef0710c1f407dde2e6287de2bebc1bbbf7d430bb7cbffd939" dependencies = [ "windows-sys 0.61.2", ] @@ -11893,9 +11914,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.26.0" +version = "3.27.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82a72c767771b47409d2345987fda8628641887d5466101319899796367354a0" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand 2.3.0", "getrandom 0.4.2", diff --git a/Cargo.toml b/Cargo.toml index e26a5e58560..711b5f3f999 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ members = [ "crates/exex/test-utils/", "crates/exex/types/", "crates/metrics/", + "crates/net/bal-store/", "crates/net/banlist/", "crates/net/discv4/", "crates/net/discv5/", @@ -377,6 +378,7 @@ reth-ipc = { path = "crates/rpc/ipc" } reth-libmdbx = { path = "crates/storage/libmdbx-rs" } reth-mdbx-sys = { path = "crates/storage/libmdbx-rs/mdbx-sys" } reth-metrics = { path = "crates/metrics" } +reth-bal-store = { path = "crates/net/bal-store" } reth-net-banlist = { path = "crates/net/banlist" } reth-net-nat = { path = "crates/net/nat" } reth-network = { path = "crates/net/network" } diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 5c5cf853953..4af3338b855 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -12,6 +12,7 @@ workspace = true [dependencies] # reth +reth-bal-store.workspace = true reth-chain-state = { workspace = true, features = ["rayon"] } reth-chainspec = { workspace = true, optional = true } reth-consensus.workspace = true diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index 175496181de..c5162c672c7 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -1190,7 +1190,7 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::{Account, Recovered, StorageEntry}; use reth_provider::{ - providers::{BlockchainProvider, OverlayStateProviderFactory}, + providers::{BalProvider, BlockchainProvider, OverlayStateProviderFactory}, test_utils::create_test_provider_factory_with_chain_spec, ChainSpecProvider, HashingWriter, }; @@ -1448,7 +1448,7 @@ mod tests { PrecompileCacheMap::default(), ); - let provider_factory = BlockchainProvider::new(factory).unwrap(); + let provider_factory = BlockchainProvider::new(factory, BalProvider::default()).unwrap(); let mut handle = payload_processor.spawn( ExecutionEnv::test_default(), diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index aa1ea214cba..be43a006a82 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -16,6 +16,7 @@ use alloy_eips::{eip1898::BlockWithParent, eip4895::Withdrawal, NumHash}; use alloy_evm::Evm; use alloy_primitives::{map::B256Set, B256}; use alloy_rlp::Decodable; +use reth_bal_store::BalStore; #[cfg(feature = "trie-debug")] use reth_trie_sparse::debug_recorder::TrieDebugRecorder; @@ -178,6 +179,7 @@ where + StateProviderFactory + StateReader + HashedPostStateProvider + + BalStore + Clone + 'static, Evm: ConfigureEvm + 'static, @@ -993,6 +995,9 @@ where ConsensusError::BlockAccessListHashMismatch((got, expected).into()), )); } + let bal_bytes: alloy_primitives::Bytes = + alloy_rlp::encode(built_bal.unwrap_or_default()).into(); + let _ = self.provider.insert(input.hash(), input.num_hash().number, bal_bytes); } let output = BlockExecutionOutput { result, state: db.take_bundle() }; @@ -1986,6 +1991,7 @@ where + ChangeSetReader + BlockNumReader + HashedPostStateProvider + + BalStore + Clone + 'static, N: NodePrimitives, diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index f819ff7169d..101877a07b3 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -253,7 +253,8 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::crypto::secp256k1::public_key_to_address; use reth_provider::{ - providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, + providers::{BalProvider, BlockchainProvider}, + test_utils::create_test_provider_factory_with_chain_spec, }; use reth_testing_utils::generators; @@ -270,7 +271,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let blocks_and_execution_outputs = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; @@ -306,7 +308,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let blocks_and_execution_outcomes = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; @@ -356,7 +359,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; // Execute blocks via LatestStateProvider (pipeline-style) and commit to DB. // This mirrors what the pipeline's ExecutionStage does. @@ -420,7 +424,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let pipeline_results = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; @@ -463,7 +468,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let blocks_and_execution_outputs = blocks_and_execution_outputs(provider_factory, chain_spec, key_pair)?; diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index 9d50737f5aa..0d253a9f07d 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -259,7 +259,7 @@ mod tests { crypto::secp256k1::public_key_to_address, Block as _, NodePrimitives, }; use reth_provider::{ - providers::{BlockchainProvider, ProviderNodeTypes}, + providers::{BalProvider, BlockchainProvider, ProviderNodeTypes}, test_utils::create_test_provider_factory_with_chain_spec, ProviderFactory, }; @@ -281,7 +281,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; // Create first 2 blocks let blocks_and_execution_outcomes = @@ -318,7 +319,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; // Create first 2 blocks let (blocks, execution_outcome) = @@ -421,7 +423,8 @@ mod tests { let executor = EthEvmConfig::ethereum(chain_spec.clone()); let provider_factory = create_test_provider_factory_with_chain_spec(chain_spec.clone()); init_genesis(&provider_factory)?; - let blockchain_db = BlockchainProvider::new(provider_factory.clone())?; + let blockchain_db = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; // Create and commit 4 blocks let blocks = create_blocks(&chain_spec, key_pair, 4)?; diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index c8a1b2c4ba6..dbbcedee4e5 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -687,8 +687,9 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::RecoveredBlock; use reth_provider::{ - providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockReader, - BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant, + providers::{BalProvider, BlockchainProvider}, + test_utils::create_test_provider_factory, + BlockReader, BlockWriter, Chain, DBProvider, DatabaseProviderFactory, TransactionVariant, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; @@ -1119,7 +1120,7 @@ mod tests { async fn exex_handle_new() { let provider_factory = create_test_provider_factory(); init_genesis(&provider_factory).unwrap(); - let provider = BlockchainProvider::new(provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory, BalProvider::default()).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); @@ -1174,7 +1175,7 @@ mod tests { async fn test_notification_if_finished_height_gt_chain_tip() { let provider_factory = create_test_provider_factory(); init_genesis(&provider_factory).unwrap(); - let provider = BlockchainProvider::new(provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory, BalProvider::default()).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); @@ -1224,7 +1225,7 @@ mod tests { async fn test_sends_chain_reorged_notification() { let provider_factory = create_test_provider_factory(); init_genesis(&provider_factory).unwrap(); - let provider = BlockchainProvider::new(provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory, BalProvider::default()).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); @@ -1267,7 +1268,7 @@ mod tests { async fn test_sends_chain_reverted_notification() { let provider_factory = create_test_provider_factory(); init_genesis(&provider_factory).unwrap(); - let provider = BlockchainProvider::new(provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory, BalProvider::default()).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); @@ -1327,7 +1328,7 @@ mod tests { provider_rw.insert_block(&block).unwrap(); provider_rw.commit().unwrap(); - let provider = BlockchainProvider::new(provider_factory).unwrap(); + let provider = BlockchainProvider::new(provider_factory, BalProvider::default()).unwrap(); let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); @@ -1428,7 +1429,8 @@ mod tests { let wal = Wal::new(temp_dir.path()).unwrap(); let provider_factory = create_test_provider_factory(); init_genesis(&provider_factory).unwrap(); - let provider = BlockchainProvider::new(provider_factory.clone()).unwrap(); + let provider = + BlockchainProvider::new(provider_factory.clone(), BalProvider::default()).unwrap(); // 1. Setup Manager with Capacity = 1 let (exex_handle, _, mut notifications) = ExExHandle::new( diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index b5124e8ce0e..001ffd38c29 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -529,8 +529,9 @@ mod tests { use reth_evm_ethereum::EthEvmConfig; use reth_primitives_traits::Block as _; use reth_provider::{ - providers::BlockchainProvider, test_utils::create_test_provider_factory, BlockWriter, - Chain, DBProvider, DatabaseProviderFactory, + providers::{BalProvider, BlockchainProvider}, + test_utils::create_test_provider_factory, + BlockWriter, Chain, DBProvider, DatabaseProviderFactory, }; use reth_testing_utils::generators::{self, random_block, BlockParams}; use std::collections::BTreeMap; @@ -549,7 +550,7 @@ mod tests { .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; - let provider = BlockchainProvider::new(provider_factory.clone())?; + let provider = BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let node_head_block = random_block( &mut rng, @@ -623,7 +624,7 @@ mod tests { .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; - let provider = BlockchainProvider::new(provider_factory)?; + let provider = BlockchainProvider::new(provider_factory, BalProvider::default())?; let node_head = BlockNumHash { number: genesis_block.number, hash: genesis_hash }; let exex_head = ExExHead { block: node_head }; @@ -677,7 +678,7 @@ mod tests { .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; - let provider = BlockchainProvider::new(provider_factory)?; + let provider = BlockchainProvider::new(provider_factory, BalProvider::default())?; let node_head_block = random_block( &mut rng, @@ -768,7 +769,7 @@ mod tests { .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; - let provider = BlockchainProvider::new(provider_factory)?; + let provider = BlockchainProvider::new(provider_factory, BalProvider::default())?; let exex_head_block = random_block( &mut rng, @@ -853,7 +854,7 @@ mod tests { .block(genesis_hash.into())? .ok_or_else(|| eyre::eyre!("genesis block not found"))?; - let provider = BlockchainProvider::new(provider_factory.clone())?; + let provider = BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; // Insert block 1 into the DB so there's something to backfill let node_head_block = random_block( diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 900a985eb32..8f42f9b09ff 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -52,7 +52,7 @@ use reth_node_ethereum::{ use reth_payload_builder::noop::NoopPayloadBuilderService; use reth_primitives_traits::{Block as _, RecoveredBlock}; use reth_provider::{ - providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider}, + providers::{BalProvider, BlockchainProvider, RocksDBProvider, StaticFileProvider}, BlockReader, EthStorage, ProviderFactory, }; use reth_tasks::Runtime; @@ -256,7 +256,7 @@ pub async fn test_exex_context_with_chain_spec( )?; let genesis_hash = init_genesis(&provider_factory)?; - let provider = BlockchainProvider::new(provider_factory.clone())?; + let provider = BlockchainProvider::new(provider_factory.clone(), BalProvider::default())?; let runtime = Runtime::test(); let network_manager = NetworkManager::new( diff --git a/crates/net/bal-store/Cargo.toml b/crates/net/bal-store/Cargo.toml new file mode 100644 index 00000000000..39bf1a51b2e --- /dev/null +++ b/crates/net/bal-store/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reth-bal-store" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Shared BAL storage abstractions and implementations" + +[lints] +workspace = true + +[dependencies] +alloy-primitives.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tracing.workspace = true + +[dev-dependencies] +alloy-primitives = { workspace = true, features = ["rand"] } +tempfile.workspace = true diff --git a/crates/net/bal-store/src/disk.rs b/crates/net/bal-store/src/disk.rs new file mode 100644 index 00000000000..0a96197338f --- /dev/null +++ b/crates/net/bal-store/src/disk.rs @@ -0,0 +1,676 @@ +//! Disk-backed BAL store. + +use crate::{BalStore, BalStoreError}; +use alloy_primitives::{BlockHash, BlockNumber, Bytes}; +use parking_lot::{Mutex, RwLock}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, io, + path::PathBuf, + sync::Arc, +}; +use tracing::debug; + +/// Default maximum retained BAL entries on disk. +/// +/// Roughly aligns with the weak subjectivity period target discussed in EIP-7928. +pub const DEFAULT_MAX_BAL_STORE_ENTRIES: u32 = 113_000; +/// Default number of recent BALs retained in memory to avoid repeated disk reads. +pub const DEFAULT_RECENT_BAL_CACHE_ENTRIES: u32 = 1024; + +/// Configuration for [`DiskFileBalStore`]. +#[derive(Debug, Clone, Copy)] +pub struct DiskFileBalStoreConfig { + /// Maximum number of BAL entries to retain. + pub max_entries: u32, + /// Number of most-recent BAL entries retained in memory. + pub recent_cache_entries: u32, +} + +impl Default for DiskFileBalStoreConfig { + fn default() -> Self { + Self { + max_entries: DEFAULT_MAX_BAL_STORE_ENTRIES, + recent_cache_entries: DEFAULT_RECENT_BAL_CACHE_ENTRIES, + } + } +} + +impl DiskFileBalStoreConfig { + /// Sets the maximum retained entries. + pub const fn with_max_entries(mut self, max_entries: u32) -> Self { + self.max_entries = max_entries; + self + } + + /// Sets the number of recent entries retained in memory. + pub const fn with_recent_cache_entries(mut self, recent_cache_entries: u32) -> Self { + self.recent_cache_entries = recent_cache_entries; + self + } +} + +/// A disk-backed BAL store with in-memory indexes. +/// +/// BAL payloads are stored as raw bytes under `/entries/`. +/// Block-number index entries are stored under `/index/` with +/// 32-byte block hash content. +#[derive(Clone, Debug)] +pub struct DiskFileBalStore { + inner: Arc, +} + +#[derive(Debug, Default)] +struct IndexState { + block_to_hash: BTreeMap, + hash_to_block: HashMap, +} + +#[derive(Debug)] +struct RecentBalCache { + capacity: u32, + entries: BTreeMap, +} + +impl RecentBalCache { + const fn new(capacity: u32) -> Self { + Self { capacity, entries: BTreeMap::new() } + } + + fn insert(&mut self, block_hash: BlockHash, block_number: BlockNumber, bal: Bytes) { + if self.capacity == 0 { + return; + } + + // Keep a single slot per hash within the recent window, then write the latest position. + self.remove_by_hash(block_hash); + self.entries.insert(block_number, (block_hash, bal)); + + while self.entries.len() as u32 > self.capacity { + let Some((&oldest_number, _)) = self.entries.first_key_value() else { + break; + }; + self.entries.remove(&oldest_number); + } + } + + fn get(&self, block_number: BlockNumber, block_hash: BlockHash) -> Option { + self.entries + .get(&block_number) + .and_then(|(cached_hash, bal)| (*cached_hash == block_hash).then(|| bal.clone())) + } + + fn remove_by_hash(&mut self, block_hash: BlockHash) { + self.entries.retain(|_, (cached_hash, _)| *cached_hash != block_hash); + } +} + +#[derive(Debug)] +struct DiskFileBalStoreInner { + root_dir: PathBuf, + entries_dir: PathBuf, + index_dir: PathBuf, + max_entries: u32, + state: Mutex, + recent_cache: RwLock, +} + +impl DiskFileBalStore { + /// Opens (or creates) a disk BAL store and rebuilds in-memory indexes. + pub fn open( + bal_dir: impl Into, + opts: DiskFileBalStoreConfig, + ) -> Result { + let root_dir = bal_dir.into(); + let entries_dir = root_dir.join("entries"); + let index_dir = root_dir.join("index"); + + fs::create_dir_all(&entries_dir)?; + fs::create_dir_all(&index_dir)?; + + let inner = DiskFileBalStoreInner { + root_dir, + entries_dir, + index_dir, + max_entries: opts.max_entries, + state: Mutex::new(IndexState::default()), + recent_cache: RwLock::new(RecentBalCache::new(opts.recent_cache_entries)), + }; + inner.rebuild_indexes()?; + + Ok(Self { inner: Arc::new(inner) }) + } +} + +impl BalStore for DiskFileBalStore { + /// Delegates insertion to the shared inner implementation. + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError> { + self.inner.insert(block_hash, block_number, bal) + } + + /// Delegates by-hash reads to the shared inner implementation. + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError> { + self.inner.get_by_hashes(block_hashes) + } + + /// Delegates contiguous range reads to the shared inner implementation. + fn get_by_range(&self, start: BlockNumber, count: u64) -> Result, BalStoreError> { + self.inner.get_by_range(start, count) + } + + fn get_by_block_number( + &self, + block_number: BlockNumber, + ) -> Result, BalStoreError> { + self.inner.get_by_block_number(block_number) + } + + fn get_by_block_hash(&self, block_hash: BlockHash) -> Result, BalStoreError> { + self.inner.get_by_block_hash(block_hash) + } +} + +impl DiskFileBalStoreInner { + /// Rebuilds in-memory indexes from disk. + /// + /// Malformed index files or orphaned entries are skipped instead of failing startup, then + /// capacity eviction is re-applied to reestablish retention guarantees. + fn rebuild_indexes(&self) -> Result<(), BalStoreError> { + let mut indexed = Vec::new(); + + let dir = match fs::read_dir(&self.index_dir) { + Ok(dir) => dir, + Err(err) if err.kind() == io::ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(err.into()), + }; + + for entry in dir { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + debug!(target: "bal::store", %err, "Failed to read BAL index dir entry"); + continue; + } + }; + + let path = entry.path(); + let Some(file_name) = path.file_name().and_then(|f| f.to_str()) else { + debug!(target: "bal::store", ?path, "Skipping BAL index entry with non-utf8 name"); + continue; + }; + + let Ok(block_number) = file_name.parse::() else { + debug!(target: "bal::store", ?path, "Skipping BAL index entry with invalid block number name"); + continue; + }; + + let hash_bytes = match fs::read(&path) { + Ok(bytes) => bytes, + Err(err) => { + debug!(target: "bal::store", ?path, %err, "Failed reading BAL index file"); + continue; + } + }; + + if hash_bytes.len() != 32 { + debug!( + target: "bal::store", + ?path, + len = hash_bytes.len(), + "Skipping BAL index file with invalid hash length" + ); + continue; + } + + let block_hash = BlockHash::from_slice(&hash_bytes); + if !self.entry_file(block_hash).is_file() { + debug!( + target: "bal::store", + block_number, + ?block_hash, + "Skipping BAL index entry pointing to missing BAL payload file" + ); + continue; + } + + indexed.push((block_number, block_hash)); + } + + indexed.sort_unstable_by_key(|(number, _)| *number); + + let mut state = self.state.lock(); + state.block_to_hash.clear(); + state.hash_to_block.clear(); + + for (block_number, block_hash) in indexed { + if let Some(old_number) = state.hash_to_block.insert(block_hash, block_number) { + state.block_to_hash.remove(&old_number); + let _ = self.remove_if_exists(self.index_file(old_number)); + } + state.block_to_hash.insert(block_number, block_hash); + } + + self.evict_over_capacity(&mut state)?; + Ok(()) + } + + /// Persists a BAL payload and updates both in-memory and on-disk indexes. + /// + /// This also handles reorg/relocation semantics by removing stale index or payload files + /// that conflict with the new `(block_number, block_hash)` mapping. + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError> { + let mut state = self.state.lock(); + let mut removed_hashes = Vec::new(); + + // If the hash was previously indexed at another number, move the index. + if let Some(old_number) = + state.hash_to_block.get(&block_hash).copied().filter(|n| *n != block_number) + { + state.block_to_hash.remove(&old_number); + self.remove_if_exists(self.index_file(old_number))?; + } + + // Reorg replacement: remove old hash payload at this number. + if let Some(old_hash) = + state.block_to_hash.get(&block_number).copied().filter(|h| *h != block_hash) + { + state.hash_to_block.remove(&old_hash); + self.remove_if_exists(self.entry_file(old_hash))?; + removed_hashes.push(old_hash); + } + + fs::write(self.entry_file(block_hash), bal.as_ref())?; + fs::write(self.index_file(block_number), block_hash.as_slice())?; + + state.block_to_hash.insert(block_number, block_hash); + state.hash_to_block.insert(block_hash, block_number); + + self.evict_over_capacity(&mut state)?; + drop(state); + + let mut recent_cache = self.recent_cache.write(); + for old_hash in removed_hashes { + recent_cache.remove_by_hash(old_hash); + } + recent_cache.insert(block_hash, block_number, bal); + + Ok(()) + } + + /// Returns BAL bytes for each requested hash in request order. + /// + /// Missing files are mapped to `None`; other filesystem errors are surfaced. + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError> { + let block_numbers: Vec> = { + let state = self.state.lock(); + block_hashes.iter().map(|hash| state.hash_to_block.get(hash).copied()).collect() + }; + + let mut results = vec![None; block_hashes.len()]; + let mut misses = Vec::new(); + + { + let recent_cache = self.recent_cache.read(); + for (idx, hash) in block_hashes.iter().copied().enumerate() { + if let Some(bal) = + block_numbers[idx].and_then(|block_number| recent_cache.get(block_number, hash)) + { + results[idx] = Some(bal); + continue; + } + misses.push((idx, hash, block_numbers[idx])); + } + } + + if misses.is_empty() { + return Ok(results); + } + + let mut recovered = Vec::new(); + for (idx, hash, block_number) in misses { + let path = self.entry_file(hash); + match fs::read(path) { + Ok(bytes) => { + let bal = Bytes::from(bytes); + results[idx] = Some(bal.clone()); + if let Some(block_number) = block_number { + recovered.push((hash, block_number, bal)); + } + } + Err(err) if err.kind() == io::ErrorKind::NotFound => {} + Err(err) => return Err(err.into()), + } + } + + if !recovered.is_empty() { + let mut recent_cache = self.recent_cache.write(); + for (hash, block_number, bal) in recovered { + recent_cache.insert(hash, block_number, bal); + } + } + + Ok(results) + } + + /// Returns contiguous BAL bytes for `[start, start + count)`. + /// + /// The method first resolves the contiguous hash sequence from the in-memory block index, + /// then serves from hot cache first and falls back to disk, stopping at the first missing + /// payload file. + fn get_by_range(&self, start: BlockNumber, count: u64) -> Result, BalStoreError> { + let numbered_hashes: Vec<(BlockNumber, BlockHash)> = { + let state = self.state.lock(); + let mut hashes = Vec::new(); + for number in start..start.saturating_add(count) { + let Some(hash) = state.block_to_hash.get(&number).copied() else { + break; + }; + hashes.push((number, hash)); + } + hashes + }; + + let mut result = Vec::with_capacity(numbered_hashes.len()); + { + let recent_cache = self.recent_cache.read(); + for (block_number, hash) in &numbered_hashes { + let Some(bal) = recent_cache.get(*block_number, *hash) else { + break; + }; + result.push(bal); + } + } + + let cached_prefix_len = result.len(); + let mut recovered = Vec::new(); + for (block_number, hash) in numbered_hashes.into_iter().skip(cached_prefix_len) { + match fs::read(self.entry_file(hash)) { + Ok(bytes) => { + let bal = Bytes::from(bytes); + recovered.push((hash, block_number, bal.clone())); + result.push(bal); + } + Err(err) if err.kind() == io::ErrorKind::NotFound => break, + Err(err) => return Err(err.into()), + } + } + + if !recovered.is_empty() { + let mut recent_cache = self.recent_cache.write(); + for (hash, block_number, bal) in recovered { + recent_cache.insert(hash, block_number, bal); + } + } + + Ok(result) + } + + fn get_by_block_number(&self, number: BlockNumber) -> Result, BalStoreError> { + let hash = { + let state = self.state.lock(); + state.block_to_hash.get(&number).copied() + }; + + let Some(hash) = hash else { + return Ok(None); + }; + + if let Some(bal) = self.recent_cache.read().get(number, hash) { + return Ok(Some(bal)); + } + + match fs::read(self.entry_file(hash)) { + Ok(bytes) => { + let bal = Bytes::from(bytes); + + self.recent_cache.write().insert(hash, number, bal.clone()); + + Ok(Some(bal)) + } + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err.into()), + } + } + + fn get_by_block_hash(&self, hash: BlockHash) -> Result, BalStoreError> { + let block_number = { + let state = self.state.lock(); + state.hash_to_block.get(&hash).copied() + }; + + if let Some(block_number) = block_number { + if let Some(bal) = self.recent_cache.read().get(block_number, hash) { + return Ok(Some(bal)); + } + } + match fs::read(self.entry_file(hash)) { + Ok(bytes) => { + let bal = Bytes::from(bytes); + + if let Some(block_number) = block_number { + self.recent_cache.write().insert(hash, block_number, bal.clone()); + } + + Ok(Some(bal)) + } + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None), + Err(err) => Err(err.into()), + } + } + + /// Evicts oldest block-number entries until the configured capacity is satisfied. + /// + /// Eviction removes both entry payload files and index files to keep disk and memory in sync. + fn evict_over_capacity(&self, state: &mut IndexState) -> Result<(), BalStoreError> { + let mut evicted_hashes = Vec::new(); + while state.block_to_hash.len() as u32 > self.max_entries { + let Some((&oldest_number, &oldest_hash)) = state.block_to_hash.first_key_value() else { + break; + }; + + state.block_to_hash.remove(&oldest_number); + state.hash_to_block.remove(&oldest_hash); + evicted_hashes.push(oldest_hash); + + self.remove_if_exists(self.entry_file(oldest_hash))?; + self.remove_if_exists(self.index_file(oldest_number))?; + } + + if !evicted_hashes.is_empty() { + let mut recent_cache = self.recent_cache.write(); + for hash in evicted_hashes { + recent_cache.remove_by_hash(hash); + } + } + + Ok(()) + } + + /// Deletes a file if it exists. + /// + /// `NotFound` is treated as success to simplify idempotent cleanup paths. + fn remove_if_exists(&self, path: PathBuf) -> Result<(), BalStoreError> { + match fs::remove_file(path) { + Ok(()) => Ok(()), + Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()), + Err(err) => Err(err.into()), + } + } + + /// Returns the payload file path for a block hash. + #[inline] + fn entry_file(&self, block_hash: BlockHash) -> PathBuf { + self.entries_dir.join(format!("{block_hash:x}")) + } + + /// Returns the index file path for a block number. + #[inline] + fn index_file(&self, block_number: BlockNumber) -> PathBuf { + self.index_dir.join(block_number.to_string()) + } +} + +impl std::fmt::Display for DiskFileBalStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DiskFileBalStore({})", self.inner.root_dir.display()) + } +} + +impl BalStore for Arc { + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError> { + (**self).insert(block_hash, block_number, bal) + } + + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError> { + (**self).get_by_hashes(block_hashes) + } + + fn get_by_range(&self, start: BlockNumber, count: u64) -> Result, BalStoreError> { + (**self).get_by_range(start, count) + } + + fn get_by_block_number( + &self, + block_number: BlockNumber, + ) -> Result, BalStoreError> { + (**self).get_by_block_number(block_number) + } + + fn get_by_block_hash(&self, block_hash: BlockHash) -> Result, BalStoreError> { + (**self).get_by_block_hash(block_hash) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::B256; + + fn tmp_store_with_config( + config: DiskFileBalStoreConfig, + ) -> (DiskFileBalStore, tempfile::TempDir) { + let dir = tempfile::tempdir().unwrap(); + let store = DiskFileBalStore::open(dir.path(), config).unwrap(); + (store, dir) + } + + fn tmp_store(max_entries: u32) -> (DiskFileBalStore, tempfile::TempDir) { + tmp_store_with_config( + DiskFileBalStoreConfig::default() + .with_max_entries(max_entries) + .with_recent_cache_entries(DEFAULT_RECENT_BAL_CACHE_ENTRIES), + ) + } + + #[test] + fn insert_and_get_hashes() { + let (store, _dir) = tmp_store(10); + let h1 = B256::random(); + let h2 = B256::random(); + + store.insert(h1, 1, Bytes::from_static(b"a")).unwrap(); + store.insert(h2, 2, Bytes::from_static(b"b")).unwrap(); + + let got = store.get_by_hashes(&[h1, h2, B256::random()]).unwrap(); + assert_eq!(got, vec![Some(Bytes::from_static(b"a")), Some(Bytes::from_static(b"b")), None]); + } + + #[test] + fn range_stops_on_gap() { + let (store, _dir) = tmp_store(10); + store.insert(B256::random(), 1, Bytes::from_static(b"a")).unwrap(); + store.insert(B256::random(), 2, Bytes::from_static(b"b")).unwrap(); + store.insert(B256::random(), 4, Bytes::from_static(b"d")).unwrap(); + + let got = store.get_by_range(1, 10).unwrap(); + assert_eq!(got.len(), 2); + } + + #[test] + fn reorg_replaces_entry() { + let (store, _dir) = tmp_store(10); + let old_hash = B256::random(); + let new_hash = B256::random(); + + store.insert(old_hash, 42, Bytes::from_static(b"old")).unwrap(); + store.insert(new_hash, 42, Bytes::from_static(b"new")).unwrap(); + + let got_old = store.get_by_hashes(&[old_hash]).unwrap(); + let got_new = store.get_by_hashes(&[new_hash]).unwrap(); + assert_eq!(got_old, vec![None]); + assert_eq!(got_new, vec![Some(Bytes::from_static(b"new"))]); + } + + #[test] + fn evicts_oldest() { + let (store, _dir) = tmp_store(2); + store.insert(B256::random(), 10, Bytes::from_static(b"a")).unwrap(); + store.insert(B256::random(), 20, Bytes::from_static(b"b")).unwrap(); + store.insert(B256::random(), 30, Bytes::from_static(b"c")).unwrap(); + + let got = store.get_by_range(10, 1).unwrap(); + assert!(got.is_empty()); + } + + #[test] + fn recovers_after_restart() { + let dir = tempfile::tempdir().unwrap(); + let h1 = B256::random(); + let h2 = B256::random(); + { + let store = DiskFileBalStore::open(dir.path(), Default::default()).unwrap(); + store.insert(h1, 100, Bytes::from_static(b"a")).unwrap(); + store.insert(h2, 101, Bytes::from_static(b"b")).unwrap(); + } + + let reopened = DiskFileBalStore::open(dir.path(), Default::default()).unwrap(); + let got = reopened.get_by_hashes(&[h1, h2]).unwrap(); + assert_eq!(got, vec![Some(Bytes::from_static(b"a")), Some(Bytes::from_static(b"b"))]); + } + + #[test] + fn recent_cache_keeps_last_blocks() { + let (store, _dir) = tmp_store_with_config( + DiskFileBalStoreConfig::default().with_max_entries(10).with_recent_cache_entries(2), + ); + + let h1 = B256::random(); + let h2 = B256::random(); + let h3 = B256::random(); + + store.insert(h1, 1, Bytes::from_static(b"a")).unwrap(); + store.insert(h2, 2, Bytes::from_static(b"b")).unwrap(); + store.insert(h3, 3, Bytes::from_static(b"c")).unwrap(); + + fs::remove_file(store.inner.entry_file(h1)).unwrap(); + fs::remove_file(store.inner.entry_file(h2)).unwrap(); + fs::remove_file(store.inner.entry_file(h3)).unwrap(); + + let got = store.get_by_hashes(&[h1, h2, h3]).unwrap(); + assert_eq!(got, vec![None, Some(Bytes::from_static(b"b")), Some(Bytes::from_static(b"c"))]); + } +} diff --git a/crates/net/bal-store/src/lib.rs b/crates/net/bal-store/src/lib.rs new file mode 100644 index 00000000000..269bb00304d --- /dev/null +++ b/crates/net/bal-store/src/lib.rs @@ -0,0 +1,56 @@ +//! Storage abstractions and implementations for EIP-7928 Block Access Lists (BALs). + +use alloy_primitives::{BlockHash, BlockNumber, Bytes}; +use std::{fmt::Debug, io}; + +pub mod disk; +mod noop; +pub use disk::{ + DiskFileBalStore, DiskFileBalStoreConfig, DEFAULT_MAX_BAL_STORE_ENTRIES, + DEFAULT_RECENT_BAL_CACHE_ENTRIES, +}; +pub use noop::NoopBalStore; + +/// A store for EIP-7928 Block Access Lists (BALs). +/// +/// The store is keyed by block hash and maintains a block-number index for range queries. +/// Implementations should preserve contiguous-range semantics: +/// queries by range stop at the first missing block. +pub trait BalStore: Send + Sync + 'static { + /// Inserts a BAL for the given block hash and number. + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError>; + + /// Returns BALs for each requested block hash in the same order. + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError>; + + /// Returns contiguous BALs in `[start, start + count)` until the first gap. + fn get_by_range(&self, start: BlockNumber, count: u64) -> Result, BalStoreError>; + + /// Returns the BAL for the given block number, if it exists. + fn get_by_block_number( + &self, + block_number: BlockNumber, + ) -> Result, BalStoreError>; + + /// Returns the BAL for the given block hash, if it exists. + fn get_by_block_hash(&self, block_hash: BlockHash) -> Result, BalStoreError>; +} + +/// Error variants that can occur when interacting with a BAL store. +#[derive(Debug, thiserror::Error)] +pub enum BalStoreError { + /// Filesystem I/O error. + #[error("BAL store I/O error: {0}")] + Io(#[from] io::Error), + /// Other implementation-specific error. + #[error(transparent)] + Other(Box), +} diff --git a/crates/net/bal-store/src/noop.rs b/crates/net/bal-store/src/noop.rs new file mode 100644 index 00000000000..fb3d77c5995 --- /dev/null +++ b/crates/net/bal-store/src/noop.rs @@ -0,0 +1,44 @@ +//! No-op BAL store implementation. + +use crate::{BalStore, BalStoreError}; +use alloy_primitives::{BlockHash, BlockNumber, Bytes}; + +/// A no-op BAL store. +/// +/// This implementation never persists data and always returns cache-miss semantics: +/// hash lookups return `None` and range lookups return an empty result. +#[derive(Clone, Debug, Default)] +pub struct NoopBalStore; + +impl BalStore for NoopBalStore { + fn insert( + &self, + _block_hash: BlockHash, + _block_number: BlockNumber, + _bal: Bytes, + ) -> Result<(), BalStoreError> { + Ok(()) + } + + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError> { + Ok(vec![None; block_hashes.len()]) + } + + fn get_by_range(&self, _start: BlockNumber, _count: u64) -> Result, BalStoreError> { + Ok(Vec::new()) + } + + fn get_by_block_number( + &self, + _block_number: BlockNumber, + ) -> Result, BalStoreError> { + Ok(None) + } + + fn get_by_block_hash(&self, _block_hash: BlockHash) -> Result, BalStoreError> { + Ok(None) + } +} diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index b005f0d4a3a..f3f1654f7b5 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -34,6 +34,7 @@ reth-tokio-util.workspace = true reth-consensus.workspace = true reth-network-peers = { workspace = true, features = ["net"] } reth-network-types = { workspace = true, features = ["serde"] } +reth-bal-store.workspace = true # ethereum alloy-consensus.workspace = true diff --git a/crates/net/network/src/builder.rs b/crates/net/network/src/builder.rs index 8c8ebd8f1e5..afc25039437 100644 --- a/crates/net/network/src/builder.rs +++ b/crates/net/network/src/builder.rs @@ -11,9 +11,11 @@ use crate::{ }, NetworkHandle, NetworkManager, }; +use reth_bal_store::{BalStore, NoopBalStore}; use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives}; use reth_network_api::test_utils::PeersHandleProvider; use reth_transaction_pool::TransactionPool; +use std::sync::Arc; use tokio::sync::mpsc; /// We set the max channel capacity of the `EthRequestHandler` to 256 @@ -63,12 +65,21 @@ impl NetworkBuilder { pub fn request_handler( self, client: Client, + ) -> NetworkBuilder, N> { + self.request_handler_with_bal_store(client, Arc::new(NoopBalStore)) + } + + /// Creates a new [`EthRequestHandler`] with a custom BAL store and wires it to the network. + pub fn request_handler_with_bal_store( + self, + client: Client, + bal_store: Arc, ) -> NetworkBuilder, N> { let Self { mut network, transactions, .. } = self; let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY); network.set_eth_request_handler(tx); let peers = network.handle().peers_handle().clone(); - let request_handler = EthRequestHandler::new(client, peers, rx); + let request_handler = EthRequestHandler::with_bal_store(client, peers, rx, bal_store); NetworkBuilder { network, request_handler, transactions } } diff --git a/crates/net/network/src/eth_requests.rs b/crates/net/network/src/eth_requests.rs index d5944114467..31e80a1f243 100644 --- a/crates/net/network/src/eth_requests.rs +++ b/crates/net/network/src/eth_requests.rs @@ -9,6 +9,7 @@ use alloy_eips::BlockHashOrNumber; use alloy_primitives::Bytes; use alloy_rlp::Encodable; use futures::StreamExt; +use reth_bal_store::{BalStore, NoopBalStore}; use reth_eth_wire::{ BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection, @@ -20,13 +21,16 @@ use reth_network_peers::PeerId; use reth_primitives_traits::Block; use reth_storage_api::{BlockReader, HeaderProvider}; use std::{ + fmt, future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, time::Duration, }; use tokio::sync::{mpsc::Receiver, oneshot}; use tokio_stream::wrappers::ReceiverStream; +use tracing::debug; // Limits: @@ -52,7 +56,6 @@ pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024; /// Manages eth related requests on top of the p2p network. /// /// This can be spawned to another task and is supposed to be run as background service. -#[derive(Debug)] #[must_use = "Manager does nothing unless polled."] pub struct EthRequestHandler { /// The client type that can interact with the chain. @@ -63,18 +66,37 @@ pub struct EthRequestHandler { peers: PeersHandle, /// Incoming request from the [`NetworkManager`](crate::NetworkManager). incoming_requests: ReceiverStream>, + /// Shared BAL store used to serve `GetBlockAccessLists` requests. + bal_store: Arc, /// Metrics for the eth request handler. metrics: EthRequestHandlerMetrics, } +impl fmt::Debug for EthRequestHandler { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EthRequestHandler").finish_non_exhaustive() + } +} + // === impl EthRequestHandler === impl EthRequestHandler { - /// Create a new instance + /// Create a new instance with a no-op BAL store. pub fn new(client: C, peers: PeersHandle, incoming: Receiver>) -> Self { + Self::with_bal_store(client, peers, incoming, Arc::new(NoopBalStore)) + } + + /// Create a new instance with an explicit BAL store. + pub fn with_bal_store( + client: C, + peers: PeersHandle, + incoming: Receiver>, + bal_store: Arc, + ) -> Self { Self { client, peers, incoming_requests: ReceiverStream::new(incoming), + bal_store, metrics: Default::default(), } } @@ -284,14 +306,38 @@ where /// Handles [`GetBlockAccessLists`] queries. /// - /// For now this returns one empty BAL per requested hash. + /// Returns one BAL entry per requested hash in request order. + /// + /// BAL data is loaded from the configured store; missing entries (or store failures) are + /// returned as empty bytes to preserve response cardinality. fn on_block_access_lists_request( &self, _peer_id: PeerId, request: GetBlockAccessLists, response: oneshot::Sender>, ) { - let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect(); + let block_hashes = request.0; + let requested_len = block_hashes.len(); + let mut access_lists = match self.bal_store.get_by_hashes(&block_hashes) { + Ok(bals) => bals.into_iter().map(|bal| bal.unwrap_or_else(Bytes::new)).collect(), + Err(err) => { + debug!( + target: "net::eth", + %err, + requested_len, + "Failed to read BALs from store, returning empty BALs" + ); + vec![Bytes::new(); requested_len] + } + }; + + // Enforce one response entry per requested hash. + if access_lists.len() < requested_len { + access_lists.resize(requested_len, Bytes::new()); + } else if access_lists.len() > requested_len { + access_lists.truncate(requested_len); + } + let _ = response.send(Ok(BlockAccessLists(access_lists))); } diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index 8cab662325b..113404a6fbc 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -15,7 +15,7 @@ use reth_provider::FullProvider; use reth_tasks::TaskExecutor; use reth_tokio_util::EventSender; use reth_transaction_pool::{PoolTransaction, TransactionPool}; -use std::{fmt::Debug, future::Future, marker::PhantomData}; +use std::{fmt, fmt::Debug, future::Future, marker::PhantomData}; /// A helper trait that is downstream of the [`NodeTypes`] trait and adds stateful /// components to the node. @@ -103,7 +103,7 @@ pub trait FullNodeComponents: FullNodeTypes + Clone + 'static { } /// Context passed to [`NodeAddOns::launch_add_ons`], -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AddOnsContext<'a, N: FullNodeComponents> { /// Node with all configured components. pub node: N, @@ -117,6 +117,12 @@ pub struct AddOnsContext<'a, N: FullNodeComponents> { pub jwt_secret: JwtSecret, } +impl fmt::Debug for AddOnsContext<'_, N> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("AddOnsContext").finish_non_exhaustive() + } +} + /// Customizable node add-on types. /// /// This trait defines the interface for extending a node with additional functionality beyond diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index c9288cf90f4..283cff20d25 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -37,6 +37,7 @@ reth-node-api.workspace = true reth-node-core.workspace = true reth-node-events.workspace = true reth-node-metrics.workspace = true +reth-bal-store.workspace = true reth-payload-builder.workspace = true reth-primitives-traits.workspace = true reth-provider.workspace = true diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index 506a31616e3..8ff60a9a03d 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -11,6 +11,7 @@ use crate::{ }; use alloy_eips::eip4844::env_settings::EnvKzgSettings; use futures::Future; +use reth_bal_store::BalStore; use reth_chainspec::{EthChainSpec, EthereumHardforks, Hardforks}; use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_exex::ExExContext; @@ -736,6 +737,8 @@ pub struct BuilderContext { pub(crate) executor: TaskExecutor, /// Config container pub(crate) config_container: WithConfigs<::ChainSpec>, + /// Shared BAL store used by network handlers and RPC Engine API. + pub(crate) bal_store: Arc, } impl BuilderContext { @@ -745,8 +748,9 @@ impl BuilderContext { provider: Node::Provider, executor: TaskExecutor, config_container: WithConfigs<::ChainSpec>, + bal_store: Arc, ) -> Self { - Self { head, provider, executor, config_container } + Self { head, provider, executor, config_container, bal_store } } /// Returns the configured provider to interact with the blockchain. @@ -781,6 +785,11 @@ impl BuilderContext { &self.executor } + /// Returns the shared BAL store. + pub fn bal_store(&self) -> &Arc { + &self.bal_store + } + /// Returns the chain spec of the node. pub fn chain_spec(&self) -> Arc<::ChainSpec> { self.provider().chain_spec() @@ -900,7 +909,7 @@ impl BuilderContext { { let (handle, network, txpool, eth) = builder .transactions_with_policies(pool, tx_config, propagation_policy, announcement_policy) - .request_handler(self.provider().clone()) + .request_handler_with_bal_store(self.provider().clone(), self.bal_store().clone()) .split_with_handle(); self.executor.spawn_critical_blocking_task("p2p txpool", txpool); diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 537cca6f46a..0091e7c207e 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -38,6 +38,7 @@ use alloy_eips::eip2124::Head; use alloy_primitives::{BlockNumber, B256}; use eyre::Context; use rayon::ThreadPoolBuilder; +use reth_bal_store::{BalStore, DiskFileBalStore, DiskFileBalStoreConfig}; use reth_chainspec::{Chain, EthChainSpec, EthereumHardforks}; use reth_config::{config::EtlConfig, PruneConfig}; use reth_consensus::noop::NoopConsensus; @@ -786,12 +787,20 @@ where { // fetch the head block from the database let head = self.lookup_head()?; + let bal_store = Arc::new( + DiskFileBalStore::open( + self.node_config().datadir().balstore(), + DiskFileBalStoreConfig::default(), + ) + .wrap_err("failed to open BAL store")?, + ) as Arc; let builder_ctx = BuilderContext::new( head, self.blockchain_db().clone(), self.task_executor().clone(), self.configs().clone(), + bal_store.clone(), ); debug!(target: "reth::cli", "creating components"); @@ -815,6 +824,7 @@ where }, node_adapter, head, + bal_store, }; let ctx = LaunchContextWith { @@ -1002,6 +1012,11 @@ where &self.node_adapter().components } + /// Returns the shared BAL store instance. + pub fn bal_store(&self) -> &Arc { + &self.right().bal_store + } + /// Launches ExEx (Execution Extensions) and returns the ExEx manager handle. #[allow(clippy::type_complexity)] pub async fn launch_exex( @@ -1233,6 +1248,7 @@ where db_provider_container: WithMeteredProvider>, node_adapter: NodeAdapter, head: Head, + bal_store: Arc, } /// Returns the metrics hooks for the node. diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 28a4cd570bf..c34e4d69843 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -31,7 +31,7 @@ use reth_node_core::{ }; use reth_node_events::node; use reth_provider::{ - providers::{BlockchainProvider, NodeTypesForProvider}, + providers::{BalProvider, BlockchainProvider, NodeTypesForProvider}, BlockNumReader, StorageSettingsCache, }; use reth_tasks::TaskExecutor; @@ -90,6 +90,9 @@ impl EngineNodeLauncher { // Create changeset cache that will be shared across the engine let changeset_cache = ChangesetCache::new(); + // Create the bal cache + let bal_provider = BalProvider::default(); + // setup the launch context let ctx = ctx .with_configured_globals(engine_tree_config.reserved_cpu_cores()) @@ -120,7 +123,7 @@ impl EngineNodeLauncher { // passing FullNodeTypes as type parameter here so that we can build // later the components. .with_blockchain_db::(move |provider_factory| { - Ok(BlockchainProvider::new(provider_factory)?) + Ok(BlockchainProvider::new(provider_factory,bal_provider)?) })? .with_components(components_builder, on_component_initialized).await?; diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 7eecedd5a73..9da3f4eb144 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -981,7 +981,8 @@ where let Self { eth_api_builder, engine_api_builder, hooks, .. } = self; let engine_api = engine_api_builder.build_engine_api(&ctx).await?; - let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx; + let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events, .. } = + ctx; info!(target: "reth::cli", "Engine API handler initialized"); diff --git a/crates/node/core/src/dirs.rs b/crates/node/core/src/dirs.rs index 85388727b63..f07a984e85d 100644 --- a/crates/node/core/src/dirs.rs +++ b/crates/node/core/src/dirs.rs @@ -346,6 +346,14 @@ impl ChainPath { self.data_dir().join("blobstore") } + /// Returns the path to the BAL store directory for this chain where EIP-7928 + /// block access lists are persisted. + /// + /// `//balstore` + pub fn balstore(&self) -> PathBuf { + self.data_dir().join("balstore") + } + /// Returns the path to the local transactions backup file /// /// `//txpool-transactions-backup.rlp` diff --git a/crates/prune/prune/src/segments/mod.rs b/crates/prune/prune/src/segments/mod.rs index 429ee60f24e..de3896fbed0 100644 --- a/crates/prune/prune/src/segments/mod.rs +++ b/crates/prune/prune/src/segments/mod.rs @@ -239,7 +239,7 @@ mod tests { use super::*; use alloy_primitives::B256; use reth_provider::{ - providers::BlockchainProvider, + providers::{BalProvider, BlockchainProvider}, test_utils::{create_test_provider_factory, MockEthProvider}, BlockWriter, }; @@ -291,7 +291,7 @@ mod tests { provider_rw.commit().expect("failed to commit"); // Create a new provider - let provider = BlockchainProvider::new(factory).unwrap(); + let provider = BlockchainProvider::new(factory, BalProvider::default()).unwrap(); // Since there are no transactions, expected None let range = input.get_next_tx_num_range(&provider).expect("Expected range"); @@ -329,7 +329,7 @@ mod tests { provider_rw.commit().expect("failed to commit"); // Create a new provider - let provider = BlockchainProvider::new(factory).unwrap(); + let provider = BlockchainProvider::new(factory, BalProvider::default()).unwrap(); // Get the next tx number range let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap(); @@ -375,7 +375,7 @@ mod tests { provider_rw.commit().expect("failed to commit"); // Create a new provider - let provider = BlockchainProvider::new(factory).unwrap(); + let provider = BlockchainProvider::new(factory, BalProvider::default()).unwrap(); // Fetch the range and check if it is correct let range = input.get_next_tx_num_range(&provider).expect("Expected range").unwrap(); @@ -411,7 +411,7 @@ mod tests { provider_rw.commit().expect("failed to commit"); // Create a new provider - let provider = BlockchainProvider::new(factory).unwrap(); + let provider = BlockchainProvider::new(factory, BalProvider::default()).unwrap(); // Get the last tx number // Calculate the total number of transactions diff --git a/crates/rpc/rpc-api/src/engine.rs b/crates/rpc/rpc-api/src/engine.rs index ea6d85cbe5f..4b841c90728 100644 --- a/crates/rpc/rpc-api/src/engine.rs +++ b/crates/rpc/rpc-api/src/engine.rs @@ -11,14 +11,11 @@ use alloy_eips::{ use alloy_json_rpc::RpcObject; use alloy_primitives::{Address, BlockHash, Bytes, B256, U256, U64}; use alloy_rpc_types_engine::{ - ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadInputV2, ExecutionPayloadV1, - ExecutionPayloadV3, ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, - PayloadStatus, + ClientVersionV1, ExecutionPayloadBodiesV1, ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, + ExecutionPayloadV1, ExecutionPayloadV3, ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, + PayloadId, PayloadStatus, }; -// TODO: Replace with alloy_rpc_types_engine::ExecutionPayloadBodiesV2 once available in alloy -// bal-devnet2 branch. V2 adds block_access_list field for EIP-7928. -type ExecutionPayloadBodiesV2 = ExecutionPayloadBodiesV1; use alloy_rpc_types_eth::{ state::StateOverride, BlockOverrides, EIP1186AccountProofResponse, Filter, Log, SyncStatus, }; diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index f73a48e8212..50ce49360fc 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -24,6 +24,7 @@ reth-engine-primitives.workspace = true reth-transaction-pool.workspace = true reth-primitives-traits.workspace = true reth-network-api.workspace = true +reth-bal-store.workspace = true # ethereum alloy-eips.workspace = true @@ -57,3 +58,4 @@ reth-node-ethereum.workspace = true reth-tasks = { workspace = true, features = ["test-utils"] } assert_matches.workspace = true +tempfile.workspace = true diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 1ee25c42f44..dbbb0fe5677 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -10,16 +10,15 @@ use alloy_eips::{ use alloy_primitives::{BlockHash, BlockNumber, B256, U64}; use alloy_rpc_types_engine::{ CancunPayloadFields, ClientVersionV1, ExecutionData, ExecutionPayloadBodiesV1, - ExecutionPayloadBodyV1, ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, - ExecutionPayloadV3, ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, - PayloadStatus, PraguePayloadFields, + ExecutionPayloadBodiesV2, ExecutionPayloadBodyV1, ExecutionPayloadBodyV2, + ExecutionPayloadInputV2, ExecutionPayloadSidecar, ExecutionPayloadV1, ExecutionPayloadV3, + ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, + PraguePayloadFields, }; -// TODO: Replace with alloy types once available in alloy bal-devnet2 branch -type ExecutionPayloadBodiesV2 = ExecutionPayloadBodiesV1; -type ExecutionPayloadBodyV2 = ExecutionPayloadBodyV1; use async_trait::async_trait; use jsonrpsee_core::{server::RpcModule, RpcResult}; +use reth_bal_store::{BalStore, BalStoreError}; use reth_chainspec::EthereumHardforks; use reth_engine_primitives::{ConsensusEngineHandle, EngineApiValidator, EngineTypes}; use reth_network_api::NetworkInfo; @@ -156,7 +155,8 @@ where .validator .validate_version_specific_fields(EngineApiMessageVersion::V1, payload_or_attrs)?; - Ok(self.inner.beacon_consensus.new_payload(payload).await?) + let status = self.inner.beacon_consensus.new_payload(payload).await?; + Ok(status) } /// Metered version of `new_payload_v1`. @@ -184,7 +184,9 @@ where self.inner .validator .validate_version_specific_fields(EngineApiMessageVersion::V2, payload_or_attrs)?; - Ok(self.inner.beacon_consensus.new_payload(payload).await?) + + let status = self.inner.beacon_consensus.new_payload(payload).await?; + Ok(status) } /// Metered version of `new_payload_v2`. @@ -213,7 +215,8 @@ where .validator .validate_version_specific_fields(EngineApiMessageVersion::V3, payload_or_attrs)?; - Ok(self.inner.beacon_consensus.new_payload(payload).await?) + let status = self.inner.beacon_consensus.new_payload(payload).await?; + Ok(status) } /// Metrics version of `new_payload_v3` @@ -243,7 +246,8 @@ where .validator .validate_version_specific_fields(EngineApiMessageVersion::V4, payload_or_attrs)?; - Ok(self.inner.beacon_consensus.new_payload(payload).await?) + let status = self.inner.beacon_consensus.new_payload(payload).await?; + Ok(status) } /// Metrics version of `new_payload_v4` @@ -328,7 +332,7 @@ where impl EngineApi where - Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static, + Provider: HeaderProvider + BlockReader + StateProviderFactory + BalStore + 'static, EngineT: EngineTypes, Pool: TransactionPool + 'static, Validator: EngineApiValidator, @@ -735,12 +739,20 @@ where start: BlockNumber, count: u64, ) -> EngineApiResult { - // TODO: add block_access_list field once ExecutionPayloadBodyV2 is in alloy bal-devnet2 - self.get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 { - transactions: block.body().encoded_2718_transactions(), - withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner), - }) - .await + let mut bodies = self + .get_payload_bodies_by_range_with(start, count, |block| ExecutionPayloadBodyV2 { + transactions: block.body().encoded_2718_transactions(), + withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner), + block_access_list: None, + }) + .await?; + let bals = self.inner.provider.get_by_range(start, count)?; + for (body_opt, bal) in bodies.iter_mut().zip(bals) { + if let Some(body) = body_opt.as_mut() { + body.block_access_list = Some(bal); + } + } + Ok(bodies) } /// Metrics version of `get_payload_bodies_by_range_v2` @@ -823,12 +835,21 @@ where &self, hashes: Vec, ) -> EngineApiResult { - // TODO: add block_access_list field once ExecutionPayloadBodyV2 is in alloy bal-devnet2 - self.get_payload_bodies_by_hash_with(hashes, |block| ExecutionPayloadBodyV2 { - transactions: block.body().encoded_2718_transactions(), - withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner), - }) - .await + let mut bodies = self + .get_payload_bodies_by_hash_with(hashes.clone(), |block| ExecutionPayloadBodyV2 { + transactions: block.body().encoded_2718_transactions(), + withdrawals: block.body().withdrawals().cloned().map(Withdrawals::into_inner), + block_access_list: None, + }) + .await?; + + let bals = self.get_bals_by_hash(hashes)?; + for (body_opt, bal) in bodies.iter_mut().zip(bals) { + if let Some(body) = body_opt.as_mut() { + body.block_access_list = Some(bal); + } + } + Ok(bodies) } /// Metrics version of `get_payload_bodies_by_hash_v2` @@ -1044,6 +1065,34 @@ where res } + + /// Retrieves BALs for the given block hashes from the cache. + /// + /// Returns the RLP-encoded BALs for blocks found in the cache or BAL store. + /// Missing blocks are returned as empty bytes. + pub fn get_bals_by_hash( + &self, + block_hashes: Vec, + ) -> Result, BalStoreError> { + Ok(self + .inner + .provider + .get_by_hashes(&block_hashes)? + .into_iter() + .map(|opt| opt.unwrap_or_default()) + .collect()) + } + + /// Retrieves BALs for a range of blocks from the cache or BAL store. + /// + /// Returns the RLP-encoded BALs for blocks in the range `[start, start + count)`. + pub fn get_bals_by_range( + &self, + start: u64, + count: u64, + ) -> Result, BalStoreError> { + self.inner.provider.get_by_range(start, count) + } } // This is the concrete ethereum engine API implementation. @@ -1051,7 +1100,7 @@ where impl EngineApiServer for EngineApi where - Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static, + Provider: HeaderProvider + BlockReader + StateProviderFactory + BalStore + 'static, EngineT: EngineTypes, Pool: TransactionPool + 'static, Validator: EngineApiValidator, @@ -1420,7 +1469,7 @@ where impl RethEngineApiServer for EngineApi where - Provider: HeaderProvider + BlockReader + StateProviderFactory + 'static, + Provider: HeaderProvider + BlockReader + StateProviderFactory + BalStore + 'static, EngineT: EngineTypes, Pool: TransactionPool + 'static, Validator: EngineApiValidator, diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 8d2b678a9f5..c744a2e4410 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -6,6 +6,7 @@ use alloy_rpc_types_engine::{ use jsonrpsee_types::error::{ INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE, INVALID_PARAMS_MSG, SERVER_ERROR_MSG, }; +use reth_bal_store::BalStoreError; use reth_engine_primitives::{BeaconForkChoiceUpdateError, BeaconOnNewPayloadError}; use reth_payload_builder_primitives::PayloadBuilderError; use reth_payload_primitives::{EngineObjectValidationError, VersionSpecificValidationError}; @@ -207,6 +208,12 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { } } +impl From for EngineApiError { + fn from(err: BalStoreError) -> Self { + EngineApiError::Internal(Box::new(err)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/rpc/rpc-engine-api/src/metrics.rs b/crates/rpc/rpc-engine-api/src/metrics.rs index 5c6e781631c..d22cfc573f4 100644 --- a/crates/rpc/rpc-engine-api/src/metrics.rs +++ b/crates/rpc/rpc-engine-api/src/metrics.rs @@ -8,6 +8,8 @@ pub(crate) struct EngineApiMetrics { pub(crate) latency: EngineApiLatencyMetrics, /// Blob-related metrics pub(crate) blob_metrics: BlobMetrics, + // BAL query metrics. + //pub(crate) bal_metrics: BalQueryMetrics, } /// Beacon consensus engine latency metrics. @@ -76,3 +78,24 @@ pub(crate) struct BlobMetrics { /// Number of times getBlobsV2 responded with “miss” pub(crate) get_blobs_requests_failure_total: Counter, } + +// #[derive(Metrics)] +// #[metrics(scope = "engine.rpc.bal")] +// pub(crate) struct BalQueryMetrics { +// /// Number of by-hash queries that required store fallback. +// pub(crate) store_hash_fallback_requests: Counter, +// /// Number of BAL entries recovered from store during by-hash fallback. +// pub(crate) store_hash_fallback_hits: Counter, +// /// Number of BAL entries still missing after by-hash fallback. +// pub(crate) store_hash_fallback_misses: Counter, +// /// Number of store errors during by-hash fallback. +// pub(crate) store_hash_fallback_errors: Counter, +// /// Number of by-range queries that required store fallback. +// pub(crate) store_range_fallback_requests: Counter, +// /// Number of BAL entries recovered from store during by-range fallback. +// pub(crate) store_range_fallback_hits: Counter, +// /// Number of BAL entries still missing after by-range fallback. +// pub(crate) store_range_fallback_misses: Counter, +// /// Number of store errors during by-range fallback. +// pub(crate) store_range_fallback_errors: Counter, +// } diff --git a/crates/rpc/rpc-eth-api/Cargo.toml b/crates/rpc/rpc-eth-api/Cargo.toml index 837adfbddee..2d8972b9bbd 100644 --- a/crates/rpc/rpc-eth-api/Cargo.toml +++ b/crates/rpc/rpc-eth-api/Cargo.toml @@ -13,7 +13,13 @@ workspace = true [dependencies] # reth -revm = { workspace = true, features = ["optional_block_gas_limit", "optional_eip3607", "optional_no_base_fee", "optional_fee_charge", "memory_limit"] } +revm = { workspace = true, features = [ + "optional_block_gas_limit", + "optional_eip3607", + "optional_no_base_fee", + "optional_fee_charge", + "memory_limit", +] } reth-chain-state.workspace = true revm-inspectors.workspace = true reth-primitives-traits = { workspace = true, features = ["rpc-compat"] } @@ -30,6 +36,7 @@ reth-rpc-server-types.workspace = true reth-network-api.workspace = true reth-node-api.workspace = true reth-trie-common = { workspace = true, features = ["eip1186"] } +reth-bal-store.workspace = true # ethereum alloy-evm = { workspace = true, features = ["overrides", "call-util"] } diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index 62541b79c1f..e18761be97f 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -5,7 +5,7 @@ use crate::{ RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, }; use alloy_dyn_abi::TypedData; -use alloy_eips::{eip2930::AccessListResult, BlockId, BlockNumberOrTag}; +use alloy_eips::{eip2930::AccessListResult, eip7928::BlockAccessList, BlockId, BlockNumberOrTag}; use alloy_json_rpc::RpcObject; use alloy_primitives::{Address, Bytes, B256, B64, U256, U64}; use alloy_rpc_types_eth::{ @@ -16,6 +16,7 @@ use alloy_rpc_types_eth::{ }; use alloy_serde::JsonStorageKey; use jsonrpsee::{core::RpcResult, proc_macros::rpc}; +use reth_bal_store::BalStore; use reth_primitives_traits::TxTy; use reth_rpc_convert::RpcTxReq; use reth_rpc_eth_types::{error::FromEthApiError, EthApiError, FillTransaction}; @@ -918,7 +919,18 @@ where async fn block_access_list_by_block_hash(&self, block_hash: B256) -> RpcResult> { trace!(target: "rpc::eth", ?block_hash, "Serving eth_getBlockAccessListByBlockHash"); - let bal = self.get_block_access_list(block_hash).await?; + // Retrieve BAL either from cache or recompute + let bal: BlockAccessList = if let Some(cached_bal) = + self.provider().get_by_block_hash(block_hash.into()).map_err(T::Error::from_eth_err)? + { + // Decode cached bal and convert to JSON + alloy_rlp::decode_exact(&cached_bal) + .map_err(|e| EthApiError::Internal(reth_errors::RethError::msg(e.to_string())))? + } else { + // if cache is missed, we recompute + self.get_block_access_list(block_hash).await?.unwrap_or_default() + }; + let json = serde_json::to_value(&bal) .map_err(|e| EthApiError::Internal(reth_errors::RethError::msg(e.to_string())))?; @@ -937,7 +949,18 @@ where .map_err(T::Error::from_eth_err)? .ok_or(EthApiError::HeaderNotFound(number.into()))?; - let bal = self.get_block_access_list(block_hash).await?; + // Retrieve BAL either from cache or recompute + let bal: BlockAccessList = if let Some(cached_bal) = + self.provider().get_by_block_hash(block_hash.into()).map_err(T::Error::from_eth_err)? + { + // Decode cached bal and convert to JSON + alloy_rlp::decode_exact(&cached_bal) + .map_err(|e| EthApiError::Internal(reth_errors::RethError::msg(e.to_string())))? + } else { + // if cache is missed, we recompute + self.get_block_access_list(block_hash).await?.unwrap_or_default() + }; + let json = serde_json::to_value(&bal) .map_err(|e| EthApiError::Internal(reth_errors::RethError::msg(e.to_string())))?; diff --git a/crates/rpc/rpc-eth-api/src/node.rs b/crates/rpc/rpc-eth-api/src/node.rs index bde95b9c572..5d2b45972b0 100644 --- a/crates/rpc/rpc-eth-api/src/node.rs +++ b/crates/rpc/rpc-eth-api/src/node.rs @@ -1,5 +1,6 @@ //! Helper trait for interfacing with [`FullNodeComponents`]. +use reth_bal_store::BalStore; use reth_chain_state::CanonStateSubscriptions; use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks, Hardforks}; use reth_evm::ConfigureEvm; @@ -37,6 +38,7 @@ pub trait RpcNodeCore: Clone + Send + Sync + Unpin + 'static { > + StateProviderFactory + CanonStateSubscriptions + StageCheckpointReader + + BalStore + Send + Sync + Clone @@ -130,6 +132,7 @@ where > + StateProviderFactory + CanonStateSubscriptions + StageCheckpointReader + + BalStore + Send + Sync + Unpin diff --git a/crates/rpc/rpc-eth-types/Cargo.toml b/crates/rpc/rpc-eth-types/Cargo.toml index 1ef87874dbd..e07b7051536 100644 --- a/crates/rpc/rpc-eth-types/Cargo.toml +++ b/crates/rpc/rpc-eth-types/Cargo.toml @@ -27,6 +27,7 @@ reth-rpc-convert.workspace = true reth-tasks.workspace = true reth-transaction-pool.workspace = true reth-trie.workspace = true +reth-bal-store.workspace = true # ethereum alloy-eips.workspace = true diff --git a/crates/rpc/rpc-eth-types/src/error/mod.rs b/crates/rpc/rpc-eth-types/src/error/mod.rs index 12f0498a33a..6e71614724c 100644 --- a/crates/rpc/rpc-eth-types/src/error/mod.rs +++ b/crates/rpc/rpc-eth-types/src/error/mod.rs @@ -9,6 +9,7 @@ use alloy_sol_types::{ContractError, RevertReason}; use alloy_transport::{RpcError, TransportErrorKind}; pub use api::{AsEthApiError, FromEthApiError, FromEvmError, IntoEthApiError}; use core::time::Duration; +use reth_bal_store::BalStoreError; use reth_errors::{BlockExecutionError, BlockValidationError, RethError}; use reth_primitives_traits::transaction::{error::InvalidTransactionError, signed::RecoveryError}; use reth_revm::db::bal::EvmDatabaseError; @@ -210,6 +211,9 @@ pub enum EthApiError { /// Error thrown when trying to access block access list for blocks before Amsterdam #[error("Block access list not available for pre-Amsterdam blocks")] BlockAccessListNotAvailablePreAmsterdam, + /// Error thrown when bal cache doesnt work + #[error("Error fetching from bal cache")] + BalCacheError, /// Any other error #[error("{0}")] Other(Box), @@ -351,6 +355,9 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { EthApiError::BlockAccessListNotAvailablePreAmsterdam => { rpc_error_with_code(4445, error.to_string()) } + EthApiError::BalCacheError => { + internal_rpc_err("Error fetching from bal cache".to_string()) + } } } } @@ -563,6 +570,12 @@ where } } +impl From for EthApiError { + fn from(_: BalStoreError) -> Self { + Self::BalCacheError + } +} + impl From for EthApiError { fn from(_: RecoveryError) -> Self { Self::InvalidTransactionSignature diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 27d9fb8e5dd..5aa82d5a1f9 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -43,6 +43,7 @@ reth-ethereum-primitives.workspace = true reth-ethereum-engine-primitives.workspace = true reth-node-api.workspace = true reth-trie-common.workspace = true +reth-bal-store.workspace = true # ethereum alloy-evm = { workspace = true, features = ["overrides"] } @@ -66,7 +67,12 @@ alloy-rpc-types-txpool.workspace = true alloy-rpc-types-admin.workspace = true alloy-rpc-types-engine = { workspace = true, features = ["kzg"] } alloy-serde.workspace = true -revm = { workspace = true, features = ["optional_block_gas_limit", "optional_eip3607", "optional_no_base_fee", "memory_limit"] } +revm = { workspace = true, features = [ + "optional_block_gas_limit", + "optional_eip3607", + "optional_no_base_fee", + "memory_limit", +] } revm-primitives = { workspace = true, features = ["serde"] } # rpc diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index 92aec5106e6..be77106648c 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -14,6 +14,7 @@ use async_trait::async_trait; use futures::Stream; use jsonrpsee::core::RpcResult; use parking_lot::RwLock; +use reth_bal_store::BalStore; use reth_chainspec::{ChainSpecProvider, EthChainSpec, EthereumHardforks}; use reth_engine_primitives::ConsensusEngineEvent; use reth_errors::RethError; @@ -167,9 +168,18 @@ where .map_err(Eth::Error::from_eth_err)? .ok_or(EthApiError::HeaderNotFound(block_id))?; - let bal = self.eth_api().get_block_access_list(block_hash).await?; + let cached_bal = self + .eth_api() + .provider() + .get_by_block_hash(block_hash.into()) + .map_err(Eth::Error::from_eth_err)?; - Ok(alloy_rlp::encode(bal.unwrap_or_default()).into()) + if cached_bal.is_none() { + let bal = self.eth_api().get_block_access_list(block_hash).await?; + Ok(alloy_rlp::encode(bal.unwrap_or_default()).into()) + } else { + Ok(cached_bal.unwrap()) + } } /// Replays the given block and returns the trace of each transaction. diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index d6bc1884481..bab28bb4a12 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -569,6 +569,7 @@ mod tests { use alloy_rpc_types_eth::{Bundle, TransactionRequest}; use jsonrpsee_types::error::INVALID_PARAMS_CODE; use rand::Rng; + use reth_bal_store::BalStore; use reth_chain_state::CanonStateSubscriptions; use reth_chainspec::{ChainSpec, ChainSpecProvider, EthChainSpec}; use reth_ethereum_primitives::TransactionSigned; @@ -599,6 +600,7 @@ mod tests { + StateProviderFactory + CanonStateSubscriptions + StageCheckpointReader + + BalStore + Unpin + Clone + 'static, diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index f5e96631258..a58a5ed9c46 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -33,6 +33,7 @@ reth-chain-state.workspace = true reth-node-types.workspace = true reth-static-file-types = { workspace = true, features = ["std"] } reth-fs-util.workspace = true +reth-bal-store.workspace = true # ethereum alloy-eips.workspace = true diff --git a/crates/storage/provider/src/providers/bal_provider.rs b/crates/storage/provider/src/providers/bal_provider.rs new file mode 100644 index 00000000000..73c92024bbe --- /dev/null +++ b/crates/storage/provider/src/providers/bal_provider.rs @@ -0,0 +1,428 @@ +//! Block Access List (BAL) cache for EIP-7928. +//! +//! This module provides an in-memory cache for storing Block Access Lists received via +//! the Engine API. BALs are stored for valid payloads and can be retrieved through +//! Engine API BAL query paths that read from the cache/store provider. +//! +//! According to EIP-7928, the EL MUST retain BALs for at least the duration of the +//! weak subjectivity period (~3533 epochs) to support synchronization with re-execution. +//! This initial implementation uses a simple in-memory cache with configurable capacity. + +use alloy_primitives::{BlockHash, BlockNumber, Bytes}; +use parking_lot::RwLock; +use reth_bal_store::{BalStore, BalStoreError}; +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; +use tracing::warn; + +/// Default capacity for the BAL cache. +/// +/// This is a conservative default - production deployments should configure based on +/// weak subjectivity period requirements (~3533 epochs ≈ 113,000 blocks). +const DEFAULT_BAL_CACHE_CAPACITY: u32 = 1024; + +/// In-memory cache for Block Access Lists (BALs). +/// +/// Provides O(1) lookups by block hash and O(log n) range queries by block number. +/// Evicts the oldest (lowest) block numbers when capacity is exceeded. +/// +/// This type is cheaply cloneable as it wraps an `Arc` internally. +#[derive(Debug, Clone)] +pub struct BalCache { + inner: Arc, +} + +#[derive(Debug)] +struct BalCacheInner { + /// Maximum number of entries to store. + capacity: u32, + /// Mapping from block hash to BAL bytes. + entries: RwLock>, + /// Index mapping block number to block hash for range queries. + /// Uses `BTreeMap` for efficient range iteration and eviction of oldest blocks. + block_index: RwLock>, +} + +impl BalCache { + /// Creates a new BAL cache with the default capacity. + pub fn new() -> Self { + Self::with_capacity(DEFAULT_BAL_CACHE_CAPACITY) + } + + /// Creates a new BAL cache with the specified capacity. + pub fn with_capacity(capacity: u32) -> Self { + Self { + inner: Arc::new(BalCacheInner { + capacity, + entries: RwLock::new(HashMap::new()), + block_index: RwLock::new(BTreeMap::new()), + }), + } + } + + /// Inserts a BAL into the cache. + /// + /// If a different hash already exists for this block number (reorg), the old entry + /// is removed first. If the cache is at capacity, the oldest block number is evicted. + pub fn insert(&self, block_hash: BlockHash, block_number: BlockNumber, bal: Bytes) { + let mut entries = self.inner.entries.write(); + let mut block_index = self.inner.block_index.write(); + + // If this block number already has a different hash, remove the old entry + if let Some(old_hash) = block_index.get(&block_number) && + *old_hash != block_hash + { + entries.remove(old_hash); + } + + // Evict oldest block if at capacity and this is a new entry + if !entries.contains_key(&block_hash) && + entries.len() as u32 >= self.inner.capacity && + let Some((&oldest_num, &oldest_hash)) = block_index.first_key_value() + { + entries.remove(&oldest_hash); + block_index.remove(&oldest_num); + } + + entries.insert(block_hash, bal); + block_index.insert(block_number, block_hash); + } + + /// Retrieves BALs for the given block hashes. + /// + /// Returns a vector with the same length as `block_hashes`, where each element + /// is `Some(bal)` if found or `None` if not in cache. + pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> Vec> { + let entries = self.inner.entries.read(); + block_hashes.iter().map(|hash| entries.get(hash).cloned()).collect() + } + + /// Retrieves BALs for a range of blocks starting at `start` for `count` blocks. + /// + /// Returns a vector of contiguous BALs in block number order, stopping at the first + /// missing block. This ensures the caller knows the returned BALs correspond to + /// blocks `[start, start + len)`. + pub fn get_by_range(&self, start: BlockNumber, count: u64) -> Vec { + let entries = self.inner.entries.read(); + let block_index = self.inner.block_index.read(); + + let mut result = Vec::new(); + for block_num in start..start.saturating_add(count) { + let Some(hash) = block_index.get(&block_num) else { + break; + }; + let Some(bal) = entries.get(hash) else { + break; + }; + result.push(bal.clone()); + } + result + } + + /// Retrieves the BAL for a specific block hash, if it exists. + pub fn get_by_block_hash(&self, hash: BlockHash) -> Option { + let entries = self.inner.entries.read(); + entries.get(&hash).cloned() + } + + /// Retrieves the BAL for a specific block number, if it exists. + pub fn get_by_block_number(&self, number: BlockNumber) -> Option { + let hash = { + let block_index = self.inner.block_index.read(); + block_index.get(&number).copied() + }?; + + let entries = self.inner.entries.read(); + entries.get(&hash).cloned() + } + + /// Returns the number of entries in the cache. + #[cfg(test)] + fn len(&self) -> usize { + self.inner.entries.read().len() + } +} + +impl Default for BalCache { + fn default() -> Self { + Self::new() + } +} + +impl BalStore for BalCache { + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError> { + Self::insert(self, block_hash, block_number, bal); + Ok(()) + } + + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, BalStoreError> { + Ok(Self::get_by_hashes(self, block_hashes)) + } + + fn get_by_range(&self, start: BlockNumber, count: u64) -> Result, BalStoreError> { + Ok(Self::get_by_range(self, start, count)) + } + + fn get_by_block_hash(&self, block_hash: BlockHash) -> Result, BalStoreError> { + Ok(Self::get_by_block_hash(&self, block_hash)) + } + + fn get_by_block_number( + &self, + block_number: BlockNumber, + ) -> Result, BalStoreError> { + Ok(Self::get_by_block_number(&self, block_number)) + } +} + +/// Provides access to Block Access Lists (BALs). +/// +/// `BalProvider` acts as a thin abstraction over: +/// - a **durable store** (`BalStore`) which is the source of truth +/// - an **in-memory cache** (`BalCache`) for fast access. +/// +/// Reads are cache-first with fallback to the store. +/// Writes are store-first to ensure durability before cache visibility. +#[derive(Clone)] +pub struct BalProvider { + /// Persistent storage backend for BALs. + store: Arc, + /// In-memory cache for recently accessed BALs. + cache: BalCache, +} + +impl std::fmt::Debug for BalProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BalProvider").field("cache", &self.cache).finish_non_exhaustive() + } +} + +impl Default for BalProvider { + fn default() -> Self { + let cache = BalCache::new(); + Self { store: Arc::new(BalCache::new()), cache } + } +} + +impl BalProvider { + /// Creates a new `BalProvider` with the given store and cache. + pub fn new(store: Arc, cache: BalCache) -> Self { + Self { store, cache } + } + + /// Returns a reference to the in-memory cache. + pub const fn cache(&self) -> &BalCache { + &self.cache + } + + /// Persist first: store is the source of truth. We only populate the in-memory cache if + /// durability succeeds, so cache visibility cannot outlive failed persistence. + /// `Bytes` is consumed by each insert call, so we clone once for store and move the original + /// into cache. + pub fn cache_bal( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: Bytes, + ) -> Result<(), BalStoreError> { + self.store.insert(block_hash, block_number, bal.clone())?; + self.cache.insert(block_hash, block_number, bal); + Ok(()) + } + + /// Cache-first lookup: keep request order and fill only cache misses from durable storage. + pub fn get_by_hashes(&self, block_hashes: &[BlockHash]) -> Vec> { + let mut results = self.cache.get_by_hashes(block_hashes); + + // Collect missing positions so store fallback can patch holes in-place. + let mut missing_hashes = Vec::new(); + let mut missing_indices = Vec::new(); + for (idx, result) in results.iter().enumerate() { + if result.is_none() { + missing_indices.push(idx); + missing_hashes.push(block_hashes[idx]); + } + } + + if missing_hashes.is_empty() { + return results; + } + + match self.store.get_by_hashes(&missing_hashes) { + Ok(store_results) => { + for (missing_idx, store_result) in missing_indices.into_iter().zip(store_results) { + if let Some(value) = store_result { + results[missing_idx] = Some(value); + } + } + } + Err(err) => { + warn!(target: "provider::bal_provider", ?err, "Failed to retrieve BALs by hash from BAL store"); + } + } + + results + } + + /// Cache range reads are contiguous and stop at the first gap. + /// Only the missing suffix is queried from store to avoid re-reading cached prefix. + pub fn get_by_range(&self, start: BlockNumber, count: u64) -> Vec { + let mut cache_results = self.cache.get_by_range(start, count); + if cache_results.len() as u64 == count { + return cache_results; + } + + let cached_len = cache_results.len() as u64; + let missing_start = start.saturating_add(cached_len); + let missing_count = count - cached_len; + + match self.store.get_by_range(missing_start, missing_count) { + Ok(mut store_results) => { + cache_results.append(&mut store_results); + cache_results + } + Err(err) => { + warn!(target: "provider::bal_provider", ?err, "Failed to retrieve BALs by range from BAL store"); + cache_results + } + } + } + + /// Cache-first lookup by block hash with store fallback on miss. + pub fn get_by_block_hash(&self, hash: BlockHash) -> Option { + if let Some(bal) = self.cache.get_by_block_hash(hash) { + return Some(bal); + } + + match self.store.get_by_block_hash(hash) { + Ok(Some(bal)) => Some(bal), + Ok(None) => None, + Err(err) => { + warn!(target: "provider::bal_provider", ?err, "Failed to retrieve BAL by hash from BAL store"); + None + } + } + } + + /// Cache-first lookup by block number with store fallback on miss. + pub fn get_by_block_number(&self, number: BlockNumber) -> Option { + if let Some(bal) = self.cache.get_by_block_number(number) { + return Some(bal); + } + + match self.store.get_by_block_number(number) { + Ok(Some(bal)) => Some(bal), + Ok(None) => None, + Err(err) => { + warn!(target: "rpc::engine", ?err, "Failed to retrieve BAL by number from BAL store"); + None + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::B256; + + #[test] + fn test_insert_and_get_by_hash() { + let cache = BalCache::with_capacity(10); + + let hash1 = B256::random(); + let hash2 = B256::random(); + let bal1 = Bytes::from_static(b"bal1"); + let bal2 = Bytes::from_static(b"bal2"); + + cache.insert(hash1, 1, bal1.clone()); + cache.insert(hash2, 2, bal2.clone()); + + let results = cache.get_by_hashes(&[hash1, hash2, B256::random()]); + assert_eq!(results.len(), 3); + assert_eq!(results[0], Some(bal1)); + assert_eq!(results[1], Some(bal2)); + assert_eq!(results[2], None); + } + + #[test] + fn test_get_by_range() { + let cache = BalCache::with_capacity(10); + + for i in 1..=5 { + let hash = B256::random(); + let bal = Bytes::from(format!("bal{i}").into_bytes()); + cache.insert(hash, i, bal); + } + + let results = cache.get_by_range(2, 3); + assert_eq!(results.len(), 3); + } + + #[test] + fn test_get_by_range_stops_at_gap() { + let cache = BalCache::with_capacity(10); + + for i in [1, 2, 4, 5] { + let hash = B256::random(); + let bal = Bytes::from(format!("bal{i}").into_bytes()); + cache.insert(hash, i, bal); + } + + let results = cache.get_by_range(1, 5); + assert_eq!(results.len(), 2); + + let results = cache.get_by_range(4, 3); + assert_eq!(results.len(), 2); + } + + #[test] + fn test_eviction_oldest_first() { + let cache = BalCache::with_capacity(3); + + for i in [10, 20, 30] { + let hash = B256::random(); + cache.insert(hash, i, Bytes::from_static(b"bal")); + } + assert_eq!(cache.len(), 3); + + let hash40 = B256::random(); + cache.insert(hash40, 40, Bytes::from_static(b"bal40")); + assert_eq!(cache.len(), 3); + + let results = cache.get_by_range(10, 1); + assert_eq!(results.len(), 0); + + let results = cache.get_by_range(20, 1); + assert_eq!(results.len(), 1); + } + + #[test] + fn test_reorg_replaces_hash() { + let cache = BalCache::with_capacity(10); + + let hash1 = B256::random(); + let hash2 = B256::random(); + let bal1 = Bytes::from_static(b"bal1"); + let bal2 = Bytes::from_static(b"bal2"); + + cache.insert(hash1, 100, bal1.clone()); + assert_eq!(cache.get_by_hashes(&[hash1])[0], Some(bal1)); + + cache.insert(hash2, 100, bal2.clone()); + + assert_eq!(cache.get_by_hashes(&[hash1])[0], None); + assert_eq!(cache.get_by_hashes(&[hash2])[0], Some(bal2)); + assert_eq!(cache.len(), 1); + } +} diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index a9cf4c38f42..ee6c9dacb32 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -1,6 +1,6 @@ use crate::{ providers::{ - ConsistentProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider, + BalProvider, ConsistentProvider, ProviderNodeTypes, RocksDBProvider, StaticFileProvider, StaticFileProviderRWRefMut, }, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt, @@ -15,6 +15,7 @@ use alloy_consensus::transaction::TransactionMeta; use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag}; use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256}; use alloy_rpc_types_engine::ForkchoiceState; +use reth_bal_store::BalStore; use reth_chain_state::{ BlockState, CanonicalInMemoryState, ForkChoiceNotifications, ForkChoiceSubscriptions, MemoryOverlayStateProvider, PersistedBlockNotifications, PersistedBlockSubscriptions, @@ -50,6 +51,8 @@ pub struct BlockchainProvider { /// Tracks the chain info wrt forkchoice updates and in memory canonical /// state. pub(crate) canonical_in_memory_state: CanonicalInMemoryState, + /// Bal Provider used to access the the Bal Cache + pub(crate) bal_provider: BalProvider, } impl Clone for BlockchainProvider { @@ -57,6 +60,7 @@ impl Clone for BlockchainProvider { Self { database: self.database.clone(), canonical_in_memory_state: self.canonical_in_memory_state.clone(), + bal_provider: self.bal_provider.clone(), } } } @@ -64,13 +68,17 @@ impl Clone for BlockchainProvider { impl BlockchainProvider { /// Create a new [`BlockchainProvider`] using only the storage, fetching the latest /// header from the database to initialize the provider. - pub fn new(storage: ProviderFactory) -> ProviderResult { + pub fn new(storage: ProviderFactory, bal_provider: BalProvider) -> ProviderResult { let provider = storage.provider()?; let best = provider.chain_info()?; match provider.header_by_number(best.best_number)? { Some(header) => { drop(provider); - Ok(Self::with_latest(storage, SealedHeader::new(header, best.best_hash))?) + Ok(Self::with_latest( + storage, + SealedHeader::new(header, best.best_hash), + bal_provider, + )?) } None => Err(ProviderError::HeaderNotFound(best.best_number.into())), } @@ -84,6 +92,7 @@ impl BlockchainProvider { pub fn with_latest( storage: ProviderFactory, latest: SealedHeader>, + bal_provider: BalProvider, ) -> ProviderResult { let provider = storage.provider()?; let finalized_header = provider @@ -108,6 +117,7 @@ impl BlockchainProvider { finalized_header, safe_header, ), + bal_provider, }) } @@ -792,11 +802,51 @@ impl StateReader for BlockchainProvider { StateReader::get_state(&self.consistent_provider()?, block) } } +impl BalStore for BlockchainProvider { + fn insert( + &self, + block_hash: BlockHash, + block_number: BlockNumber, + bal: alloy_primitives::Bytes, + ) -> Result<(), reth_bal_store::BalStoreError> { + self.bal_provider.cache().insert(block_hash, block_number, bal); + Ok(()) + } + + fn get_by_hashes( + &self, + block_hashes: &[BlockHash], + ) -> Result>, reth_bal_store::BalStoreError> { + Ok(self.bal_provider.get_by_hashes(block_hashes)) + } + + fn get_by_range( + &self, + start: BlockNumber, + count: u64, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(self.bal_provider.get_by_range(start, count)) + } + + fn get_by_block_hash( + &self, + block_hash: BlockHash, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(self.bal_provider.get_by_block_hash(block_hash)) + } + + fn get_by_block_number( + &self, + block_number: BlockNumber, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(self.bal_provider.get_by_block_number(block_number)) + } +} #[cfg(test)] mod tests { use crate::{ - providers::BlockchainProvider, + providers::{BalProvider, BlockchainProvider}, test_utils::{ create_test_provider_factory, create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB, @@ -927,7 +977,7 @@ mod tests { provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; // Insert the rest of the blocks and receipts into the in-memory state let chain = NewCanonicalChain::Commit { @@ -1052,7 +1102,7 @@ mod tests { provider_rw.commit()?; // Create a new provider - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; // Useful blocks let first_db_block = database_blocks.first().unwrap(); @@ -1150,7 +1200,7 @@ mod tests { provider_rw.commit()?; // Create a new provider - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; // First in memory block let first_in_mem_block = in_memory_blocks.first().unwrap(); @@ -1364,7 +1414,7 @@ mod tests { provider_rw.insert_block(&block_1)?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; // Subscribe twice for canonical state updates. let in_memory_state = provider.canonical_in_memory_state(); @@ -1716,7 +1766,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap(); let chain = NewCanonicalChain::Commit { diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index ba03ae37b25..0eafc4cf23e 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -1702,8 +1702,9 @@ impl StateReader for ConsistentProvider { #[cfg(test)] mod tests { use crate::{ - providers::blockchain_provider::BlockchainProvider, - test_utils::create_test_provider_factory, BlockWriter, + providers::{blockchain_provider::BlockchainProvider, BalProvider}, + test_utils::create_test_provider_factory, + BlockWriter, }; use alloy_eips::BlockHashOrNumber; use alloy_primitives::B256; @@ -1783,7 +1784,7 @@ mod tests { provider_rw.commit()?; // Create a new provider - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let consistent_provider = provider.consistent_provider()?; // Useful blocks @@ -1894,7 +1895,7 @@ mod tests { provider_rw.commit()?; // Create a new provider - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let consistent_provider = provider.consistent_provider()?; // First in memory block @@ -2008,7 +2009,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_memory_changesets = in_memory_changesets.into_iter().next().unwrap(); let chain = NewCanonicalChain::Commit { @@ -2132,7 +2133,7 @@ mod tests { provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let consistent_provider = provider.consistent_provider()?; let outcome = @@ -2218,7 +2219,7 @@ mod tests { provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let consistent_provider = provider.consistent_provider()?; let outcome = @@ -2299,7 +2300,7 @@ mod tests { provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let consistent_provider = provider.consistent_provider()?; let outcome = @@ -2364,7 +2365,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_mem_block = in_memory_blocks.first().unwrap(); let senders = in_mem_block.senders().expect("failed to recover senders"); @@ -2462,7 +2463,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_mem_block = in_memory_blocks.first().unwrap(); let senders = in_mem_block.senders().expect("failed to recover senders"); @@ -2564,7 +2565,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_mem_block = in_memory_blocks.first().unwrap(); let senders = in_mem_block.senders().expect("failed to recover senders"); @@ -2660,7 +2661,7 @@ mod tests { )?; provider_rw.commit()?; - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let in_mem_block = in_memory_blocks.first().unwrap(); let senders = in_mem_block.senders().expect("failed to recover senders"); diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 9b8af6de663..03482665587 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -26,6 +26,9 @@ pub use state::{ mod consistent_view; pub use consistent_view::{ConsistentDbView, ConsistentViewError}; +mod bal_provider; +pub use bal_provider::*; + mod blockchain_provider; pub use blockchain_provider::BlockchainProvider; diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index c8c6e80e217..0fb5b9a6393 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -17,6 +17,7 @@ use alloy_primitives::{ Address, BlockHash, BlockNumber, Bytes, StorageKey, StorageValue, TxHash, TxNumber, B256, U256, }; use parking_lot::Mutex; +use reth_bal_store::BalStore; use reth_chain_state::{CanonStateNotifications, CanonStateSubscriptions}; use reth_chainspec::{ChainInfo, EthChainSpec}; use reth_db::transaction::DbTx; @@ -1071,6 +1072,48 @@ impl NodePrimitivesProvider type Primitives = T; } +impl BalStore + for MockEthProvider +{ + fn insert( + &self, + _block_hash: BlockHash, + _block_number: BlockNumber, + _bal: Bytes, + ) -> Result<(), reth_bal_store::BalStoreError> { + Ok(()) + } + + fn get_by_hashes( + &self, + _block_hashes: &[BlockHash], + ) -> Result>, reth_bal_store::BalStoreError> { + Ok(Vec::new()) + } + + fn get_by_range( + &self, + _start: BlockNumber, + _count: u64, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(Vec::new()) + } + + fn get_by_block_hash( + &self, + _block_hash: BlockHash, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(None) + } + + fn get_by_block_number( + &self, + _block_number: BlockNumber, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(None) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/storage/provider/src/traits/full.rs b/crates/storage/provider/src/traits/full.rs index 928ab55a568..e49c492bd91 100644 --- a/crates/storage/provider/src/traits/full.rs +++ b/crates/storage/provider/src/traits/full.rs @@ -6,6 +6,7 @@ use crate::{ RocksDBProviderFactory, StageCheckpointReader, StateProviderFactory, StateReader, StaticFileProviderFactory, }; +use reth_bal_store::BalStore; use reth_chain_state::{ CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions, }; @@ -42,6 +43,7 @@ pub trait FullProvider: + ForkChoiceSubscriptions
> + PersistedBlockSubscriptions + StageCheckpointReader + + BalStore + Clone + Debug + Unpin @@ -77,6 +79,7 @@ impl FullProvider for T where + ForkChoiceSubscriptions
> + PersistedBlockSubscriptions + StageCheckpointReader + + BalStore + Clone + Debug + Unpin diff --git a/crates/storage/storage-api/Cargo.toml b/crates/storage/storage-api/Cargo.toml index d83b1346c71..c1e0aee8f45 100644 --- a/crates/storage/storage-api/Cargo.toml +++ b/crates/storage/storage-api/Cargo.toml @@ -24,6 +24,7 @@ reth-storage-errors.workspace = true reth-trie-common.workspace = true revm-database.workspace = true reth-ethereum-primitives.workspace = true +reth-bal-store.workspace = true # ethereum alloy-eips.workspace = true @@ -56,10 +57,7 @@ std = [ "serde_json?/std", ] -db-api = [ - "dep:reth-db-api", - "dep:serde_json", -] +db-api = ["dep:reth-db-api", "dep:serde_json"] serde = [ "reth-ethereum-primitives/serde", diff --git a/crates/storage/storage-api/src/noop.rs b/crates/storage/storage-api/src/noop.rs index ee51b2458bc..7853a5a1e41 100644 --- a/crates/storage/storage-api/src/noop.rs +++ b/crates/storage/storage-api/src/noop.rs @@ -22,6 +22,7 @@ use core::{ marker::PhantomData, ops::{RangeBounds, RangeInclusive}, }; +use reth_bal_store::BalStore; use reth_chainspec::{ChainInfo, ChainSpecProvider, EthChainSpec, MAINNET}; #[cfg(feature = "db-api")] use reth_db_api::mock::{DatabaseMock, TxMock}; @@ -704,3 +705,43 @@ impl StorageSettingsCache for NoopProvid fn set_storage_settings_cache(&self, _settings: reth_db_api::models::StorageSettings) {} } + +impl BalStore for NoopProvider { + fn insert( + &self, + _block_hash: BlockHash, + _block_number: BlockNumber, + _bal: Bytes, + ) -> Result<(), reth_bal_store::BalStoreError> { + Ok(()) + } + + fn get_by_hashes( + &self, + _block_hashes: &[BlockHash], + ) -> Result>, reth_bal_store::BalStoreError> { + Ok(Vec::new()) + } + + fn get_by_range( + &self, + _start: BlockNumber, + _count: u64, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(Vec::new()) + } + + fn get_by_block_number( + &self, + _block_number: BlockNumber, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(None) + } + + fn get_by_block_hash( + &self, + _block_hash: BlockHash, + ) -> Result, reth_bal_store::BalStoreError> { + Ok(None) + } +} diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 1ff8e0f3e18..63efa604a37 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -24,7 +24,7 @@ use reth_ethereum::{ pool::noop::NoopTransactionPool, provider::{ db::{mdbx::DatabaseArguments, open_db_read_only, ClientVersion, DatabaseEnv}, - providers::{BlockchainProvider, RocksDBProvider, StaticFileProvider}, + providers::{BalProvider, BlockchainProvider, RocksDBProvider, StaticFileProvider}, ProviderFactory, }, rpc::{ @@ -61,7 +61,7 @@ async fn main() -> eyre::Result<()> { // 2. Set up the blockchain provider using only the database provider and a noop for the tree to // satisfy trait bounds. Tree is not used in this example since we are only operating on the // disk and don't handle new blocks/live sync etc, which is done by the blockchain tree. - let provider = BlockchainProvider::new(factory)?; + let provider = BlockchainProvider::new(factory, BalProvider::default())?; let rpc_builder = RpcModuleBuilder::default() .with_provider(provider.clone())