Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 157 additions & 6 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,19 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
let mut total_write_state = Duration::ZERO;
let mut total_write_hashed_state = Duration::ZERO;
let mut total_write_trie_changesets = Duration::ZERO;
let mut total_write_trie_updates = Duration::ZERO;

// TODO: Do performant / batched writes for each type of object
// instead of a loop over all blocks,
// meaning:
// * blocks
// * state
// * hashed state
// * trie updates (cannot naively extend, need helper)
// * trie updates
// * indices (already done basically)

// Insert the blocks
let mut trie_update_acc = TrieUpdatesSorted::default();

for block in blocks {
let trie_data = block.trie_data();
let ExecutedBlock { recovered_block, execution_output, .. } = block;
Expand All @@ -409,14 +411,19 @@ impl<TX: DbTx + DbTxMut + 'static, N: NodeTypesForProvider> DatabaseProvider<TX,
total_write_hashed_state += start.elapsed();

let start = Instant::now();
self.write_trie_changesets(block_number, &trie_data.trie_updates, None)?;
// changesets must respect acc. overlay
let overlay = if trie_update_acc.is_empty() { None } else { Some(&trie_update_acc) };
self.write_trie_changesets(block_number, &trie_data.trie_updates, overlay)?;
total_write_trie_changesets += start.elapsed();

let start = Instant::now();
self.write_trie_updates_sorted(&trie_data.trie_updates)?;
total_write_trie_updates += start.elapsed();
// update accumulator for trie updates
trie_update_acc.extend_ref(&trie_data.trie_updates);
}

let start = Instant::now();
self.write_trie_updates_sorted(&trie_update_acc)?;
let total_write_trie_updates = start.elapsed();

// update history indices
let start = Instant::now();
self.update_history_indices(first_number..=last_block_number)?;
Expand Down Expand Up @@ -5102,4 +5109,148 @@ mod tests {
}
}
}

#[test]
fn test_save_blocks_accumulates_trie_updates_for_changesets() {
use reth_chain_state::ComputedTrieData;
use reth_trie::BranchNodeCompact;

let factory = create_test_provider_factory();
let provider_rw = factory.provider_rw().unwrap();

let account_nibbles = Nibbles::from_nibbles([0x1, 0x2, 0x3]);
let account_nibbles_key = StoredNibbles(account_nibbles);
let account_nibbles_subkey = StoredNibblesSubKey(account_nibbles);

let node0 = BranchNodeCompact::new(
0b1111_0000_0000_0000,
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
);
let node1 = BranchNodeCompact::new(
0b0000_1111_0000_0000,
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
);
let node2 = BranchNodeCompact::new(
0b0000_0000_1111_0000,
0b0000_0000_0000_0000,
0b0000_0000_0000_0000,
vec![],
None,
);

// Pre-populate AccountsTrie with the initial node value.
{
let mut cursor = provider_rw.tx_ref().cursor_write::<tables::AccountsTrie>().unwrap();
cursor.insert(account_nibbles_key.clone(), &node0).unwrap();
}

// Create executed blocks
let mut rng = generators::rng();
let block0 = random_block(&mut rng, 0, BlockParams { parent: None, ..Default::default() });
let block1 = random_block(
&mut rng,
1,
BlockParams { parent: Some(block0.hash()), ..Default::default() },
);
let block2 = random_block(
&mut rng,
2,
BlockParams { parent: Some(block1.hash()), ..Default::default() },
);

let executed0 = ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block0, Vec::new())),
Arc::new(ExecutionOutcome {
first_block: 0,
receipts: vec![vec![]],
..Default::default()
}),
ComputedTrieData::without_trie_input(
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdatesSorted::default()),
),
);

let executed1 = ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block1, Vec::new())),
Arc::new(ExecutionOutcome {
first_block: 1,
receipts: vec![vec![]],
..Default::default()
}),
ComputedTrieData::without_trie_input(
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdatesSorted::new(
vec![(account_nibbles, Some(node1.clone()))],
B256Map::default(),
)),
),
);

let executed2 = ExecutedBlock::new(
Arc::new(RecoveredBlock::new_sealed(block2, Vec::new())),
Arc::new(ExecutionOutcome {
first_block: 2,
receipts: vec![vec![]],
..Default::default()
}),
ComputedTrieData::without_trie_input(
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdatesSorted::new(
vec![(account_nibbles, Some(node2.clone()))],
B256Map::default(),
)),
),
);

provider_rw.save_blocks(vec![executed0, executed1, executed2]).unwrap();

// Block 1 changeset should see the pre-existing db value.
{
let mut cursor =
provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
let entries =
cursor.walk_dup(Some(1u64), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(
entries,
vec![(
1u64,
TrieChangeSetsEntry {
nibbles: account_nibbles_subkey.clone(),
node: Some(node0),
}
)]
);
}

// Block 2 changeset should see block 1's update via the overlay.
{
let mut cursor =
provider_rw.tx_ref().cursor_dup_read::<tables::AccountsTrieChangeSets>().unwrap();
let entries =
cursor.walk_dup(Some(2u64), None).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(
entries,
vec![(
2u64,
TrieChangeSetsEntry { nibbles: account_nibbles_subkey, node: Some(node1) }
)]
);
}

// Final trie state should reflect the last update.
{
let mut cursor = provider_rw.tx_ref().cursor_read::<tables::AccountsTrie>().unwrap();
let entry = cursor.seek_exact(account_nibbles_key).unwrap();
assert_eq!(entry.map(|e| e.1), Some(node2));
}

provider_rw.commit().unwrap();
}
}
Loading