Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions crates/networking/p2p/sync/healing/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use std::{
cmp::min,
collections::{BTreeMap, HashMap},
collections::HashMap,
sync::atomic::Ordering,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -283,24 +283,20 @@ async fn heal_state_trie(
// PERF: reuse buffers?
let to_write = std::mem::take(&mut nodes_to_write);
let store = store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
// NOTE: we keep only a single task in the background to preserve write ordering
if !db_joinset.is_empty()
&& let Some(result) = db_joinset.join_next().await
{
result??;
}
db_joinset.spawn_blocking(move || -> Result<(), SyncError> {
let mut encoded_to_write = BTreeMap::new();
for (path, node) in to_write {
for i in 0..path.len() {
encoded_to_write.insert(path.slice(0, i), vec![]);
}
encoded_to_write.insert(path, node.encode_to_vec());
}
let encoded_to_write: Vec<_> = to_write
.into_iter()
.map(|(path, node)| (path, node.encode_to_vec()))
.collect();
let trie_db = store.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
let db = trie_db.db();
// PERF: use put_batch_no_alloc (note that it needs to remove nodes too)
db.put_batch(encoded_to_write.into_iter().collect())?;
db.put_batch(encoded_to_write)?;
Ok(())
});
}
Expand Down
14 changes: 5 additions & 9 deletions crates/networking/p2p/sync/healing/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,23 +212,19 @@ pub async fn heal_storage_trie(
if nodes_to_write.values().map(Vec::len).sum::<usize>() > 100_000 || is_done || is_stale {
let to_write: Vec<_> = nodes_to_write.drain().collect();
let store = state.store.clone();
// NOTE: we keep only a single task in the background to avoid out of order deletes
// NOTE: we keep only a single task in the background to preserve write ordering
if !db_joinset.is_empty() {
db_joinset.join_next().await;
}
db_joinset.spawn_blocking(move || {
let mut encoded_to_write = vec![];
for (hashed_account, nodes) in to_write {
let mut account_nodes = vec![];
for (path, node) in nodes {
for i in 0..path.len() {
account_nodes.push((path.slice(0, i), vec![]));
}
account_nodes.push((path, node.encode_to_vec()));
}
let account_nodes = nodes
.into_iter()
.map(|(path, node)| (path, node.encode_to_vec()))
.collect();
encoded_to_write.push((hashed_account, account_nodes));
}
// PERF: use put_batch_no_alloc? (it needs to remove parent nodes too)
spawned_rt::tasks::block_on(store.write_storage_trie_nodes_batch(encoded_to_write))
.expect("db write failed");
});
Expand Down
Loading