Skip to content
62 changes: 62 additions & 0 deletions crates/blockchain/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,68 @@ impl VmDatabase for StoreVmDatabase {
.map(|entry| entry.state))
}

#[instrument(
level = "trace",
name = "Account read batch",
skip_all,
fields(namespace = "block_execution", n = addresses.len())
)]
fn get_account_states_batch(
&self,
addresses: &[Address],
) -> Result<Vec<Option<AccountState>>, EvmError> {
// Split into cached / uncached so the rocksdb multi_get only fires for
// addresses we haven't memoized yet on this StoreVmDatabase.
let mut results: Vec<Option<AccountState>> = vec![None; addresses.len()];
let mut miss_idx: Vec<usize> = Vec::new();
let mut miss_addrs: Vec<Address> = Vec::new();
{
let cache = self
.account_state_cache
.read()
.map_err(|_| EvmError::Custom("LockError".to_string()))?;
for (i, addr) in addresses.iter().enumerate() {
match cache.get(addr) {
Some(Some(entry)) => results[i] = Some(entry.state),
Some(None) => results[i] = None,
None => {
miss_idx.push(i);
miss_addrs.push(*addr);
}
}
}
}

if miss_addrs.is_empty() {
return Ok(results);
}

let fetched = self
.store
.get_account_states_batch_by_root(self.state_root, &miss_addrs)
.map_err(|e| EvmError::DB(e.to_string()))?;

// Populate the per-DB cache and assemble results.
let mut cache = self
.account_state_cache
.write()
.map_err(|_| EvmError::Custom("LockError".to_string()))?;
for ((slot, addr), state) in miss_idx
.iter()
.zip(miss_addrs.iter())
.zip(fetched.into_iter())
{
let cached = state.map(|state| AccountStateCacheEntry {
state,
hashed_address: H256::from(keccak_hash(addr.to_fixed_bytes())),
});
cache.insert(*addr, cached);
results[*slot] = cached.map(|e| e.state);
}

Ok(results)
}

