diff --git a/CHANGELOG.md b/CHANGELOG.md index 594fbb7c3de..f6fbfb9e826 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,10 @@ - Short-circuit the `KECCAK256` opcode on zero-length input by returning the precomputed `keccak256("")` constant, skipping the permutation [#6775](https://github.com/lambdaclass/ethrex/pull/6775) +### 2026-05-28 + +- Batch account-state prefetch via rocksdb `multi_get_cf` on the flat key-value table [#6712](https://github.com/lambdaclass/ethrex/pull/6712) + ### 2026-05-27 - Prefetch all BAL storage synchronously before execution [#6732](https://github.com/lambdaclass/ethrex/pull/6732) diff --git a/crates/blockchain/vm.rs b/crates/blockchain/vm.rs index c720eb58958..d0a9b2079af 100644 --- a/crates/blockchain/vm.rs +++ b/crates/blockchain/vm.rs @@ -122,6 +122,72 @@ impl VmDatabase for StoreVmDatabase { .map(|entry| entry.state)) } + #[instrument( + level = "trace", + name = "Account read batch", + skip_all, + fields(namespace = "block_execution", n = addresses.len()) + )] + fn get_account_states_batch( + &self, + addresses: &[Address], + ) -> Result>, EvmError> { + // Split into cached / uncached so the rocksdb multi_get only fires for + // addresses we haven't memoized yet on this StoreVmDatabase. + let mut results: Vec> = vec![None; addresses.len()]; + let mut miss_idx: Vec = Vec::new(); + let mut miss_addrs: Vec
= Vec::new(); + { + let cache = self + .account_state_cache + .read() + .map_err(|_| EvmError::Custom("LockError".to_string()))?; + for (i, addr) in addresses.iter().enumerate() { + match cache.get(addr) { + Some(Some(entry)) => results[i] = Some(entry.state), + Some(None) => results[i] = None, + None => { + miss_idx.push(i); + miss_addrs.push(*addr); + } + } + } + } + + if miss_addrs.is_empty() { + return Ok(results); + } + + let fetched = self + .store + .get_account_states_batch_by_root(self.state_root, &miss_addrs) + .map_err(|e| EvmError::DB(e.to_string()))?; + + // Populate the per-DB cache and assemble results. `insert` (vs `or_insert`) + // is intentional: `state_root` is fixed for this `StoreVmDatabase`, so a + // concurrent populator can only have written the same value for the same + // address — overwriting is a no-op, and the unconditional insert avoids + // the extra `entry`-API lookup. + let mut cache = self + .account_state_cache + .write() + .map_err(|_| EvmError::Custom("LockError".to_string()))?; + for ((slot, addr), state) in miss_idx + .iter() + .zip(miss_addrs.iter()) + .zip(fetched.into_iter()) + { + let cached = state.map(|state| AccountStateCacheEntry { + state, + hashed_address: H256::from(keccak_hash(addr.to_fixed_bytes())), + }); + cache.insert(*addr, cached); + results[*slot] = cached.map(|e| e.state); + } + + Ok(results) + } + #[instrument( level = "trace", name = "Storage read", diff --git a/crates/storage/api/mod.rs b/crates/storage/api/mod.rs index 69b19e14e25..675ff897d5d 100644 --- a/crates/storage/api/mod.rs +++ b/crates/storage/api/mod.rs @@ -59,6 +59,21 @@ pub trait StorageReadView: Send + Sync { /// Retrieves a value by key from the specified table. fn get(&self, table: &'static str, key: &[u8]) -> Result>, StoreError>; + /// Retrieves multiple values by key from the specified table. + /// Returns results in the same order as the input keys. + /// Backends that support batched reads (e.g. RocksDB `multi_get_cf`) + /// should override this for better throughput. Callers should not + /// assume `multi_get` is asymptotically faster than `get`; on backends + /// without a batched read primitive (e.g. the in-memory backend) the + /// default impl below is equivalent to N independent `get` calls. + fn multi_get( + &self, + table: &'static str, + keys: &[&[u8]], + ) -> Vec>, StoreError>> { + keys.iter().map(|k| self.get(table, k)).collect() + } + /// Returns an iterator over all key-value pairs with the given prefix. fn prefix_iterator( &self, diff --git a/crates/storage/backend/rocksdb.rs b/crates/storage/backend/rocksdb.rs index 7f94f74fac7..7746e2350ce 100644 --- a/crates/storage/backend/rocksdb.rs +++ b/crates/storage/backend/rocksdb.rs @@ -371,6 +371,28 @@ impl StorageReadView for RocksDBReadTx { .map_err(|e| StoreError::Custom(format!("Failed to get from {}: {}", table, e))) } + fn multi_get( + &self, + table: &'static str, + keys: &[&[u8]], + ) -> Vec>, StoreError>> { + let Some(cf) = self.db.cf_handle(table) else { + let err_msg = format!("Table {} not found", table); + return (0..keys.len()) + .map(|_| Err(StoreError::Custom(err_msg.clone()))) + .collect(); + }; + // `sorted_input=false`: rocksdb sorts internally. Caller may pass arbitrary order. + self.db + .batched_multi_get_cf(&cf, keys.iter().copied(), false) + .into_iter() + .map(|res| { + res.map(|opt| opt.map(|slice| slice.to_vec())) + .map_err(|e| StoreError::Custom(format!("multi_get {}: {}", table, e))) + }) + .collect() + } + fn prefix_iterator( &self, table: &'static str, diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 49c835f4943..1d7cc5072d8 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -41,6 +41,7 @@ use ethrex_rlp::{ use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitness}; use ethrex_trie::{Node, NodeRLP}; use lru::LruCache; +use rayon::prelude::*; use rustc_hash::FxBuildHasher; use serde::{Deserialize, Serialize}; use std::{ @@ -2672,6 +2673,104 @@ impl Store { Ok(Some(AccountState::decode(&encoded_state)?)) } + /// Batch lookup of account states by address against a given state root. + /// + /// Fast path: for addresses whose hashed path falls within the FKV cursor + /// (and which are not present in the in-memory diff-layer cache), values + /// are fetched in a single `multi_get` on `ACCOUNT_FLATKEYVALUE`. Other + /// addresses fall back to per-address trie walks. + /// + /// Results are returned in the same order as the input addresses. + pub fn get_account_states_batch_by_root( + &self, + state_root: H256, + addresses: &[Address], + ) -> Result>, StoreError> { + if addresses.is_empty() { + return Ok(Vec::new()); + } + + let last_written = self.last_written()?; + let trie_cache = self + .trie_cache + .read() + .map_err(|_| StoreError::LockError)? + .clone(); + + let mut results: Vec> = vec![None; addresses.len()]; + // Per-address leaf paths (nibbles + leaf flag). Length 65. + let leaf_paths: Vec> = addresses + .iter() + .map(|addr| { + let hashed = hash_address_fixed(addr); + Nibbles::from_bytes(hashed.as_bytes()).into_vec() + }) + .collect(); + + let mut fkv_indices: Vec = Vec::new(); + let mut trie_indices: Vec = Vec::new(); + + // Match `BackendTrieDB::flatkeyvalue_computed` semantics: a path is + // covered by FKV iff `last_written >= path` as raw nibble bytes. This + // is the same check `Trie::get` uses; the related helper + // `Store::flatkeyvalue_computed_with_last_written` slices `[0..64]` + // and is intentionally more conservative — using that here would + // unnecessarily fall back to the trie when the cursor sits inside an + // account's storage sweep (the account leaf is already in FKV at that + // point; see `flatkeyvalue_generator`). + let fkv_cursor: &[u8] = last_written.as_slice(); + for (i, path) in leaf_paths.iter().enumerate() { + if let Some(value) = trie_cache.get(state_root, path.as_slice()) { + if !value.is_empty() { + results[i] = Some(AccountState::decode(&value)?); + } + continue; + } + if fkv_cursor >= path.as_slice() { + fkv_indices.push(i); + } else { + trie_indices.push(i); + } + } + + if !fkv_indices.is_empty() { + let read_view = self.backend.begin_read()?; + let keys: Vec<&[u8]> = fkv_indices + .iter() + .map(|&i| leaf_paths[i].as_slice()) + .collect(); + let raw = read_view.multi_get(ACCOUNT_FLATKEYVALUE, &keys); + for (slot, res) in fkv_indices.iter().zip(raw.into_iter()) { + let Some(encoded) = res? else { continue }; + if encoded.is_empty() { + continue; + } + results[*slot] = Some(AccountState::decode(&encoded)?); + } + } + + if !trie_indices.is_empty() { + // Fall back to the regular trie path for any addresses whose path + // hasn't been swept by the FKV generator yet. Parallelized to + // recover the per-address fan-out the pre-batch `par_iter` path + // had, which matters during initial sync when most addresses + // miss FKV. + let state_trie = self.open_state_trie(state_root)?; + let fetched: Result)>, StoreError> = trie_indices + .par_iter() + .map(|&i| { + self.get_account_state_from_trie(&state_trie, addresses[i]) + .map(|s| (i, s)) + }) + .collect(); + for (i, s) in fetched? { + results[i] = s; + } + } + + Ok(results) + } + /// Constructs a merkle proof for the given account address against a given state. /// If storage_keys are provided, also constructs the storage proofs for those keys. /// diff --git a/crates/vm/backends/levm/db.rs b/crates/vm/backends/levm/db.rs index 59e847b670d..127cfb39099 100644 --- a/crates/vm/backends/levm/db.rs +++ b/crates/vm/backends/levm/db.rs @@ -93,6 +93,18 @@ impl LevmDatabase for DynVmDatabase { Ok(acc_state) } + fn get_account_states_batch( + &self, + addresses: &[CoreAddress], + ) -> Result, DatabaseError> { + let states = ::get_account_states_batch(self.as_ref(), addresses) + .map_err(|e| DatabaseError::Custom(e.to_string()))?; + Ok(states + .into_iter() + .map(|opt| opt.unwrap_or_default()) + .collect()) + } + fn get_storage_value( &self, address: CoreAddress, diff --git a/crates/vm/db.rs b/crates/vm/db.rs index 7161df19f47..a60a2ad45e6 100644 --- a/crates/vm/db.rs +++ b/crates/vm/db.rs @@ -12,6 +12,19 @@ pub trait VmDatabase: Send + Sync + DynClone { fn get_chain_config(&self) -> Result; fn get_account_code(&self, code_hash: H256) -> Result; fn get_code_metadata(&self, code_hash: H256) -> Result; + + /// Batch account-state lookup. Default impl loops `get_account_state`. + /// Backends that can amortize per-key cost (e.g. rocksdb `multi_get_cf` on + /// the flat key-value table) should override this. + fn get_account_states_batch( + &self, + addresses: &[Address], + ) -> Result>, EvmError> { + addresses + .iter() + .map(|a| self.get_account_state(*a)) + .collect() + } } dyn_clone::clone_trait_object!(VmDatabase); diff --git a/crates/vm/levm/src/db/mod.rs b/crates/vm/levm/src/db/mod.rs index 4452151758c..7b443bac193 100644 --- a/crates/vm/levm/src/db/mod.rs +++ b/crates/vm/levm/src/db/mod.rs @@ -26,6 +26,18 @@ pub trait Database: Send + Sync { fn precompile_cache(&self) -> Option<&PrecompileCache> { None } + /// Batch lookup. Default: loop. Backends with a batched read path (e.g. rocksdb + /// `multi_get_cf` on the flat key-value table) should override this and the + /// caching layer above will dispatch to it. + fn get_account_states_batch( + &self, + addresses: &[Address], + ) -> Result, DatabaseError> { + addresses + .iter() + .map(|a| self.get_account_state(*a)) + .collect() + } /// Prefetch a batch of accounts into the cache. Default: sequential fallback. fn prefetch_accounts(&self, addresses: &[Address]) -> Result<(), DatabaseError> { for &addr in addresses { @@ -180,15 +192,25 @@ impl Database for CachingDatabase { self.precompile_cache.as_ref() } - #[cfg(all(feature = "rayon", not(feature = "eip-8025")))] fn prefetch_accounts(&self, addresses: &[Address]) -> Result<(), DatabaseError> { - // Fetch from inner in parallel (no lock contention), then single write-lock to populate cache. - let fetched: Vec<(Address, AccountState)> = addresses - .par_iter() - .map(|&addr| self.inner.get_account_state(addr).map(|s| (addr, s))) - .collect::>()?; + // Filter out already-cached addresses before issuing the batch read. + let missing: Vec
= { + let cache = self.read_accounts()?; + addresses + .iter() + .copied() + .filter(|a| !cache.contains_key(a)) + .collect() + }; + if missing.is_empty() { + return Ok(()); + } + // Dispatch to inner's batch path. For the rocksdb-backed StoreVmDatabase + // this collapses into a single multi_get on ACCOUNT_FLATKEYVALUE for the + // FKV-covered subset; default impl loops for other backends. + let states = self.inner.get_account_states_batch(&missing)?; let mut cache = self.write_accounts()?; - for (addr, state) in fetched { + for (addr, state) in missing.into_iter().zip(states.into_iter()) { cache.entry(addr).or_insert(state); } Ok(())