Skip to content
Open
12 changes: 11 additions & 1 deletion crates/common/trie/trie_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ fn flush_nodes_to_write(
db: &dyn TrieDB,
sender: Sender<Vec<(Nibbles, Node)>>,
) -> Result<(), TrieGenerationError> {
let start = std::time::Instant::now();
let node_count = nodes_to_write.len();
db.put_batch_no_alloc(&nodes_to_write)
.map_err(TrieGenerationError::FlushToDbError)?;
tracing::debug!(
node_count,
elapsed_ms = start.elapsed().as_millis() as u64,
"flush_nodes_to_write"
);
nodes_to_write.clear();
let _ = sender.send(nodes_to_write);
Ok(())
Expand Down Expand Up @@ -351,7 +358,10 @@ where
let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize));
}
scope(|s| {
let pool = ThreadPool::new(12, s);
let thread_count = std::thread::available_parallelism()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at least more than 2. If there is only a single thread it will wait on the write buffers getting freed.

.map(|n| n.get())
.unwrap_or(8);
let pool = ThreadPool::new(thread_count, s);
trie_from_sorted_accounts(
db,
accounts_iter,
Expand Down
19 changes: 15 additions & 4 deletions crates/networking/p2p/sync/healing/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,28 +283,39 @@ async fn heal_state_trie(
// PERF: reuse buffers?
let to_write = std::mem::take(&mut nodes_to_write);
let store = store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
// NOTE: we keep only a single task in the background to avoid out of order deletes.
// Parent-path empty markers from batch N could overwrite real node data from batch N+1
// if commits happen out of order.
if !db_joinset.is_empty() {
db_joinset
.join_next()
.await
.expect("we just checked joinset is not empty")?;
.expect("joinset is not empty")?;
}
db_joinset.spawn_blocking(move || {
let encode_start = std::time::Instant::now();
let node_count = to_write.len();
let mut encoded_to_write = BTreeMap::new();
for (path, node) in to_write {
for i in 0..path.len() {
encoded_to_write.insert(path.slice(0, i), vec![]);
encoded_to_write.entry(path.slice(0, i)).or_insert(vec![]);
}
encoded_to_write.insert(path, node.encode_to_vec());
}
let encode_ms = encode_start.elapsed().as_millis() as u64;
let db_start = std::time::Instant::now();
let trie_db = store
.open_direct_state_trie(*EMPTY_TRIE_HASH)
.expect("Store should open");
let db = trie_db.db();
// PERF: use put_batch_no_alloc (note that it needs to remove nodes too)
db.put_batch(encoded_to_write.into_iter().collect())
.expect("The put batch on the store failed");
debug!(
node_count,
encode_ms,
db_write_ms = db_start.elapsed().as_millis() as u64,
"state healing batch write"
);
});
}

Expand Down
31 changes: 22 additions & 9 deletions crates/networking/p2p/sync/healing/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,38 @@ pub async fn heal_storage_trie(
if nodes_to_write.values().map(Vec::len).sum::<usize>() > 100_000 || is_done || is_stale {
let to_write: Vec<_> = nodes_to_write.drain().collect();
let store = state.store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
if !db_joinset.is_empty() {
db_joinset.join_next().await;
// NOTE: we keep only a single task in the background to avoid out of order deletes.
// Parent-path empty markers from batch N could overwrite real node data from batch N+1
// if commits happen out of order.
if !db_joinset.is_empty()
&& let Some(Err(e)) = db_joinset.join_next().await
{
return Err(SyncError::JoinHandle(e));
}
db_joinset.spawn_blocking(move || {
let encode_start = std::time::Instant::now();
let account_count = to_write.len();
let mut encoded_to_write = vec![];
for (hashed_account, nodes) in to_write {
let mut account_nodes = vec![];
let mut account_nodes = std::collections::BTreeMap::new();
for (path, node) in nodes {
for i in 0..path.len() {
account_nodes.push((path.slice(0, i), vec![]));
account_nodes.entry(path.slice(0, i)).or_insert(vec![]);
}
account_nodes.push((path, node.encode_to_vec()));
account_nodes.insert(path, node.encode_to_vec());
}
encoded_to_write.push((hashed_account, account_nodes));
encoded_to_write.push((hashed_account, account_nodes.into_iter().collect()));
}
// PERF: use put_batch_no_alloc? (it needs to remove parent nodes too)
spawned_rt::tasks::block_on(store.write_storage_trie_nodes_batch(encoded_to_write))
let encode_ms = encode_start.elapsed().as_millis() as u64;
let db_start = std::time::Instant::now();
store.write_storage_trie_nodes_batch_sync(encoded_to_write)
.expect("db write failed");
debug!(
account_count,
encode_ms,
db_write_ms = db_start.elapsed().as_millis() as u64,
"storage healing batch write"
);
});
}

Expand Down
23 changes: 14 additions & 9 deletions crates/networking/p2p/sync/snap_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,16 +956,12 @@ async fn insert_accounts(
.collect();
db.ingest_external_file(file_paths)
.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
let iter = db.full_iterator(rocksdb::IteratorMode::Start);
for account in iter {
let account = account.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
let account_state = AccountState::decode(&account.1).map_err(SyncError::Rlp)?;
if account_state.code_hash != *EMPTY_KECCACK_HASH {
code_hash_collector.add(account_state.code_hash);
code_hash_collector.flush_if_needed().await?;
}
}

let start = std::time::Instant::now();
// We collect code hashes directly into the collector's HashSet during the trie
// build pass. The collector deduplicates, so memory is bounded by unique contract
// accounts (~5M on mainnet = ~160MB). We can't call flush_if_needed() here because

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says ~5M entries = ~160MB, but HashSet<H256> has per-entry overhead beyond the raw 32-byte key — typically ~72 bytes/entry on 64-bit (key + hash + bucket metadata + load factor headroom). At 5M entries that's closer to 350-400MB peak during the synchronous trie build.

This might be fine for the target hardware running snap sync, but the comment understates actual memory usage. Worth either correcting the estimate or noting that it's the raw key size only.

// the trie build is synchronous, so we flush after the build completes.
let iter = db.full_iterator(rocksdb::IteratorMode::Start);
Comment on lines +960 to 965

Copilot AI Feb 9, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code_hashes collects every non-empty account code hash into a Vec during the trie-build pass. On large networks this can grow very large (and includes duplicates), increasing peak RSS and risking OOM compared to the previous streaming flush.

Consider avoiding the intermediate Vec by either (a) adding hashes directly into CodeHashCollector during .inspect() (no await), and flushing/spawning file dumps in a bounded way after the trie build, or (b) sending hashes to a background writer via a channel so the buffer can be flushed incrementally while the trie is being built.

Copilot uses AI. Check for mistakes.
let compute_state_root = trie_from_sorted_accounts_wrap(
trie.db(),
Expand All @@ -976,6 +972,9 @@ async fn insert_accounts(
.account_tries_inserted
.fetch_add(1, Ordering::Relaxed);
let account_state = AccountState::decode(v).expect("We should have accounts here");
Comment on lines 972 to 974

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panics on bad input

insert_accounts now hard-panics on any temp RocksDB iterator error (.expect(...)) and on any malformed account value (AccountState::decode(v).expect(...)). These are external inputs (snapshot files / DB iterator) and can be corrupted; panicking will crash the sync process instead of returning SyncError like the rest of this function does.

Consider propagating these as SyncError::RocksDBError / SyncError::Rlp instead of using expect inside the iterator (map_err(...)?).

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 972:974

Comment:
**Panics on bad input**

`insert_accounts` now hard-panics on any temp RocksDB iterator error (`.expect(...)`) and on any malformed account value (`AccountState::decode(v).expect(...)`). These are external inputs (snapshot files / DB iterator) and can be corrupted; panicking will crash the sync process instead of returning `SyncError` like the rest of this function does.

Consider propagating these as `SyncError::RocksDBError` / `SyncError::Rlp` instead of using `expect` inside the iterator (`map_err(...)?`).

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This absolutely shouldn't happen. The only scenario is where the user has a faulty file system.

Comment on lines 966 to 974

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panics on iterator/rlp decode

insert_accounts still has expect(...) calls inside the RocksDB iterator (.map(|k| k.expect(...)) and AccountState::decode(v).expect(...)). These are external inputs (temp RocksDB iterator + snapshot content), so a single iterator error or malformed RLP will crash the whole sync instead of returning a SyncError like the rest of the function does. This should be converted to map_err(...)? / Result propagation so snap sync can fail gracefully with a useful error.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 966:974

Comment:
**Panics on iterator/rlp decode**

`insert_accounts` still has `expect(...)` calls inside the RocksDB iterator (`.map(|k| k.expect(...))` and `AccountState::decode(v).expect(...)`). These are external inputs (temp RocksDB iterator + snapshot content), so a single iterator error or malformed RLP will crash the whole sync instead of returning a `SyncError` like the rest of the function does. This should be converted to `map_err(...)?` / `Result` propagation so snap sync can fail gracefully with a useful error.

How can I resolve this? If you propose a fix, please make it concise.

if account_state.code_hash != *EMPTY_KECCACK_HASH {
code_hash_collector.add(account_state.code_hash);
}
Comment on lines 960 to +977

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded code hash buffer
insert_accounts now collects all non-empty code_hash values into code_hashes: Vec<H256> and only flushes after trie_from_sorted_accounts_wrap finishes. On large snapshots this can grow to millions of hashes and OOM before the trie build completes. This used to be bounded by flush_if_needed() during the scan. Consider flushing incrementally during the iterator pass (e.g., push+flush when the collector is full) or using a bounded buffer.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 960:974

Comment:
**Unbounded code hash buffer**
`insert_accounts` now collects all non-empty `code_hash` values into `code_hashes: Vec<H256>` and only flushes after `trie_from_sorted_accounts_wrap` finishes. On large snapshots this can grow to millions of hashes and OOM before the trie build completes. This used to be bounded by `flush_if_needed()` during the scan. Consider flushing incrementally during the iterator pass (e.g., push+flush when the collector is full) or using a bounded buffer.


How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 08ba6f92c. Removed the intermediate Vec<H256> entirely — code hashes are now added directly into CodeHashCollector's internal HashSet during the .inspect() callback. The HashSet deduplicates, so memory is bounded by unique contract accounts (~5M on mainnet at 32 bytes each = ~160MB) instead of growing unboundedly with duplicates.

flush_if_needed() is called once after the trie build completes. We can't call it during the iterator because trie_from_sorted_accounts_wrap is synchronous and flush_if_needed() is async.

if account_state.storage_root != *EMPTY_TRIE_HASH {
storage_accounts.accounts_with_storage_root.insert(
H256::from_slice(k),
Expand All @@ -986,6 +985,12 @@ async fn insert_accounts(
.map(|(k, v)| (H256::from_slice(&k), v.to_vec())),
)
.map_err(SyncError::TrieGenerationError)?;
debug!(
elapsed_ms = start.elapsed().as_millis() as u64,
"insert_accounts trie build"
);
// Flush any remaining code hashes that accumulated during the trie build
code_hash_collector.flush_if_needed().await?;

drop(db); // close db before removing directory

Expand Down
39 changes: 28 additions & 11 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,22 +1139,39 @@ impl Store {
) -> Result<(), StoreError> {
let mut txn = self.backend.begin_write()?;
tokio::task::spawn_blocking(move || {
for (address_hash, nodes) in storage_trie_nodes {
for (node_path, node_data) in nodes {
let key = apply_prefix(Some(address_hash), node_path);
if node_data.is_empty() {
txn.delete(STORAGE_TRIE_NODES, key.as_ref())?;
} else {
txn.put(STORAGE_TRIE_NODES, key.as_ref(), &node_data)?;
}
}
}
txn.commit()
Self::write_storage_trie_nodes_inner(&mut *txn, storage_trie_nodes)
})
.await
.map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
}

/// Synchronous version of [`Self::write_storage_trie_nodes_batch`] for use inside
/// `spawn_blocking` closures, avoiding a double thread hop through `block_on`.
pub fn write_storage_trie_nodes_batch_sync(
&self,
storage_trie_nodes: StorageUpdates,
) -> Result<(), StoreError> {
let mut txn = self.backend.begin_write()?;
Self::write_storage_trie_nodes_inner(&mut *txn, storage_trie_nodes)
}

fn write_storage_trie_nodes_inner(
txn: &mut dyn crate::api::StorageWriteBatch,
storage_trie_nodes: StorageUpdates,
) -> Result<(), StoreError> {
for (address_hash, nodes) in storage_trie_nodes {
for (node_path, node_data) in nodes {
let key = apply_prefix(Some(address_hash), node_path);
if node_data.is_empty() {
txn.delete(STORAGE_TRIE_NODES, key.as_ref())?;
} else {
txn.put(STORAGE_TRIE_NODES, key.as_ref(), &node_data)?;
}
}
}
txn.commit()
}

/// CAUTION: This method writes directly to the underlying database, bypassing any caching layer.
/// For updating the state after block execution, use [`Self::store_block_updates`].
pub async fn write_account_code_batch(
Expand Down
43 changes: 40 additions & 3 deletions crates/storage/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::api::{StorageBackend, StorageLockedView};
use crate::error::StoreError;
use crate::layering::apply_prefix;
use ethrex_common::H256;
use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::{Nibbles, Node, TrieDB, error::TrieError};
use std::sync::Arc;
use std::time::Instant;

/// StorageWriteBatch implementation for the TrieDB trait
/// Wraps a transaction to allow multiple trie operations on the same transaction
Expand Down Expand Up @@ -101,17 +103,52 @@ impl TrieDB for BackendTrieDB {
}

fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
let start = Instant::now();
let item_count = key_values.len();
let mut tx = self.db.begin_write().map_err(|e| {
TrieError::DbError(anyhow::anyhow!("Failed to begin write transaction: {}", e))
})?;
for (key, value) in key_values {
let prefixed_key = self.make_key(key);
let table = self.table_for_key(&prefixed_key);
tx.put_batch(table, vec![(prefixed_key, value)])
tx.put(table, &prefixed_key, &value)
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
}
let commit_start = Instant::now();
tx.commit()
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
tracing::debug!(
items = item_count,
total_ms = start.elapsed().as_millis() as u64,
commit_ms = commit_start.elapsed().as_millis() as u64,
"BackendTrieDB::put_batch"
);
Ok(())
}

fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> {
let start = Instant::now();
let item_count = key_values.len();
let mut tx = self.db.begin_write().map_err(|e| {
TrieError::DbError(anyhow::anyhow!("Failed to begin write transaction: {}", e))
})?;
for (path, node) in key_values {
let prefixed_key = self.make_key(path.clone());
let table = self.table_for_key(&prefixed_key);
let encoded = node.encode_to_vec();
tx.put(table, &prefixed_key, &encoded)
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
}
let commit_start = Instant::now();
tx.commit()
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
tracing::debug!(
items = item_count,
total_ms = start.elapsed().as_millis() as u64,
commit_ms = commit_start.elapsed().as_millis() as u64,
"BackendTrieDB::put_batch_no_alloc"
);
Ok(())
}
}

Expand Down
Loading