#[instrument(
level = "trace",
name = "Storage read",
Expand Down
12 changes: 12 additions & 0 deletions crates/storage/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,18 @@ pub trait StorageReadView: Send + Sync {
/// Retrieves a value by key from the specified table.
fn get(&self, table: &'static str, key: &[u8]) -> Result<Option<Vec<u8>>, StoreError>;

/// Looks up several keys in `table` and returns one result per key,
/// positionally aligned with `keys`.
///
/// This default simply issues one `get` per key. Backends with a native
/// batched read (e.g. RocksDB `multi_get_cf`) should override it for
/// better throughput.
fn multi_get(
    &self,
    table: &'static str,
    keys: &[&[u8]],
) -> Vec<Result<Option<Vec<u8>>, StoreError>> {
    let mut values = Vec::with_capacity(keys.len());
    for key in keys {
        values.push(self.get(table, key));
    }
    values
}

/// Returns an iterator over all key-value pairs with the given prefix.
fn prefix_iterator(
&self,
Expand Down
22 changes: 22 additions & 0 deletions crates/storage/backend/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,28 @@ impl StorageReadView for RocksDBReadTx {
.map_err(|e| StoreError::Custom(format!("Failed to get from {}: {}", table, e)))
}

/// Batched read of `keys` from the column family named `table`.
///
/// Returns one result per key, in input order. If the column family does
/// not exist, every position carries a "table not found" error.
fn multi_get(
    &self,
    table: &'static str,
    keys: &[&[u8]],
) -> Vec<Result<Option<Vec<u8>>, StoreError>> {
    let cf = match self.db.cf_handle(table) {
        Some(handle) => handle,
        None => {
            let missing_table = StoreError::Custom(format!("Table {} not found", table));
            let mut failures = Vec::with_capacity(keys.len());
            for _ in keys {
                failures.push(Err(StoreError::Custom(missing_table.to_string())));
            }
            return failures;
        }
    };

    // `sorted_input=false`: callers may pass keys in arbitrary order, so the
    // database is told not to assume they are pre-sorted.
    let fetched = self
        .db
        .batched_multi_get_cf(&cf, keys.iter().copied(), false);

    let mut values = Vec::with_capacity(keys.len());
    for outcome in fetched {
        values.push(match outcome {
            Ok(found) => Ok(found.map(|pinned| pinned.to_vec())),
            Err(e) => Err(StoreError::Custom(format!("multi_get {}: {}", table, e))),
        });
    }
    values
}

fn prefix_iterator(
&self,
table: &'static str,
Expand Down
87 changes: 87 additions & 0 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2460,6 +2460,93 @@ impl Store {
Ok(Some(AccountState::decode(&encoded_state)?))
}

/// Batch lookup of account states by address against a given state root.
///
/// Each address is resolved through the first source that applies:
///
/// 1. The in-memory trie cache (diff layers) for `state_root`. An empty
///    cached value is treated as "account does not exist".
/// 2. Fast path: for addresses whose hashed path falls within the FKV cursor,
///    values are fetched in a single `multi_get` on `ACCOUNT_FLATKEYVALUE`.
///    A missing or empty row is treated as "account does not exist".
/// 3. All other addresses fall back to per-address trie walks.
///
/// Results are returned in the same order as the input addresses.
///
/// # Errors
///
/// Returns `StoreError::LockError` if the trie cache lock is poisoned, and
/// propagates backend read, trie and decoding errors. Also fails if the
/// backend's `multi_get` returns a different number of results than keys.
pub fn get_account_states_batch_by_root(
    &self,
    state_root: H256,
    addresses: &[Address],
) -> Result<Vec<Option<AccountState>>, StoreError> {
    if addresses.is_empty() {
        return Ok(Vec::new());
    }

    let last_written = self.last_written()?;
    // The FKV cursor is the same for every address; build it once instead of
    // once per loop iteration.
    let fkv_cursor = Nibbles::from_hex(last_written.clone());
    let trie_cache = self
        .trie_cache
        .read()
        .map_err(|_| StoreError::LockError)?
        .clone();

    let mut results: Vec<Option<AccountState>> = vec![None; addresses.len()];
    // Per-address leaf paths: the nibble form of the hashed address, used both
    // as the trie-cache key and as the flat key-value table key.
    let leaf_paths: Vec<Vec<u8>> = addresses
        .iter()
        .map(|addr| {
            let hashed = hash_address_fixed(addr);
            Nibbles::from_bytes(hashed.as_bytes()).into_vec()
        })
        .collect();

    let mut fkv_indices: Vec<usize> = Vec::new();
    let mut trie_indices: Vec<usize> = Vec::new();

    for (i, path) in leaf_paths.iter().enumerate() {
        if let Some(value) = trie_cache.get(state_root, path.as_slice()) {
            if !value.is_empty() {
                results[i] = Some(AccountState::decode(&value)?);
            }
            continue;
        }
        // `fkv_cursor >= path` ⇒ FKV row is authoritative
        // (either present with value, or absent meaning the account does not exist).
        if fkv_cursor >= Nibbles::from_hex(path.clone()) {
            fkv_indices.push(i);
        } else {
            trie_indices.push(i);
        }
    }

    if !fkv_indices.is_empty() {
        let read_view = self.backend.begin_read()?;
        let keys: Vec<&[u8]> = fkv_indices
            .iter()
            .map(|&i| leaf_paths[i].as_slice())
            .collect();
        let raw = read_view.multi_get(ACCOUNT_FLATKEYVALUE, &keys);
        // `zip` below would silently stop at the shorter side and leave the
        // remaining slots as `None`, which callers read as "no such account".
        if raw.len() != keys.len() {
            return Err(StoreError::Custom(format!(
                "multi_get on {} returned {} results for {} keys",
                ACCOUNT_FLATKEYVALUE,
                raw.len(),
                keys.len()
            )));
        }
        for (slot, res) in fkv_indices.iter().zip(raw) {
            let Some(encoded) = res? else { continue };
            if encoded.is_empty() {
                continue;
            }
            results[*slot] = Some(AccountState::decode(&encoded)?);
        }
    }

    if !trie_indices.is_empty() {
        // Fall back to the regular trie path for any addresses whose path
        // hasn't been swept by the FKV generator yet.
        let state_trie = self.open_state_trie(state_root)?;
        for &i in &trie_indices {
            results[i] = self.get_account_state_from_trie(&state_trie, addresses[i])?;
        }
    }

    Ok(results)
}

/// Constructs a merkle proof for the given account address against a given state.
/// If storage_keys are provided, also constructs the storage proofs for those keys.
///
Expand Down
12 changes: 12 additions & 0 deletions crates/vm/backends/levm/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,18 @@ impl LevmDatabase for DynVmDatabase {
Ok(acc_state)
}

/// Forwards a batch account lookup to the wrapped `VmDatabase`.
///
/// Addresses for which the inner database reports no account are mapped to
/// `AccountState::default()`. Inner errors are converted to
/// `DatabaseError::Custom` carrying the error's string form.
fn get_account_states_batch(
    &self,
    addresses: &[CoreAddress],
) -> Result<Vec<AccountState>, DatabaseError> {
    match <dyn VmDatabase>::get_account_states_batch(self.as_ref(), addresses) {
        Ok(maybe_states) => Ok(maybe_states
            .into_iter()
            .map(Option::unwrap_or_default)
            .collect()),
        Err(e) => Err(DatabaseError::Custom(e.to_string())),
    }
}

fn get_storage_value(
&self,
address: CoreAddress,
Expand Down
13 changes: 13 additions & 0 deletions crates/vm/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ pub trait VmDatabase: Send + Sync + DynClone {
fn get_chain_config(&self) -> Result<ChainConfig, EvmError>;
fn get_account_code(&self, code_hash: H256) -> Result<Code, EvmError>;
fn get_code_metadata(&self, code_hash: H256) -> Result<CodeMetadata, EvmError>;

/// Looks up the account state for each address, in input order.
///
/// The default implementation calls `get_account_state` once per address
/// and stops at the first error. Backends that can amortize per-key cost
/// (e.g. rocksdb `multi_get_cf` on the flat key-value table) should
/// override this.
fn get_account_states_batch(
    &self,
    addresses: &[Address],
) -> Result<Vec<Option<AccountState>>, EvmError> {
    let mut states = Vec::with_capacity(addresses.len());
    for &address in addresses {
        states.push(self.get_account_state(address)?);
    }
    Ok(states)
}
Comment on lines +19 to +27

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I like this trait growing even more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both levm::Database and VmDatabase need it because the batched read is in StoreVmDatabase but the cache it fills (CachingDatabase) is a layer up; DynVmDatabase bridges the two. Both have a default-loop impl so no other backend changes. The only way to drop the trait method is downcasting via as_any, which adds a trait method anyway plus a runtime check and fallback. Can do that if you'd rather, but this seemed lighter.

}

dyn_clone::clone_trait_object!(VmDatabase);
Expand Down
35 changes: 29 additions & 6 deletions crates/vm/levm/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ pub trait Database: Send + Sync {
fn precompile_cache(&self) -> Option<&PrecompileCache> {
None
}
/// Fetches the account state for every address, preserving input order.
///
/// By default this performs one `get_account_state` call per address and
/// returns the first error encountered. Backends with a batched read path
/// (e.g. rocksdb `multi_get_cf` on the flat key-value table) should
/// override this; the caching layer above dispatches to it.
fn get_account_states_batch(
    &self,
    addresses: &[Address],
) -> Result<Vec<AccountState>, DatabaseError> {
    let mut states = Vec::with_capacity(addresses.len());
    for &address in addresses {
        let state = self.get_account_state(address)?;
        states.push(state);
    }
    Ok(states)
}
/// Prefetch a batch of accounts into the cache. Default: sequential fallback.
fn prefetch_accounts(&self, addresses: &[Address]) -> Result<(), DatabaseError> {
for &addr in addresses {
Expand Down Expand Up @@ -182,13 +194,24 @@ impl Database for CachingDatabase {

#[cfg(all(feature = "rayon", not(feature = "eip-8025")))]
Comment thread
edg-l marked this conversation as resolved.
Outdated
fn prefetch_accounts(&self, addresses: &[Address]) -> Result<(), DatabaseError> {
// Fetch from inner in parallel (no lock contention), then single write-lock to populate cache.
let fetched: Vec<(Address, AccountState)> = addresses
.par_iter()
.map(|&addr| self.inner.get_account_state(addr).map(|s| (addr, s)))
.collect::<Result<_, _>>()?;
// Filter out already-cached addresses before issuing the batch read.
let missing: Vec<Address> = {
let cache = self.read_accounts()?;
addresses
.iter()
.copied()
.filter(|a| !cache.contains_key(a))
.collect()
};
if missing.is_empty() {
return Ok(());
}
// Dispatch to inner's batch path. For the rocksdb-backed StoreVmDatabase
// this collapses into a single multi_get on ACCOUNT_FLATKEYVALUE for the
// FKV-covered subset; default impl loops for other backends.
let states = self.inner.get_account_states_batch(&missing)?;
let mut cache = self.write_accounts()?;
for (addr, state) in fetched {
for (addr, state) in missing.into_iter().zip(states.into_iter()) {
cache.entry(addr).or_insert(state);
}
Ok(())
Expand Down
Loading