diff --git a/Cargo.lock b/Cargo.lock index acd6de5ea23..a87f1da8b02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9613,6 +9613,7 @@ dependencies = [ "alloy-genesis", "alloy-primitives", "auto_impl", + "bincode 1.3.3", "bytes", "derive_more", "eyre", diff --git a/crates/optimism/trie/Cargo.toml b/crates/optimism/trie/Cargo.toml index accb1388715..d6450b03d1c 100644 --- a/crates/optimism/trie/Cargo.toml +++ b/crates/optimism/trie/Cargo.toml @@ -35,6 +35,7 @@ tokio = { workspace = true, features = ["sync"] } # codec bytes.workspace = true serde.workspace = true +bincode.workspace = true # misc thiserror.workspace = true diff --git a/crates/optimism/trie/src/db/models/change_set.rs b/crates/optimism/trie/src/db/models/change_set.rs new file mode 100644 index 00000000000..12ec68594b6 --- /dev/null +++ b/crates/optimism/trie/src/db/models/change_set.rs @@ -0,0 +1,125 @@ +use crate::db::{HashedStorageKey, StorageTrieKey}; +use alloy_primitives::B256; +use reth_db::{ + table::{self, Decode, Encode}, + DatabaseError, +}; +use reth_trie::StoredNibbles; +use serde::{Deserialize, Serialize}; + +/// The keys of the entries in the history tables. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct ChangeSet { + /// Keys changed in [`AccountTrieHistory`](super::AccountTrieHistory) table. + pub account_trie_keys: Vec, + /// Keys changed in [`StorageTrieHistory`](super::StorageTrieHistory) table. + pub storage_trie_keys: Vec, + /// Keys changed in [`HashedAccountHistory`](super::HashedAccountHistory) table. + pub hashed_account_keys: Vec, + /// Keys changed in [`HashedStorageHistory`](super::HashedStorageHistory) table. + pub hashed_storage_keys: Vec, +} + +impl table::Encode for ChangeSet { + type Encoded = Vec; + + fn encode(self) -> Self::Encoded { + bincode::serialize(&self).expect("ChangeSet serialization should not fail") + } +} + +impl table::Decode for ChangeSet { + fn decode(value: &[u8]) -> Result { + bincode::deserialize(value).map_err(|_| DatabaseError::Decode) + } +} + +impl table::Compress for ChangeSet { + type Compressed = Vec; + + fn compress_to_buf>(&self, buf: &mut B) { + let encoded = self.clone().encode(); + buf.put_slice(&encoded); + } +} + +impl table::Decompress for ChangeSet { + fn decompress(value: &[u8]) -> Result { + Self::decode(value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use alloy_primitives::B256; + use reth_db::table::{Compress, Decompress}; + + #[test] + fn test_encode_decode_empty_change_set() { + let change_set = ChangeSet { + account_trie_keys: vec![], + storage_trie_keys: vec![], + hashed_account_keys: vec![], + hashed_storage_keys: vec![], + }; + + let encoded = change_set.clone().encode(); + let decoded = ChangeSet::decode(&encoded).expect("Failed to decode"); + assert_eq!(change_set, decoded); + } + + #[test] + fn test_encode_decode_populated_change_set() { + let account_key = StoredNibbles::from(vec![1, 2, 3, 4]); + let storage_key = StorageTrieKey { + hashed_address: B256::repeat_byte(0x11), + path: StoredNibbles::from(vec![5, 6, 7, 8]), + }; + let hashed_storage_key = HashedStorageKey { + hashed_address: B256::repeat_byte(0x22), + hashed_storage_key: B256::repeat_byte(0x33), + }; + + let change_set = ChangeSet { + account_trie_keys: vec![account_key], + storage_trie_keys: vec![storage_key], + hashed_account_keys: vec![B256::repeat_byte(0x44)], + hashed_storage_keys: vec![hashed_storage_key], + }; + + let encoded = change_set.clone().encode(); + let decoded = ChangeSet::decode(&encoded).expect("Failed to decode"); + assert_eq!(change_set, decoded); + } + + #[test] + fn test_decode_invalid_data() { + let invalid_data = vec![0xFF; 32]; + let result = ChangeSet::decode(&invalid_data); + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), DatabaseError::Decode)); + } + + #[test] + fn test_compress_decompress() { + let change_set = ChangeSet { + account_trie_keys: vec![StoredNibbles::from(vec![1, 2, 3])], + storage_trie_keys: vec![StorageTrieKey { + hashed_address: B256::ZERO, + path: StoredNibbles::from(vec![4, 5, 6]), + }], + hashed_account_keys: vec![B256::ZERO], + hashed_storage_keys: vec![HashedStorageKey { + hashed_address: B256::ZERO, + hashed_storage_key: B256::repeat_byte(0x42), + }], + }; + + let mut buf = Vec::new(); + change_set.compress_to_buf(&mut buf); + + let decompressed = ChangeSet::decompress(&buf).expect("Failed to decompress"); + assert_eq!(change_set, decompressed); + } +} diff --git a/crates/optimism/trie/src/db/models/mod.rs b/crates/optimism/trie/src/db/models/mod.rs index 10f625689a5..af19945f677 100644 --- a/crates/optimism/trie/src/db/models/mod.rs +++ b/crates/optimism/trie/src/db/models/mod.rs @@ -11,6 +11,8 @@ mod version; pub use version::*; mod storage; pub use storage::*; +mod change_set; +pub use change_set::*; use alloy_primitives::B256; use reth_db::{ @@ -70,4 +72,11 @@ tables! { type Key = ProofWindowKey; type Value = BlockNumberHash; } + + /// A reverse mapping of block numbers to a keys of the tables. + /// This is used for efficiently locating data by block number. + table BlockChangeSet { + type Key = u64; // Block number + type Value = ChangeSet; + } } diff --git a/crates/optimism/trie/src/db/store.rs b/crates/optimism/trie/src/db/store.rs index 7cc4308fb00..2e7da0007a4 100644 --- a/crates/optimism/trie/src/db/store.rs +++ b/crates/optimism/trie/src/db/store.rs @@ -3,8 +3,9 @@ use crate::{ db::{ cursor::Dup, models::{ - AccountTrieHistory, HashedAccountHistory, HashedStorageHistory, HashedStorageKey, - MaybeDeleted, StorageTrieHistory, StorageTrieKey, StorageValue, VersionedValue, + AccountTrieHistory, BlockChangeSet, ChangeSet, HashedAccountHistory, + HashedStorageHistory, HashedStorageKey, MaybeDeleted, StorageTrieHistory, + StorageTrieKey, StorageValue, VersionedValue, }, MdbxAccountCursor, MdbxStorageCursor, MdbxTrieCursor, }, @@ -14,7 +15,7 @@ use alloy_eips::eip1898::BlockWithParent; use alloy_primitives::{map::HashMap, B256, U256}; use itertools::Itertools; use reth_db::{ - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, mdbx::{init_db_for, DatabaseArguments}, transaction::DbTx, Database, DatabaseEnv, @@ -274,10 +275,25 @@ impl OpProofsStore for MdbxProofsStorage { } self.env.update(|tx| { + let account_trie_len = sorted_account_nodes.len(); + let storage_trie_len = sorted_storage_nodes.len(); + let hashed_account_len = sorted_accounts.size_hint().0; + let hashed_storage_len = sorted_storage.len(); + + // Preparing the entries for the `BlockChangeSet` table + let mut account_trie_keys = Vec::::with_capacity(account_trie_len); + let mut storage_trie_keys = Vec::::with_capacity(storage_trie_len); + let mut hashed_account_keys = Vec::::with_capacity(hashed_account_len); + let mut hashed_storage_keys = + Vec::::with_capacity(hashed_storage_len); + let mut account_trie_cursor = tx.new_cursor::()?; for (path, node) in sorted_account_nodes { + let key: StoredNibbles = path.into(); let vv = VersionedValue { block_number, value: MaybeDeleted(node) }; - account_trie_cursor.append_dup(path.into(), vv)?; + account_trie_cursor.append_dup(key.clone(), vv)?; + + account_trie_keys.push(key); } let mut storage_trie_cursor = tx.new_cursor::()?; @@ -286,7 +302,9 @@ impl OpProofsStore for MdbxProofsStorage { for (path, node) in nodes.storage_nodes { let key = StorageTrieKey::new(hashed_address, path.into()); let vv = VersionedValue { block_number, value: MaybeDeleted(node) }; - storage_trie_cursor.append_dup(key, vv)?; + storage_trie_cursor.append_dup(key.clone(), vv)?; + + storage_trie_keys.push(key); } } @@ -294,6 +312,8 @@ impl OpProofsStore for MdbxProofsStorage { for (hashed_address, account) in sorted_accounts { let vv = VersionedValue { block_number, value: MaybeDeleted(account) }; account_cursor.append_dup(hashed_address, vv)?; + + hashed_account_keys.push(hashed_address); } let mut storage_cursor = tx.new_cursor::()?; @@ -306,10 +326,24 @@ impl OpProofsStore for MdbxProofsStorage { value: MaybeDeleted(Some(StorageValue(storage_value))), }; let key = HashedStorageKey::new(*hashed_address, storage_key); - storage_cursor.append_dup(key, vv)?; + storage_cursor.append_dup(key.clone(), vv)?; + + hashed_storage_keys.push(key); } } + // Cursor for recording all changes made in this block for all history tables + let mut change_set_cursor = tx.new_cursor::()?; + change_set_cursor.append( + block_number, + &ChangeSet { + account_trie_keys, + storage_trie_keys, + hashed_account_keys, + hashed_storage_keys, + }, + )?; + // update proof window latest block let mut proof_window_cursor = tx.new_cursor::()?; proof_window_cursor.append( @@ -327,12 +361,86 @@ impl OpProofsStore for MdbxProofsStorage { unimplemented!() } + /// Prune all historical trie data prior to `new_earliest_block_number` using + /// the [`BlockChangeSet`] index. async fn prune_earliest_state( &self, - _new_earliest_block_number: u64, + new_earliest_block_number: u64, _diff: BlockStateDiff, ) -> OpProofsStorageResult<()> { - unimplemented!() + let Some((start_block, _)) = self.get_earliest_block_number().await? else { + return Ok(()); // Nothing to prune + }; + + if start_block >= new_earliest_block_number { + return Ok(()); // Nothing to prune + } + + self.env.update(|tx| { + // Collect keys to prune first to avoid borrow checker issues with cursors. + let keys_to_prune: Vec<(u64, ChangeSet)> = { + let mut change_set_cursor = tx.new_cursor::()?; + change_set_cursor + .walk_range(start_block..new_earliest_block_number)? + .collect::, _>>()? + }; + + if !keys_to_prune.is_empty() { + let mut account_trie_cursor = tx.new_cursor::()?; + let mut storage_trie_cursor = tx.new_cursor::()?; + let mut hashed_account_cursor = tx.new_cursor::()?; + let mut hashed_storage_cursor = tx.new_cursor::()?; + let mut change_set_cursor = tx.new_cursor::()?; + + // Process already sorted entries directly + for (block_number, change_set) in &keys_to_prune { + // Process account trie entries + for key in &change_set.account_trie_keys { + if let Some(vv) = + account_trie_cursor.seek_by_key_subkey(key.clone(), *block_number)? && + vv.block_number == *block_number + { + account_trie_cursor.delete_current()?; + } + } + + // Process storage trie entries + for key in &change_set.storage_trie_keys { + if let Some(vv) = + storage_trie_cursor.seek_by_key_subkey(key.clone(), *block_number)? && + vv.block_number == *block_number + { + storage_trie_cursor.delete_current()?; + } + } + + // Process hashed account entries + for key in &change_set.hashed_account_keys { + if let Some(vv) = + hashed_account_cursor.seek_by_key_subkey(*key, *block_number)? && + vv.block_number == *block_number + { + hashed_account_cursor.delete_current()?; + } + } + + // Process hashed storage entries + for key in &change_set.hashed_storage_keys { + if let Some(vv) = + hashed_storage_cursor.seek_by_key_subkey(key.clone(), *block_number)? && + vv.block_number == *block_number + { + hashed_storage_cursor.delete_current()?; + } + } + + // Delete the change set immediately + change_set_cursor.seek_exact(*block_number)?; + change_set_cursor.delete_current()?; + } + } + Ok(()) + })? } async fn replace_updates( @@ -361,7 +469,10 @@ mod tests { }; use alloy_eips::NumHash; use alloy_primitives::B256; - use reth_db::{cursor::DbDupCursorRO, transaction::DbTx}; + use reth_db::{ + cursor::DbDupCursorRO, + transaction::{DbTx, DbTxMut}, + }; use reth_trie::{ updates::StorageTrieUpdates, BranchNodeCompact, HashedStorage, Nibbles, StoredNibbles, }; @@ -935,6 +1046,14 @@ mod tests { assert_eq!(inner2.0, val2); } + // Verify BlockChangeSet entries + { + let tx = store.env.tx().expect("tx"); + let mut cur = tx.new_cursor::().expect("cursor"); + let entries: Vec<_> = cur.walk(Some(BLOCK.block.number)).expect("walk").collect(); + assert_eq!(entries.len(), 1, "Expected 1 BlockChangeSet entry"); + } + // check the latest block number in proof window { let tx = store.env.tx().expect("tx"); @@ -1032,6 +1151,178 @@ mod tests { let mut cur4 = tx.new_cursor::().expect("cursor"); assert!(cur4.next_dup_val().expect("first").is_none(), "Hashed storage should be empty"); + + let mut cur5 = tx.new_cursor::().expect("cursor"); + assert!( + cur5.next().expect("first").is_some(), + "Pruning index SHOULD populate the change set even for empty diffs" + ); + } + + #[tokio::test] + async fn test_prune_earliest_state_single_entry() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let block = BlockWithParent::new(B256::ZERO, NumHash::new(1, B256::random())); + let diff = BlockStateDiff::default(); + store.set_earliest_block_number(0, B256::ZERO).await.unwrap(); + + // Insert a single entry to be pruned + let addr = B256::random(); + let mut state_diff = BlockStateDiff::default(); + state_diff.post_state.accounts.insert(addr, Some(Account::default())); + store.store_trie_updates(block, state_diff).await.unwrap(); + + // Prune the entry + store.prune_earliest_state(block.block.number + 1, diff).await.unwrap(); + + // Verify the entry was pruned + let tx = store.env.tx().unwrap(); + let mut cur = tx.new_cursor::().unwrap(); + assert!(cur.seek_by_key_subkey(addr, block.block.number).unwrap().is_none()); + let mut pruning_cur = tx.new_cursor::().unwrap(); + assert!(pruning_cur.seek_exact(block.block.number).unwrap().is_none()); + } + + #[tokio::test] + async fn test_prune_earliest_state_multiple_entries_same_block() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let block = BlockWithParent::new(B256::ZERO, NumHash::new(1, B256::random())); + let diff = BlockStateDiff::default(); + store.set_earliest_block_number(0, B256::ZERO).await.unwrap(); + + // Insert multiple entries for the same block + let addr1 = B256::random(); + let addr2 = B256::random(); + let mut state_diff = BlockStateDiff::default(); + state_diff.post_state.accounts.insert(addr1, Some(Account::default())); + state_diff.post_state.accounts.insert(addr2, Some(Account::default())); + store.store_trie_updates(block, state_diff).await.unwrap(); + + // Prune the entries + store.prune_earliest_state(block.block.number + 1, diff).await.unwrap(); + + // Verify the entries were pruned + let tx = store.env.tx().unwrap(); + let mut cur = tx.new_cursor::().unwrap(); + assert!(cur.seek_by_key_subkey(addr1, block.block.number).unwrap().is_none()); + assert!(cur.seek_by_key_subkey(addr2, block.block.number).unwrap().is_none()); + let mut pruning_cur = tx.new_cursor::().unwrap(); + assert!(pruning_cur.seek_exact(block.block.number).unwrap().is_none()); + } + + #[tokio::test] + async fn test_prune_earliest_state_multiple_blocks() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let diff = BlockStateDiff::default(); + let block_1 = BlockWithParent::new(B256::ZERO, NumHash::new(1, B256::random())); + let block_2 = BlockWithParent::new(block_1.block.hash, NumHash::new(2, B256::random())); + store.set_earliest_block_number(0, B256::ZERO).await.unwrap(); + + // Insert entries for multiple blocks + let addr1 = B256::random(); + let addr2 = B256::random(); + let mut state_diff1 = BlockStateDiff::default(); + state_diff1.post_state.accounts.insert(addr1, Some(Account::default())); + store.store_trie_updates(block_1, state_diff1).await.unwrap(); + + let mut state_diff2 = BlockStateDiff::default(); + state_diff2.post_state.accounts.insert(addr2, Some(Account::default())); + store.store_trie_updates(block_2, state_diff2).await.unwrap(); + + // Prune up to block 3 (should remove blocks 1 and 2) + store.prune_earliest_state(3, diff).await.unwrap(); + + // Verify the entries were pruned + let tx = store.env.tx().unwrap(); + let mut cur = tx.new_cursor::().unwrap(); + assert!(cur.seek_by_key_subkey(addr1, 1).unwrap().is_none()); + assert!(cur.seek_by_key_subkey(addr2, 2).unwrap().is_none()); + let mut pruning_cur = tx.new_cursor::().unwrap(); + assert!(pruning_cur.seek_exact(1).unwrap().is_none()); + assert!(pruning_cur.seek_exact(2).unwrap().is_none()); + } + + #[tokio::test] + async fn test_prune_earliest_state_no_op() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let diff = BlockStateDiff::default(); + store.set_earliest_block_number(1, B256::random()).await.unwrap(); + + // Attempt to prune with a new earliest block that is not newer + store.prune_earliest_state(1, diff.clone()).await.unwrap(); + store.prune_earliest_state(0, diff).await.unwrap(); + + // Nothing should have been pruned, this call should not panic or error + } + + #[tokio::test] + async fn test_prune_earliest_state_no_entries_to_prune() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let diff = BlockStateDiff::default(); + store.set_earliest_block_number(1, B256::random()).await.unwrap(); + + // Prune a range where no entries exist + store.prune_earliest_state(10, diff).await.unwrap(); + + // Nothing should have been pruned, this call should not panic or error + } + + #[test] + fn test_block_change_set_crud_operations() { + let dir = TempDir::new().unwrap(); + let store = MdbxProofsStorage::new(dir.path()).expect("env"); + let tx = store.env.tx_mut().expect("rw tx"); + let mut cursor = tx.cursor_write::().expect("cursor"); + + let block_1 = 42u64; + let block_2 = 43u64; + + let entry1 = ChangeSet { + account_trie_keys: vec![StoredNibbles::default()], + storage_trie_keys: vec![], + hashed_account_keys: vec![B256::ZERO], + hashed_storage_keys: vec![], + }; + let entry2 = ChangeSet { + account_trie_keys: vec![], + storage_trie_keys: vec![StorageTrieKey::new(B256::ZERO, StoredNibbles::default())], + hashed_account_keys: vec![], + hashed_storage_keys: vec![HashedStorageKey::new(B256::ZERO, B256::ZERO)], + }; + + // Insert entries + cursor.insert(block_1, &entry1).unwrap(); + cursor.insert(block_2, &entry2).unwrap(); + + // Read entries + let mut walker = cursor.walk(Some(block_1)).unwrap(); + let mut entries = vec![walker.next().unwrap().unwrap().1]; + if let Some(Ok((_, val))) = walker.next() { + entries.push(val); + } + entries.sort(); + let mut expected = vec![entry1.clone(), entry2.clone()]; + expected.sort(); + assert_eq!(entries, expected); + + // Delete entry1 + let mut walker = cursor.walk(Some(block_1)).unwrap(); + while let Some(Ok((_, val))) = walker.next() { + if val == entry1 { + walker.delete_current().unwrap(); + break; + } + } + + // Verify delete + let mut walker = cursor.walk(Some(block_1)).unwrap(); + assert_eq!(walker.next().unwrap().unwrap().1, entry2); + assert!(walker.next().is_none()); } #[tokio::test]