Skip to content
Open
12 changes: 11 additions & 1 deletion crates/common/trie/trie_sorted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ fn flush_nodes_to_write(
db: &dyn TrieDB,
sender: Sender<Vec<(Nibbles, Node)>>,
) -> Result<(), TrieGenerationError> {
let start = std::time::Instant::now();
let node_count = nodes_to_write.len();
db.put_batch_no_alloc(&nodes_to_write)
.map_err(TrieGenerationError::FlushToDbError)?;
tracing::debug!(
node_count,
elapsed_ms = start.elapsed().as_millis() as u64,
"flush_nodes_to_write"
);
nodes_to_write.clear();
let _ = sender.send(nodes_to_write);
Ok(())
Expand Down Expand Up @@ -351,7 +358,10 @@ where
let _ = buffer_sender.send(Vec::with_capacity(SIZE_TO_WRITE_DB as usize));
}
scope(|s| {
let pool = ThreadPool::new(12, s);
let thread_count = std::thread::available_parallelism()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be at least more than 2. If there is only a single thread it will wait on the write buffers getting freed.

.map(|n| n.get())
.unwrap_or(8);
let pool = ThreadPool::new(thread_count, s);
trie_from_sorted_accounts(
db,
accounts_iter,
Expand Down
19 changes: 14 additions & 5 deletions crates/networking/p2p/sync/healing/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,28 +283,37 @@ async fn heal_state_trie(
// PERF: reuse buffers?
let to_write = std::mem::take(&mut nodes_to_write);
let store = store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
if !db_joinset.is_empty() {
// Allow up to 2 tasks in flight to overlap encoding with DB commit
while db_joinset.len() >= 2 {
db_joinset
.join_next()
.await
.expect("we just checked joinset is not empty")?;
.expect("joinset is not empty")?;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JoinSet errors dropped
This loop uses join_next().await.expect(...)?; but the spawned closure returns (), so task panics/errors are effectively ignored (and ? here can only be applying to the Option, not the task result). That means DB write panics can be silently swallowed and healing can proceed as if writes succeeded. You likely want to match join_next().await and handle Err(JoinError) explicitly (and propagate/abort on panic).

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/healing/state.rs
Line: 285:292

Comment:
**JoinSet errors dropped**
This loop uses `join_next().await.expect(...)?;` but the spawned closure returns `()`, so task panics/errors are effectively ignored (and `?` here can only be applying to the `Option`, not the task result). That means DB write panics can be silently swallowed and healing can proceed as if writes succeeded. You likely want to `match join_next().await` and handle `Err(JoinError)` explicitly (and propagate/abort on panic).


How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the suggestion from the bot is wrong, but I would check the types of the task to make it return errors.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're correct — my comment was wrong.

The spawned closure does return Result<(), SyncError> (you can see Ok(()) at the end of the closure). The JoinSet type is JoinSet<Result<(), SyncError>>, so join_next().await returns:

  • Some(Ok(Ok(()))) — task succeeded, closure returned Ok
  • Some(Ok(Err(e))) — task succeeded, closure returned Err
  • Some(Err(JoinError)) — task panicked/cancelled

The PR properly handles all three cases via the wait_for_pending_task() helper added in healing/mod.rs (lines 22-28 of the diff), which matches on the result and propagates errors correctly. This is actually a fix — before the PR, the closures used .expect() which would panic instead of returning errors.

The error handling is correct as-is. My apologies for the noise!

db_joinset.spawn_blocking(move || {
let encode_start = std::time::Instant::now();
let node_count = to_write.len();
let mut encoded_to_write = BTreeMap::new();
for (path, node) in to_write {
for i in 0..path.len() {
encoded_to_write.insert(path.slice(0, i), vec![]);
encoded_to_write.entry(path.slice(0, i)).or_insert(vec![]);
}
encoded_to_write.insert(path, node.encode_to_vec());
}
let encode_ms = encode_start.elapsed().as_millis() as u64;
let db_start = std::time::Instant::now();
let trie_db = store
.open_direct_state_trie(*EMPTY_TRIE_HASH)
.expect("Store should open");
let db = trie_db.db();
// PERF: use put_batch_no_alloc (note that it needs to remove nodes too)
db.put_batch(encoded_to_write.into_iter().collect())
.expect("The put batch on the store failed");
debug!(
node_count,
encode_ms,
db_write_ms = db_start.elapsed().as_millis() as u64,
"state healing batch write"
);
});
}

Expand Down
25 changes: 17 additions & 8 deletions crates/networking/p2p/sync/healing/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,34 @@ pub async fn heal_storage_trie(
if nodes_to_write.values().map(Vec::len).sum::<usize>() > 100_000 || is_done || is_stale {
let to_write: Vec<_> = nodes_to_write.drain().collect();
let store = state.store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
if !db_joinset.is_empty() {
// Allow up to 2 tasks in flight to overlap encoding with DB commit
while db_joinset.len() >= 2 {
db_joinset.join_next().await;

Copilot AI Feb 9, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The join_next().await result is ignored here. If a background DB write task panics or returns an error, this will be silently dropped and healing may continue/finish without persisting data.

Handle the returned Option<Result<_, JoinError>> (and propagate/log failures) when draining the JoinSet, similar to how state healing uses expect(...) + ? on join_next().

Suggested change
db_joinset.join_next().await;
match db_joinset.join_next().await {
Some(Ok(_)) => {
// successfully completed background DB write task
}
Some(Err(e)) => {
error!(?e, "background DB write task failed during storage healing");
}
None => {
// no more tasks to join
break;
}
}

Copilot uses AI. Check for mistakes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 08ba6f92c. The join_next() result is now checked and JoinError is propagated as SyncError::JoinHandle(e), which will surface task panics instead of silently dropping them.

}
db_joinset.spawn_blocking(move || {
let encode_start = std::time::Instant::now();
let account_count = to_write.len();
let mut encoded_to_write = vec![];
for (hashed_account, nodes) in to_write {
let mut account_nodes = vec![];
let mut account_nodes = std::collections::BTreeMap::new();
for (path, node) in nodes {
for i in 0..path.len() {
account_nodes.push((path.slice(0, i), vec![]));
account_nodes.entry(path.slice(0, i)).or_insert(vec![]);
}
account_nodes.push((path, node.encode_to_vec()));
account_nodes.insert(path, node.encode_to_vec());
}
encoded_to_write.push((hashed_account, account_nodes));
encoded_to_write.push((hashed_account, account_nodes.into_iter().collect()));
}
// PERF: use put_batch_no_alloc? (it needs to remove parent nodes too)
spawned_rt::tasks::block_on(store.write_storage_trie_nodes_batch(encoded_to_write))
let encode_ms = encode_start.elapsed().as_millis() as u64;
let db_start = std::time::Instant::now();
store.write_storage_trie_nodes_batch_sync(encoded_to_write)
.expect("db write failed");
debug!(
account_count,
encode_ms,
db_write_ms = db_start.elapsed().as_millis() as u64,
"storage healing batch write"
);
});
}

Expand Down
23 changes: 14 additions & 9 deletions crates/networking/p2p/sync/snap_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -956,16 +956,9 @@ async fn insert_accounts(
.collect();
db.ingest_external_file(file_paths)
.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
let iter = db.full_iterator(rocksdb::IteratorMode::Start);
for account in iter {
let account = account.map_err(|err| SyncError::RocksDBError(err.into_string()))?;
let account_state = AccountState::decode(&account.1).map_err(SyncError::Rlp)?;
if account_state.code_hash != *EMPTY_KECCACK_HASH {
code_hash_collector.add(account_state.code_hash);
code_hash_collector.flush_if_needed().await?;
}
}

let start = std::time::Instant::now();
let mut code_hashes: Vec<H256> = Vec::new();
let iter = db.full_iterator(rocksdb::IteratorMode::Start);
Comment on lines +960 to 965

Copilot AI Feb 9, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code_hashes collects every non-empty account code hash into a Vec during the trie-build pass. On large networks this can grow very large (and includes duplicates), increasing peak RSS and risking OOM compared to the previous streaming flush.

Consider avoiding the intermediate Vec by either (a) adding hashes directly into CodeHashCollector during .inspect() (no await), and flushing/spawning file dumps in a bounded way after the trie build, or (b) sending hashes to a background writer via a channel so the buffer can be flushed incrementally while the trie is being built.

Copilot uses AI. Check for mistakes.
let compute_state_root = trie_from_sorted_accounts_wrap(
trie.db(),
Expand All @@ -976,6 +969,9 @@ async fn insert_accounts(
.account_tries_inserted
.fetch_add(1, Ordering::Relaxed);
let account_state = AccountState::decode(v).expect("We should have accounts here");
Comment on lines 972 to 974

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panics on bad input

insert_accounts now hard-panics on any temp RocksDB iterator error (.expect(...)) and on any malformed account value (AccountState::decode(v).expect(...)). These are external inputs (snapshot files / DB iterator) and can be corrupted; panicking will crash the sync process instead of returning SyncError like the rest of this function does.

Consider propagating these as SyncError::RocksDBError / SyncError::Rlp instead of using expect inside the iterator (map_err(...)?).

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 972:974

Comment:
**Panics on bad input**

`insert_accounts` now hard-panics on any temp RocksDB iterator error (`.expect(...)`) and on any malformed account value (`AccountState::decode(v).expect(...)`). These are external inputs (snapshot files / DB iterator) and can be corrupted; panicking will crash the sync process instead of returning `SyncError` like the rest of this function does.

Consider propagating these as `SyncError::RocksDBError` / `SyncError::Rlp` instead of using `expect` inside the iterator (`map_err(...)?`).

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This absolutely shouldn't happen. The only scenario in which it could is one where the user has a faulty file system.

Comment on lines 966 to 974

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panics on iterator/rlp decode

insert_accounts still has expect(...) calls inside the RocksDB iterator (.map(|k| k.expect(...)) and AccountState::decode(v).expect(...)). These are external inputs (temp RocksDB iterator + snapshot content), so a single iterator error or malformed RLP will crash the whole sync instead of returning a SyncError like the rest of the function does. This should be converted to map_err(...)? / Result propagation so snap sync can fail gracefully with a useful error.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 966:974

Comment:
**Panics on iterator/rlp decode**

`insert_accounts` still has `expect(...)` calls inside the RocksDB iterator (`.map(|k| k.expect(...))` and `AccountState::decode(v).expect(...)`). These are external inputs (temp RocksDB iterator + snapshot content), so a single iterator error or malformed RLP will crash the whole sync instead of returning a `SyncError` like the rest of the function does. This should be converted to `map_err(...)?` / `Result` propagation so snap sync can fail gracefully with a useful error.

How can I resolve this? If you propose a fix, please make it concise.

if account_state.code_hash != *EMPTY_KECCACK_HASH {
code_hashes.push(account_state.code_hash);
}
Comment on lines 960 to +977

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded code hash buffer
insert_accounts now collects all non-empty code_hash values into code_hashes: Vec<H256> and only flushes after trie_from_sorted_accounts_wrap finishes. On large snapshots this can grow to millions of hashes and OOM before the trie build completes. This used to be bounded by flush_if_needed() during the scan. Consider flushing incrementally during the iterator pass (e.g., push+flush when the collector is full) or using a bounded buffer.

Prompt To Fix With AI
This is a comment left during a code review.
Path: crates/networking/p2p/sync/snap_sync.rs
Line: 960:974

Comment:
**Unbounded code hash buffer**
`insert_accounts` now collects all non-empty `code_hash` values into `code_hashes: Vec<H256>` and only flushes after `trie_from_sorted_accounts_wrap` finishes. On large snapshots this can grow to millions of hashes and OOM before the trie build completes. This used to be bounded by `flush_if_needed()` during the scan. Consider flushing incrementally during the iterator pass (e.g., push+flush when the collector is full) or using a bounded buffer.


How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 08ba6f92c. Removed the intermediate Vec<H256> entirely — code hashes are now added directly into CodeHashCollector's internal HashSet during the .inspect() callback. The HashSet deduplicates, so memory is bounded by unique contract accounts (~5M on mainnet at 32 bytes each = ~160MB) instead of growing unboundedly with duplicates.

flush_if_needed() is called once after the trie build completes. We can't call it during the iterator because trie_from_sorted_accounts_wrap is synchronous and flush_if_needed() is async.

if account_state.storage_root != *EMPTY_TRIE_HASH {
storage_accounts.accounts_with_storage_root.insert(
H256::from_slice(k),
Expand All @@ -986,6 +982,15 @@ async fn insert_accounts(
.map(|(k, v)| (H256::from_slice(&k), v.to_vec())),
)
.map_err(SyncError::TrieGenerationError)?;
debug!(
elapsed_ms = start.elapsed().as_millis() as u64,
code_hashes_count = code_hashes.len(),
"insert_accounts trie build"
);
for hash in code_hashes {
code_hash_collector.add(hash);
code_hash_collector.flush_if_needed().await?;
}

drop(db); // close db before removing directory

Expand Down
39 changes: 28 additions & 11 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1139,22 +1139,39 @@ impl Store {
) -> Result<(), StoreError> {
let mut txn = self.backend.begin_write()?;
tokio::task::spawn_blocking(move || {
for (address_hash, nodes) in storage_trie_nodes {
for (node_path, node_data) in nodes {
let key = apply_prefix(Some(address_hash), node_path);
if node_data.is_empty() {
txn.delete(STORAGE_TRIE_NODES, key.as_ref())?;
} else {
txn.put(STORAGE_TRIE_NODES, key.as_ref(), &node_data)?;
}
}
}
txn.commit()
Self::write_storage_trie_nodes_inner(&mut *txn, storage_trie_nodes)
})
.await
.map_err(|e| StoreError::Custom(format!("Task panicked: {}", e)))?
}

/// Blocking counterpart of [`Self::write_storage_trie_nodes_batch`].
///
/// Opens a write batch on the backend and applies the updates on the calling
/// thread. Meant to be called from inside `spawn_blocking` closures, so that
/// the caller does not pay for a second thread hop through `block_on`.
///
/// # Errors
///
/// Returns the `StoreError` produced when opening the write batch, or any
/// error reported by [`Self::write_storage_trie_nodes_inner`].
pub fn write_storage_trie_nodes_batch_sync(
    &self,
    storage_trie_nodes: StorageUpdates,
) -> Result<(), StoreError> {
    let mut write_batch = self.backend.begin_write()?;
    Self::write_storage_trie_nodes_inner(&mut *write_batch, storage_trie_nodes)
}

/// Applies a set of storage trie node updates to `txn` and commits it.
///
/// Every node path is prefixed with the hash of the account it belongs to.
/// An entry whose data is empty is deleted from `STORAGE_TRIE_NODES`; any
/// other entry is written with its data as the value. The batch is committed
/// only after all entries have been applied.
///
/// # Errors
///
/// Stops at the first failing `put`/`delete` and returns its error without
/// committing; otherwise returns the result of `commit`.
fn write_storage_trie_nodes_inner(
    txn: &mut dyn crate::api::StorageWriteBatch,
    storage_trie_nodes: StorageUpdates,
) -> Result<(), StoreError> {
    for (account_hash, account_nodes) in storage_trie_nodes {
        for (path, encoded_node) in account_nodes {
            let prefixed_key = apply_prefix(Some(account_hash), path);
            if !encoded_node.is_empty() {
                txn.put(STORAGE_TRIE_NODES, prefixed_key.as_ref(), &encoded_node)?;
            } else {
                // Empty data marks a node that must be removed.
                txn.delete(STORAGE_TRIE_NODES, prefixed_key.as_ref())?;
            }
        }
    }
    txn.commit()
}

/// CAUTION: This method writes directly to the underlying database, bypassing any caching layer.
/// For updating the state after block execution, use [`Self::store_block_updates`].
pub async fn write_account_code_batch(
Expand Down
43 changes: 40 additions & 3 deletions crates/storage/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::api::{StorageBackend, StorageLockedView};
use crate::error::StoreError;
use crate::layering::apply_prefix;
use ethrex_common::H256;
use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::{Nibbles, Node, TrieDB, error::TrieError};
use std::sync::Arc;
use std::time::Instant;

/// StorageWriteBatch implementation for the TrieDB trait
/// Wraps a transaction to allow multiple trie operations on the same transaction
Expand Down Expand Up @@ -101,17 +103,52 @@ impl TrieDB for BackendTrieDB {
}

fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
let start = Instant::now();
let item_count = key_values.len();
let mut tx = self.db.begin_write().map_err(|e| {
TrieError::DbError(anyhow::anyhow!("Failed to begin write transaction: {}", e))
})?;
for (key, value) in key_values {
let prefixed_key = self.make_key(key);
let table = self.table_for_key(&prefixed_key);
tx.put_batch(table, vec![(prefixed_key, value)])
tx.put(table, &prefixed_key, &value)
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
}
let commit_start = Instant::now();
tx.commit()
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))
.map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;
tracing::debug!(
items = item_count,
total_ms = start.elapsed().as_millis() as u64,
commit_ms = commit_start.elapsed().as_millis() as u64,
"BackendTrieDB::put_batch"
);
Ok(())
}

/// Encodes each node and writes it under its prefixed key in a single write
/// transaction, then commits.
///
/// Takes the nodes by reference, so the caller keeps ownership of the slice.
/// After a successful commit, a debug event records the item count, the total
/// elapsed time and the time spent in the commit itself.
///
/// # Errors
///
/// Returns `TrieError::DbError` if the transaction cannot be opened, if any
/// individual write fails, or if the commit fails.
fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> {
    let started_at = Instant::now();
    let total_items = key_values.len();
    let mut batch = self.db.begin_write().map_err(|e| {
        TrieError::DbError(anyhow::anyhow!("Failed to begin write transaction: {}", e))
    })?;

    // Stops at the first failed write and propagates its error.
    key_values.iter().try_for_each(|(path, node)| {
        let db_key = self.make_key(path.clone());
        let table = self.table_for_key(&db_key);
        let encoded_node = node.encode_to_vec();
        batch
            .put(table, &db_key, &encoded_node)
            .map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))
    })?;

    let commit_started_at = Instant::now();
    batch
        .commit()
        .map_err(|e| TrieError::DbError(anyhow::anyhow!("Failed to write batch: {}", e)))?;

    tracing::debug!(
        items = total_items,
        total_ms = started_at.elapsed().as_millis() as u64,
        commit_ms = commit_started_at.elapsed().as_millis() as u64,
        "BackendTrieDB::put_batch_no_alloc"
    );
    Ok(())
}
}

Expand Down
Loading