diff --git a/config/src/config/storage_config.rs b/config/src/config/storage_config.rs
index 7d08296d9595f..1b3cfb60fdc95 100644
--- a/config/src/config/storage_config.rs
+++ b/config/src/config/storage_config.rs
@@ -71,7 +71,8 @@ pub struct StorageConfig {
 pub const NO_OP_STORAGE_PRUNER_CONFIG: StoragePrunerConfig = StoragePrunerConfig {
     state_store_prune_window: None,
     ledger_prune_window: None,
-    pruning_batch_size: 10_000,
+    ledger_pruning_batch_size: 10_000,
+    state_store_pruning_batch_size: 10_000,
 };
 
 #[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
@@ -84,21 +85,27 @@ pub struct StoragePrunerConfig {
     /// being big in size, we might want to configure a smaller window for state store vs other
     /// store.
     pub ledger_prune_window: Option<u64>,
-    /// Batch size of the versions to be sent to the pruner - this is to avoid slowdown due to
-    /// issuing too many DB calls and batch prune instead.
-    pub pruning_batch_size: usize,
+    /// Batch size of the versions to be sent to the ledger pruner - this is to avoid slowdown
+    /// due to issuing too many DB calls, by batching the pruning instead. For the ledger pruner,
+    /// this is the number of versions to prune at a time.
+    pub ledger_pruning_batch_size: usize,
+    /// Similar to the field above, but for the state store pruner. This is the number of stale
+    /// nodes to prune at a time.
+    pub state_store_pruning_batch_size: usize,
 }
 
 impl StoragePrunerConfig {
     pub fn new(
         state_store_prune_window: Option<u64>,
        ledger_store_prune_window: Option<u64>,
-        pruning_batch_size: usize,
+        ledger_pruning_batch_size: usize,
+        state_store_pruning_batch_size: usize,
     ) -> Self {
         StoragePrunerConfig {
             state_store_prune_window,
             ledger_prune_window: ledger_store_prune_window,
-            pruning_batch_size,
+            ledger_pruning_batch_size,
+            state_store_pruning_batch_size,
         }
     }
 }
@@ -119,7 +126,10 @@ impl Default for StorageConfig {
             storage_pruner_config: StoragePrunerConfig {
                 state_store_prune_window: Some(1_000_000),
                 ledger_prune_window: Some(10_000_000),
-                pruning_batch_size: 500,
+                ledger_pruning_batch_size: 500,
+                // A 10k transaction block (touching 60k state values, in the case of the account
+                // creation benchmark) on a 4B items DB (or 1.33B accounts) yields 300k JMT nodes.
+                state_store_pruning_batch_size: 1_000,
             },
             data_dir: PathBuf::from("/opt/aptos/data"),
             // Default read/write/connection timeout, in milliseconds
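Every construction site of `StoragePrunerConfig` now has to pass two batch sizes instead of one, since `new()` gained a parameter. A minimal sketch of the updated call shape (values mirror the new defaults above; this assumes the `aptos-config` crate is on the dependency path):

```rust
use aptos_config::config::StoragePrunerConfig;

fn main() {
    // Positional arguments follow the updated `new()` signature above.
    let config = StoragePrunerConfig::new(
        Some(1_000_000),  /* state_store_prune_window */
        Some(10_000_000), /* ledger_store_prune_window */
        500,              /* ledger_pruning_batch_size */
        1_000,            /* state_store_pruning_batch_size */
    );
    assert_eq!(config.ledger_pruning_batch_size, 500);
    assert_eq!(config.state_store_pruning_batch_size, 1_000);
}
```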
diff --git a/execution/executor-benchmark/src/main.rs b/execution/executor-benchmark/src/main.rs
index df3fdc9e0ff69..d0007f3e23a89 100644
--- a/execution/executor-benchmark/src/main.rs
+++ b/execution/executor-benchmark/src/main.rs
@@ -19,7 +19,10 @@ struct PrunerOpt {
     ledger_prune_window: i64,
 
     #[structopt(long, default_value = "500")]
-    pruning_batch_size: usize,
+    ledger_pruning_batch_size: usize,
+
+    #[structopt(long, default_value = "500")]
+    state_store_pruning_batch_size: usize,
 }
 
 impl PrunerOpt {
@@ -35,7 +38,8 @@ impl PrunerOpt {
             } else {
                 Some(self.ledger_prune_window as u64)
             },
-            pruning_batch_size: self.pruning_batch_size,
+            ledger_pruning_batch_size: self.ledger_pruning_batch_size,
+            state_store_pruning_batch_size: self.state_store_pruning_batch_size,
         }
     }
 }
diff --git a/storage/aptosdb/src/aptosdb_test.rs b/storage/aptosdb/src/aptosdb_test.rs
index 47946c002025d..e98835f4d9332 100644
--- a/storage/aptosdb/src/aptosdb_test.rs
+++ b/storage/aptosdb/src/aptosdb_test.rs
@@ -89,7 +89,8 @@ fn test_error_if_version_is_pruned() {
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: Some(0),
-            pruning_batch_size: 1,
+            ledger_pruning_batch_size: 1,
+            state_store_pruning_batch_size: 1,
         },
     );
     pruner.testonly_update_min_version(&[Some(5), Some(10)]);
diff --git a/storage/aptosdb/src/metrics.rs b/storage/aptosdb/src/metrics.rs
index 3536089265e07..e2cbb49583b15 100644
--- a/storage/aptosdb/src/metrics.rs
+++ b/storage/aptosdb/src/metrics.rs
@@ -84,8 +84,19 @@ pub static PRUNER_LEAST_READABLE_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
     .unwrap()
 });
 
-pub static PRUNER_BATCH_SIZE: Lazy<IntGauge> =
-    Lazy::new(|| register_int_gauge!("pruner_batch_size", "Aptos pruner batch size").unwrap());
+/// Pruner batch size. For the ledger pruner, this is the number of versions to be pruned at a
+/// time. For the state store pruner, this is the number of stale nodes to be pruned at a time.
+pub static PRUNER_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
+    register_int_gauge_vec!(
+        // metric name
+        "pruner_batch_size",
+        // metric description
+        "Aptos pruner batch size",
+        // metric labels (dimensions)
+        &["pruner_name",]
+    )
+    .unwrap()
+});
 
 pub static API_LATENCY_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
     register_histogram_vec!(
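Turning the plain gauge into a gauge vector means each pruner reports under its own `pruner_name` label value. A standalone sketch of that pattern using the `prometheus` crate directly (aptosdb goes through its own metrics re-exports; the names here just mirror the diff):

```rust
use once_cell::sync::Lazy;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

// One metric, one time series per label value.
static PRUNER_BATCH_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
    register_int_gauge_vec!(
        "pruner_batch_size",
        "Aptos pruner batch size",
        &["pruner_name"]
    )
    .unwrap()
});

fn main() {
    // Each pruner now reports its batch size under its own label value.
    PRUNER_BATCH_SIZE
        .with_label_values(&["ledger_pruner"])
        .set(500);
    PRUNER_BATCH_SIZE
        .with_label_values(&["state_store_pruner"])
        .set(1_000);
    assert_eq!(
        PRUNER_BATCH_SIZE.with_label_values(&["ledger_pruner"]).get(),
        500
    );
}
```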
diff --git a/storage/aptosdb/src/pruner/db_pruner.rs b/storage/aptosdb/src/pruner/db_pruner.rs
index f383b267fa003..5b6bc5a5826e6 100644
--- a/storage/aptosdb/src/pruner/db_pruner.rs
+++ b/storage/aptosdb/src/pruner/db_pruner.rs
@@ -37,7 +37,7 @@ pub trait DBPruner {
 
     /// Performs the actual pruning, a target version is passed, which is the target the pruner
     /// tries to prune.
-    fn prune(&self, max_versions: u64) -> anyhow::Result<Version>;
+    fn prune(&self, batch_size: usize) -> anyhow::Result<Version>;
 
     /// Initializes the least readable version stored in underlying DB storage
     fn initialize_min_readable_version(&self) -> anyhow::Result<Version>;
diff --git a/storage/aptosdb/src/pruner/event_store/test.rs b/storage/aptosdb/src/pruner/event_store/test.rs
index 4c96ed9917e68..549200c103baf 100644
--- a/storage/aptosdb/src/pruner/event_store/test.rs
+++ b/storage/aptosdb/src/pruner/event_store/test.rs
@@ -61,7 +61,8 @@ fn verify_event_store_pruner(events: Vec<Vec<ContractEvent>>) {
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: Some(0),
-            pruning_batch_size: 1,
+            ledger_pruning_batch_size: 1,
+            state_store_pruning_batch_size: 100,
         },
     );
 
@@ -108,7 +109,8 @@ fn verify_event_store_pruner_disabled(events: Vec<Vec<ContractEvent>>) {
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: None,
-            pruning_batch_size: 1,
+            ledger_pruning_batch_size: 1,
+            state_store_pruning_batch_size: 100,
         },
     );
 
diff --git a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs
index bae9499c8769b..ec2db63ab0f68 100644
--- a/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs
+++ b/storage/aptosdb/src/pruner/ledger_store/ledger_store_pruner.rs
@@ -36,7 +36,7 @@ impl DBPruner for LedgerPruner {
         LEDGER_PRUNER_NAME
     }
 
-    fn prune(&self, max_versions: u64) -> anyhow::Result<Version> {
+    fn prune(&self, max_versions: usize) -> anyhow::Result<Version> {
         if !self.is_pruning_pending() {
             return Ok(self.min_readable_version());
         }
@@ -44,7 +44,7 @@ impl DBPruner for LedgerPruner {
         let min_readable_version = self.min_readable_version();
         // Current target version might be less than the target version to ensure we don't prune
         // more than max_version in one go.
-        let current_target_version = self.get_currrent_batch_target(max_versions);
+        let current_target_version = self.get_currrent_batch_target(max_versions as Version);
 
         self.transaction_store_pruner.prune(
             &mut db_batch,
diff --git a/storage/aptosdb/src/pruner/mod.rs b/storage/aptosdb/src/pruner/mod.rs
index 6007d54c59a1c..2c68d06e9ae60 100644
--- a/storage/aptosdb/src/pruner/mod.rs
+++ b/storage/aptosdb/src/pruner/mod.rs
@@ -92,7 +92,13 @@ impl Pruner {
             .with_label_values(&["ledger_pruner"])
             .set((storage_pruner_config.ledger_prune_window.unwrap_or(0)) as i64);
 
-        PRUNER_BATCH_SIZE.set(storage_pruner_config.pruning_batch_size as i64);
+        PRUNER_BATCH_SIZE
+            .with_label_values(&["ledger_pruner"])
+            .set(storage_pruner_config.ledger_pruning_batch_size as i64);
+
+        PRUNER_BATCH_SIZE
+            .with_label_values(&["state_store_pruner"])
+            .set(storage_pruner_config.state_store_pruning_batch_size as i64);
 
         let worker = Worker::new(
             ledger_rocksdb,
@@ -113,7 +119,7 @@ impl Pruner {
             command_sender: Mutex::new(command_sender),
             min_readable_versions: worker_progress_clone,
             last_version_sent_to_pruners: Arc::new(Mutex::new(0)),
-            pruning_batch_size: storage_pruner_config.pruning_batch_size,
+            pruning_batch_size: storage_pruner_config.ledger_pruning_batch_size,
             latest_version: Arc::new(Mutex::new(0)),
         }
     }
@@ -139,6 +145,10 @@ impl Pruner {
     /// Sends pruning command to the worker thread when necessary.
     pub fn maybe_wake_pruner(&self, latest_version: Version) {
         *self.latest_version.lock() = latest_version;
+        // TODO(zcc): Right now we still wake up the state store pruner once per ledger pruning
+        // batch, even though the state store pruner does not necessarily prune a single version
+        // of nodes each time. We should separate the ledger pruner from the state store pruner
+        // and wake them up independently.
        if latest_version
            >= *self.last_version_sent_to_pruners.as_ref().lock() + self.pruning_batch_size as u64
        {
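As the TODO notes, the wake-up condition in `maybe_wake_pruner` still keys off the ledger batch size alone, for both pruners. A reduced model of that check, with hypothetical plain fields standing in for the real `Arc<Mutex<..>>` state (the real code sends a command to the worker thread rather than pruning inline):

```rust
struct PrunerHandle {
    last_version_sent_to_pruners: u64,
    pruning_batch_size: u64, // the *ledger* batch size after this change
}

impl PrunerHandle {
    fn should_wake(&self, latest_version: u64) -> bool {
        // Both pruners are woken only once a full ledger batch of new versions
        // has accumulated -- hence the TODO about separating them.
        latest_version >= self.last_version_sent_to_pruners + self.pruning_batch_size
    }
}

fn main() {
    let handle = PrunerHandle {
        last_version_sent_to_pruners: 1_000,
        pruning_batch_size: 500,
    };
    assert!(!handle.should_wake(1_499));
    assert!(handle.should_wake(1_500));
}
```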
diff --git a/storage/aptosdb/src/pruner/state_store/mod.rs b/storage/aptosdb/src/pruner/state_store/mod.rs
index badc4eb33eb9b..4cf9a81949300 100644
--- a/storage/aptosdb/src/pruner/state_store/mod.rs
+++ b/storage/aptosdb/src/pruner/state_store/mod.rs
@@ -6,16 +6,12 @@ use crate::{
     pruner::db_pruner::DBPruner, stale_node_index::StaleNodeIndexSchema, OTHER_TIMERS_SECONDS,
 };
 use anyhow::Result;
-use aptos_infallible::Mutex;
 use aptos_jellyfish_merkle::StaleNodeIndex;
-use aptos_logger::{error, warn};
+use aptos_logger::error;
 use aptos_types::transaction::{AtomicVersion, Version};
-use schemadb::{ReadOptions, SchemaBatch, SchemaIterator, DB};
-use std::{
-    iter::Peekable,
-    sync::{atomic::Ordering, Arc},
-    time::{Duration, Instant},
-};
+use schemadb::{ReadOptions, SchemaBatch, DB};
+use std::sync::atomic::AtomicBool;
+use std::sync::{atomic::Ordering, Arc};
 
 #[cfg(test)]
 mod test;
@@ -24,11 +20,12 @@ pub const STATE_STORE_PRUNER_NAME: &str = "state store pruner";
 
 pub struct StateStorePruner {
     db: Arc<DB>,
-    max_index_purged_version: AtomicVersion,
-    index_purged_at: Mutex<Instant>,
     /// Keeps track of the target version that the pruner needs to achieve.
     target_version: AtomicVersion,
     min_readable_version: AtomicVersion,
+    // Keeps track of whether the target version has been fully pruned, to tell whether there is
+    // pruning pending.
+    pruned_to_the_end_of_target_version: AtomicBool,
 }
 
 impl DBPruner for StateStorePruner {
@@ -36,33 +33,19 @@ impl DBPruner for StateStorePruner {
         STATE_STORE_PRUNER_NAME
     }
 
-    fn prune(&self, max_versions: u64) -> Result<Version> {
+    fn prune(&self, batch_size: usize) -> Result<Version> {
         if !self.is_pruning_pending() {
             return Ok(self.min_readable_version());
         }
         let min_readable_version = self.min_readable_version.load(Ordering::Relaxed);
         let target_version = self.target_version();
-        return match prune_state_store(
-            &self.db,
-            min_readable_version,
-            target_version,
-            max_versions as usize,
-        ) {
-            Ok(new_min_readable_version) => {
-                self.record_progress(new_min_readable_version);
-                // Try to purge the log.
-                if let Err(e) = self.maybe_purge_index() {
-                    warn!(
-                        error = ?e,
-                        "Failed purging state node index, ignored.",
-                    );
-                }
-                Ok(new_min_readable_version)
-            }
+
+        return match self.prune_state_store(min_readable_version, target_version, batch_size) {
+            Ok(new_min_readable_version) => Ok(new_min_readable_version),
             Err(e) => {
                 error!(
                     error = ?e,
-                    "Error pruning stale state nodes.",
+                    "Error pruning stale states.",
                 );
                 Err(e)
                 // On error, stop retrying vigorously by making next recv() blocking.
@@ -102,140 +85,104 @@ impl DBPruner for StateStorePruner {
             .with_label_values(&["state_store"])
             .set(min_readable_version as i64);
     }
+
+    fn is_pruning_pending(&self) -> bool {
+        self.target_version() > self.min_readable_version()
+            || !self
+                .pruned_to_the_end_of_target_version
+                .load(Ordering::Relaxed)
+    }
 }
 
 impl StateStorePruner {
-    pub fn new(db: Arc<DB>, max_index_purged_version: Version, index_purged_at: Instant) -> Self {
+    pub fn new(db: Arc<DB>) -> Self {
         let pruner = StateStorePruner {
             db,
-            max_index_purged_version: AtomicVersion::new(max_index_purged_version),
-            index_purged_at: Mutex::new(index_purged_at),
             target_version: AtomicVersion::new(0),
             min_readable_version: AtomicVersion::new(0),
+            pruned_to_the_end_of_target_version: AtomicBool::new(false),
        };
        pruner.initialize();
        pruner
    }
 
-    /// Purge the stale node index so that after restart not too much already pruned stuff is dealt
-    /// with again (although no harm is done deleting those then non-existent things.)
-    ///
-    /// We issue (range) deletes on the index only periodically instead of after every pruning batch
-    /// to avoid sending too many deletions to the DB, which takes disk space and slows it down.
-    fn maybe_purge_index(&self) -> Result<()> {
-        const MIN_INTERVAL: Duration = Duration::from_secs(10);
-        const MIN_VERSIONS: u64 = 60000;
-
-        let min_readable_version = self.min_readable_version.load(Ordering::Relaxed);
-        let max_index_purged_version = self.max_index_purged_version.load(Ordering::Relaxed);
-
-        // A deletion is issued at most once in one minute and when the pruner has progressed by at
-        // least 60000 versions (assuming the pruner deletes as slow as 1000 versions per second,
-        // this imposes at most one minute of work in vain after restarting.)
-        let now = Instant::now();
-        if now - *self.index_purged_at.lock() > MIN_INTERVAL
-            && min_readable_version - max_index_purged_version > MIN_VERSIONS
-        {
-            self.db.range_delete::<StaleNodeIndexSchema, Version>(
-                &(max_index_purged_version + 1), // begin is inclusive
-                &(min_readable_version + 1),     // end is exclusive
-            )?;
-            // The index items at min_readable_version has been consumed already because they
-            // indicate nodes that became stale at that version, keeping that version readable,
-            // hence is purged above.
-            self.max_index_purged_version
-                .store(min_readable_version, Ordering::Relaxed);
-            *self.index_purged_at.lock() = now;
-        }
-        Ok(())
-    }
-
-    pub fn target_version(&self) -> Version {
-        self.target_version.load(Ordering::Relaxed)
-    }
-}
-
-pub fn prune_state_store(
-    db: &DB,
-    min_readable_version: Version,
-    target_version: Version,
-    max_versions: usize,
-) -> Result<Version> {
-    let indices =
-        StaleNodeIndicesByVersionIterator::new(db, min_readable_version + 1, target_version)?
-            .take(max_versions) // Iterator<Item = Result<Vec<StaleNodeIndex>>>
-            .collect::<Result<Vec<_>>>()? // now Vec<Vec<StaleNodeIndex>>
-            .into_iter()
-            .flatten()
-            .collect::<Vec<_>>();
-
-    if indices.is_empty() {
-        Ok(target_version)
-    } else {
-        let _timer = OTHER_TIMERS_SECONDS
-            .with_label_values(&["pruner_commit"])
-            .start_timer();
-        let new_min_readable_version = indices.last().expect("Should exist.").stale_since_version;
-        let mut batch = SchemaBatch::new();
-        indices
-            .into_iter()
-            .try_for_each(|index| batch.delete::<JellyfishMerkleNodeSchema>(&index.node_key))?;
-        db.write_schemas(batch)?;
-        Ok(new_min_readable_version)
-    }
-}
-
-struct StaleNodeIndicesByVersionIterator<'a> {
-    inner: Peekable<SchemaIterator<'a, StaleNodeIndexSchema>>,
-    target_min_readable_version: Version,
-}
-
-impl<'a> StaleNodeIndicesByVersionIterator<'a> {
-    fn new(
-        db: &'a DB,
-        min_readable_version: Version,
-        target_min_readable_version: Version,
-    ) -> Result<Self> {
-        let mut iter = db.iter::<StaleNodeIndexSchema>(ReadOptions::default())?;
-        iter.seek(&min_readable_version)?;
-
-        Ok(Self {
-            inner: iter.peekable(),
-            target_min_readable_version,
-        })
+    pub fn prune_state_store(
+        &self,
+        min_readable_version: Version,
+        target_version: Version,
+        batch_size: usize,
+    ) -> anyhow::Result<Version> {
+        assert_ne!(batch_size, 0);
+        let (indices, is_end_of_target_version) =
+            self.get_stale_node_indices(min_readable_version, target_version, batch_size)?;
+        if indices.is_empty() {
+            self.pruned_to_the_end_of_target_version
+                .store(is_end_of_target_version, Ordering::Relaxed);
+            self.record_progress(target_version);
+            Ok(target_version)
+        } else {
+            let _timer = OTHER_TIMERS_SECONDS
+                .with_label_values(&["state_pruner_commit"])
+                .start_timer();
+            let new_min_readable_version =
+                indices.last().expect("Should exist.").stale_since_version;
+            let mut batch = SchemaBatch::new();
+            // Delete the stale nodes themselves, along with their stale node indices.
+            indices.into_iter().try_for_each(|index| {
+                batch.delete::<JellyfishMerkleNodeSchema>(&index.node_key)?;
+                batch.delete::<StaleNodeIndexSchema>(&index)
+            })?;
+            self.db.write_schemas(batch)?;
+            self.pruned_to_the_end_of_target_version
+                .store(is_end_of_target_version, Ordering::Relaxed);
+            self.record_progress(new_min_readable_version);
+            Ok(new_min_readable_version)
+        }
     }
 
-    fn next_result(&mut self) -> Result<Option<Vec<StaleNodeIndex>>> {
-        match self.inner.next().transpose()? {
-            None => Ok(None),
-            Some((index, _)) => {
-                let version = index.stale_since_version;
-                if version > self.target_min_readable_version {
-                    return Ok(None);
-                }
-
-                let mut indices = vec![index];
-                while let Some(res) = self.inner.peek() {
-                    if let Ok((index_ref, _)) = res {
-                        if index_ref.stale_since_version != version {
-                            break;
-                        }
-                    }
-
-                    let (index, _) = self.inner.next().transpose()?.expect("Should be Some.");
-                    indices.push(index);
+    fn get_stale_node_indices(
+        &self,
+        start_version: Version,
+        target_version: Version,
+        batch_size: usize,
+    ) -> Result<(Vec<StaleNodeIndex>, bool)> {
+        let mut indices = Vec::new();
+        let mut iter = self
+            .db
+            .iter::<StaleNodeIndexSchema>(ReadOptions::default())?;
+        iter.seek(&start_version)?;
+
+        let mut num_items = batch_size;
+        while num_items > 0 {
+            if let Some(item) = iter.next() {
+                let (index, _) = item?;
+                if index.stale_since_version > target_version {
+                    return Ok((indices, /*is_end_of_target_version=*/ true));
                 }
-
-                Ok(Some(indices))
+                num_items -= 1;
+                indices.push(index);
+            } else {
+                // No more stale nodes.
+                break;
             }
         }
-    }
-}
 
-impl<'a> Iterator for StaleNodeIndicesByVersionIterator<'a> {
-    type Item = Result<Vec<StaleNodeIndex>>;
+        // Handle the case where `num_items` reached 0 while stale node indices may remain:
+        // peek at the next index to see whether it is already past the target version.
+        if let Some(next_item) = iter.next() {
+            let (next_index, _) = next_item?;
+            if next_index.stale_since_version > target_version {
+                return Ok((indices, /*is_end_of_target_version=*/ true));
+            }
+        }
 
-    fn next(&mut self) -> Option<Self::Item> {
-        self.next_result().transpose()
+        // Handle the case where we reached the end of the indices, regardless of whether we
+        // collected `batch_size` items in `indices`.
+        let mut is_end_of_target_version = true;
+        if let Some(last_index) = indices.last() {
+            is_end_of_target_version = last_index.stale_since_version == target_version;
+        }
+        Ok((indices, is_end_of_target_version))
     }
 }
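The batched scan in `get_stale_node_indices` has two outputs worth keeping straight: the collected batch and whether the scan reached the end of the target version. A reduced model of that loop, with the RocksDB iterator replaced by a slice of `stale_since_version` values (hypothetical stand-in; the version values match the four-entry index used in the tests below):

```rust
fn take_batch(versions: &[u64], target_version: u64, batch_size: usize) -> (Vec<u64>, bool) {
    let mut iter = versions.iter().copied();
    let mut batch = Vec::new();
    while batch.len() < batch_size {
        match iter.next() {
            // An index past the target ends the scan: everything up to the target is in hand.
            Some(v) if v > target_version => return (batch, true),
            Some(v) => batch.push(v),
            None => break, // no more stale node indices at all
        }
    }
    // Batch is full (or the index ran out): the scan is "at the end of the target
    // version" unless the last collected index stopped short of the target.
    let is_end = match (iter.next(), batch.last()) {
        (Some(v), _) if v > target_version => true,
        (_, Some(&last)) => last == target_version,
        _ => true,
    };
    (batch, is_end)
}

fn main() {
    // Four indices stale since versions 1, 1, 2, 2 (as in the tests below).
    assert_eq!(take_batch(&[1, 1, 2, 2], 2, 1), (vec![1], false));
    assert_eq!(take_batch(&[1, 1, 2, 2], 2, 4), (vec![1, 1, 2, 2], true));
    assert_eq!(take_batch(&[1, 1, 2, 2], 1, 3), (vec![1, 1], true));
}
```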
diff --git a/storage/aptosdb/src/pruner/state_store/test.rs b/storage/aptosdb/src/pruner/state_store/test.rs
index 5f5df63e499e1..b362307726a6b 100644
--- a/storage/aptosdb/src/pruner/state_store/test.rs
+++ b/storage/aptosdb/src/pruner/state_store/test.rs
@@ -6,8 +6,10 @@ use std::collections::HashMap;
 use aptos_crypto::HashValue;
 use aptos_temppath::TempPath;
 use aptos_types::state_store::{state_key::StateKey, state_value::StateValue};
+use schemadb::ReadOptions;
 use storage_interface::{jmt_update_refs, jmt_updates, DbReader};
 
+use crate::stale_node_index::StaleNodeIndexSchema;
 use crate::{change_set::ChangeSet, pruner::*, state_store::StateStore, AptosDB};
 
 fn put_value_set(
@@ -72,7 +74,8 @@ fn test_state_store_pruner() {
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: Some(0),
-            pruning_batch_size: prune_batch_size,
+            ledger_pruning_batch_size: prune_batch_size,
+            state_store_pruning_batch_size: prune_batch_size,
         },
     );
 
@@ -140,6 +143,149 @@ fn test_state_store_pruner() {
     }
 }
 
+#[test]
+fn test_state_store_pruner_partial_version() {
+    // ```text
+    // | batch    | 0      | 1             | 2             |
+    // | address1 | value1 |               |               |
+    // | address2 | value2 | value2_update |               |
+    // | address3 |        | value3        | value3_update |
+    // ```
+    // The stale node index will have 4 entries in total:
+    // ```
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: 2 } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: d } }
+    // ```
+    // At version 1 there are two entries: one for the change to address2 and one for the root
+    // node. At version 2 there are two entries: one for the change to address3 and one for the
+    // root node.
+    let key1 = StateKey::Raw(String::from("test_key1").into_bytes());
+    let key2 = StateKey::Raw(String::from("test_key2").into_bytes());
+    let key3 = StateKey::Raw(String::from("test_key3").into_bytes());
+
+    let value1 = StateValue::from(String::from("test_val1").into_bytes());
+    let value2 = StateValue::from(String::from("test_val2").into_bytes());
+    let value2_update = StateValue::from(String::from("test_val2_update").into_bytes());
+    let value3 = StateValue::from(String::from("test_val3").into_bytes());
+    let value3_update = StateValue::from(String::from("test_val3_update").into_bytes());
+
+    let prune_batch_size = 1;
+    let tmp_dir = TempPath::new();
+    let aptos_db = AptosDB::new_for_test(&tmp_dir);
+    let state_store = &StateStore::new(
+        Arc::clone(&aptos_db.ledger_db),
+        Arc::clone(&aptos_db.state_merkle_db),
+        false, /* hack_for_tests */
+    );
+    let pruner = Pruner::new(
+        Arc::clone(&aptos_db.ledger_db),
+        Arc::clone(&aptos_db.state_merkle_db),
+        StoragePrunerConfig {
+            state_store_prune_window: Some(0),
+            ledger_prune_window: Some(0),
+            ledger_pruning_batch_size: prune_batch_size,
+            state_store_pruning_batch_size: prune_batch_size,
+        },
+    );
+
+    let _root0 = put_value_set(
+        &aptos_db.ledger_db,
+        state_store,
+        vec![(key1.clone(), value1.clone()), (key2.clone(), value2)],
+        0, /* version */
+    );
+    let _root1 = put_value_set(
+        &aptos_db.ledger_db,
+        state_store,
+        vec![
+            (key2.clone(), value2_update.clone()),
+            (key3.clone(), value3.clone()),
+        ],
+        1, /* version */
+    );
+    let _root2 = put_value_set(
+        &aptos_db.ledger_db,
+        state_store,
+        vec![(key3.clone(), value3_update.clone())],
+        2, /* version */
+    );
+
+    // Prune till version = 0. This should basically be a no-op.
+    {
+        pruner
+            .wake_and_wait(
+                0, /* latest_version */
+                PrunerIndex::StateStorePrunerIndex as usize,
+            )
+            .unwrap();
+        verify_state_in_store(state_store, key1.clone(), Some(&value1), 1);
+        verify_state_in_store(state_store, key2.clone(), Some(&value2_update), 1);
+        verify_state_in_store(state_store, key3.clone(), Some(&value3), 1);
+    }
+
+    // Test batched pruning: since the batch size is 1, updating the latest version to 1 should
+    // prune exactly one stale node at version 0.
+    {
+        assert!(pruner
+            .wake_and_wait(
+                1, /* latest_version */
+                PrunerIndex::StateStorePrunerIndex as usize,
+            )
+            .is_ok());
+        assert!(state_store
+            .get_state_value_with_proof_by_version(&key1, 0_u64)
+            .is_err());
+        // root1 is still there.
+        verify_state_in_store(state_store, key1.clone(), Some(&value1), 1);
+        verify_state_in_store(state_store, key2.clone(), Some(&value2_update), 1);
+        verify_state_in_store(state_store, key3.clone(), Some(&value3), 1);
+    }
+    // Prune 3 more times. All version 0 and 1 stale nodes should be gone.
+    {
+        assert!(pruner
+            .wake_and_wait(
+                2, /* latest_version */
+                PrunerIndex::StateStorePrunerIndex as usize,
+            )
+            .is_ok());
+        assert!(pruner
+            .wake_and_wait(
+                2, /* latest_version */
+                PrunerIndex::StateStorePrunerIndex as usize,
+            )
+            .is_ok());
+        assert!(pruner
+            .wake_and_wait(
+                2, /* latest_version */
+                PrunerIndex::StateStorePrunerIndex as usize,
+            )
+            .is_ok());
+        assert!(state_store
+            .get_state_value_with_proof_by_version(&key1, 0_u64)
+            .is_err());
+        assert!(state_store
+            .get_state_value_with_proof_by_version(&key2, 1_u64)
+            .is_err());
+        // root2 is still there.
+        verify_state_in_store(state_store, key1, Some(&value1), 2);
+        verify_state_in_store(state_store, key2, Some(&value2_update), 2);
+        verify_state_in_store(state_store, key3, Some(&value3_update), 2);
+    }
+
+    // Make sure all stale indices are gone.
+    assert_eq!(
+        aptos_db
+            .state_merkle_db
+            .iter::<StaleNodeIndexSchema>(ReadOptions::default())
+            .unwrap()
+            .collect::<Vec<_>>()
+            .len(),
+        0
+    );
+}
+
 #[test]
 fn test_state_store_pruner_disabled() {
     let key = StateKey::Raw(String::from("test_key1").into_bytes());
@@ -159,7 +305,8 @@ fn test_state_store_pruner_disabled() {
         StoragePrunerConfig {
             state_store_prune_window: None,
             ledger_prune_window: Some(0),
-            pruning_batch_size: prune_batch_size,
+            ledger_pruning_batch_size: prune_batch_size,
+            state_store_pruning_batch_size: prune_batch_size,
         },
     );
 
@@ -258,7 +405,8 @@ fn test_worker_quit_eagerly() {
         StoragePrunerConfig {
             state_store_prune_window: Some(1),
             ledger_prune_window: Some(1),
-            pruning_batch_size: 100,
+            ledger_pruning_batch_size: 100,
+            state_store_pruning_batch_size: 100,
         },
     );
     command_sender
diff --git a/storage/aptosdb/src/pruner/transaction_store/test.rs b/storage/aptosdb/src/pruner/transaction_store/test.rs
index a0999cb008f53..b780b0c123da6 100644
--- a/storage/aptosdb/src/pruner/transaction_store/test.rs
+++ b/storage/aptosdb/src/pruner/transaction_store/test.rs
@@ -53,7 +53,8 @@ fn verify_write_set_pruner(write_sets: Vec<WriteSet>) {
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: Some(0),
-            pruning_batch_size: 1,
+            ledger_pruning_batch_size: 1,
+            state_store_pruning_batch_size: 100,
         },
     );
 
@@ -102,7 +103,8 @@ fn verify_txn_store_pruner(
         StoragePrunerConfig {
             state_store_prune_window: Some(0),
             ledger_prune_window: Some(0),
-            pruning_batch_size: 1,
+            ledger_pruning_batch_size: 1,
+            state_store_pruning_batch_size: 100,
         },
     );
 
diff --git a/storage/aptosdb/src/pruner/utils.rs b/storage/aptosdb/src/pruner/utils.rs
index 91236e37ca8f7..be3c990ec4d4b 100644
--- a/storage/aptosdb/src/pruner/utils.rs
+++ b/storage/aptosdb/src/pruner/utils.rs
@@ -13,7 +13,7 @@ use crate::{
 use aptos_config::config::StoragePrunerConfig;
 use aptos_infallible::Mutex;
 use schemadb::DB;
-use std::{sync::Arc, time::Instant};
+use std::sync::Arc;
 
 /// A useful utility function to instantiate all db pruners.
@@ -23,11 +23,9 @@ pub fn create_db_pruners(
 ) -> Vec<Option<Mutex<Arc<dyn DBPruner + Send + Sync>>>> {
     vec![
         if storage_pruner_config.state_store_prune_window.is_some() {
-            Some(Mutex::new(Arc::new(StateStorePruner::new(
-                Arc::clone(&state_merkle_db),
-                0,
-                Instant::now(),
-            ))))
+            Some(Mutex::new(Arc::new(StateStorePruner::new(Arc::clone(
+                &state_merkle_db,
+            )))))
         } else {
             None
         },
diff --git a/storage/aptosdb/src/pruner/worker.rs b/storage/aptosdb/src/pruner/worker.rs
index 4b04ba8a0b14f..0b2bb31aa4626 100644
--- a/storage/aptosdb/src/pruner/worker.rs
+++ b/storage/aptosdb/src/pruner/worker.rs
@@ -4,6 +4,7 @@ use aptos_types::transaction::Version;
 use schemadb::DB;
 
 use crate::pruner::{db_pruner::DBPruner, utils};
+use crate::PrunerIndex;
 use aptos_config::config::StoragePrunerConfig;
 use aptos_infallible::Mutex;
 use itertools::zip_eq;
@@ -24,7 +25,10 @@ pub struct Worker {
     /// Indicates if there's NOT any pending work to do currently, to hint
     /// `Self::receive_commands()` to `recv()` blocking-ly.
     blocking_recv: bool,
-    max_version_to_prune_per_batch: u64,
+    /// Max items to prune per batch. For the ledger pruner this is the max number of versions to
+    /// prune; for the state store pruner it is the max number of stale nodes to prune.
+    ledger_store_max_versions_to_prune_per_batch: u64,
+    state_store_max_nodes_to_prune_per_batch: u64,
 }
 
 impl Worker {
@@ -42,26 +46,51 @@ impl Worker {
             command_receiver,
             min_readable_versions,
             blocking_recv: true,
-            max_version_to_prune_per_batch: storage_pruner_config.pruning_batch_size as u64,
+            ledger_store_max_versions_to_prune_per_batch: storage_pruner_config
+                .ledger_pruning_batch_size
+                as u64,
+            state_store_max_nodes_to_prune_per_batch: storage_pruner_config
+                .state_store_pruning_batch_size
+                as u64,
         }
     }
 
     pub(crate) fn work(mut self) {
+        assert_eq!(self.db_pruners.len(), 2);
         while self.receive_commands() {
             // Process a reasonably small batch of work before trying to receive commands again,
             // in case `Command::Quit` is received (that's when we should quit.)
             let mut error_in_pruning = false;
-            for db_pruner in self.db_pruners.iter().flatten() {
-                let result = db_pruner.lock().prune(self.max_version_to_prune_per_batch);
-                result.map_err(|_| error_in_pruning = true).ok();
-            }
             let mut pruning_pending = false;
-            for db_pruner in self.db_pruners.iter().flatten() {
-                // if any of the pruner has pending pruning, then we don't block on receive
-                if db_pruner.lock().is_pruning_pending() {
+
+            if let Some(state_store_pruner_locked) =
+                &self.db_pruners[PrunerIndex::StateStorePrunerIndex as usize]
+            {
+                let state_store_pruner = state_store_pruner_locked.lock();
+                state_store_pruner
+                    .prune(self.state_store_max_nodes_to_prune_per_batch as usize)
+                    .map_err(|_| error_in_pruning = true)
+                    .ok();
+
+                if state_store_pruner.is_pruning_pending() {
                     pruning_pending = true;
                 }
             }
+
+            if let Some(ledger_pruner_locked) =
+                &self.db_pruners[PrunerIndex::LedgerPrunerIndex as usize]
+            {
+                let ledger_pruner = ledger_pruner_locked.lock();
+                ledger_pruner
+                    .prune(self.ledger_store_max_versions_to_prune_per_batch as usize)
+                    .map_err(|_| error_in_pruning = true)
+                    .ok();
+
+                if ledger_pruner.is_pruning_pending() {
+                    pruning_pending = true;
+                }
+            }
+
             if !pruning_pending || error_in_pruning {
                 self.blocking_recv = true;
             } else {
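The reworked loop drives each pruner with its own batch size and only falls back to a blocking `recv()` when nothing is pending or an error occurred. A minimal sketch of one loop iteration, with a hypothetical `DbPruner` trait standing in for aptosdb's `DBPruner` (the real loop holds the pruners behind `Option<Mutex<..>>`):

```rust
trait DbPruner {
    fn prune(&self, batch_size: usize) -> Result<u64, String>;
    fn is_pruning_pending(&self) -> bool;
}

fn run_once(
    state_store_pruner: &dyn DbPruner,
    ledger_pruner: &dyn DbPruner,
    state_store_batch_size: usize,
    ledger_batch_size: usize,
) -> bool /* blocking_recv */ {
    let mut error_in_pruning = false;
    let mut pruning_pending = false;

    // Each pruner gets its own batch size: stale nodes for the state store,
    // versions for the ledger store.
    for (pruner, batch_size) in [
        (state_store_pruner, state_store_batch_size),
        (ledger_pruner, ledger_batch_size),
    ] {
        if pruner.prune(batch_size).is_err() {
            error_in_pruning = true;
        }
        if pruner.is_pruning_pending() {
            pruning_pending = true;
        }
    }

    // Block on recv() unless there is still work to do and no error occurred.
    !pruning_pending || error_in_pruning
}

struct NoopPruner;
impl DbPruner for NoopPruner {
    fn prune(&self, _batch_size: usize) -> Result<u64, String> {
        Ok(0)
    }
    fn is_pruning_pending(&self) -> bool {
        false
    }
}

fn main() {
    // Nothing pending anywhere: the worker should go back to blocking recv().
    assert!(run_once(&NoopPruner, &NoopPruner, 1_000, 500));
}
```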
diff --git a/storage/aptosdb/src/state_store/state_store_test.rs b/storage/aptosdb/src/state_store/state_store_test.rs
index 6541747750b8c..b1573717aa02e 100644
--- a/storage/aptosdb/src/state_store/state_store_test.rs
+++ b/storage/aptosdb/src/state_store/state_store_test.rs
@@ -15,7 +15,7 @@ use aptos_types::{
 };
 use storage_interface::{jmt_update_refs, jmt_updates, DbReader, StateSnapshotReceiver};
 
-use crate::{pruner, AptosDB};
+use crate::{pruner::state_store::StateStorePruner, AptosDB};
 
 use super::*;
 
@@ -43,18 +43,14 @@ fn put_value_set(
 }
 
 fn prune_stale_indices(
-    store: &StateStore,
+    state_pruner: &StateStorePruner,
     min_readable_version: Version,
     target_min_readable_version: Version,
     limit: usize,
-) {
-    pruner::state_store::prune_state_store(
-        &store.state_merkle_db,
-        min_readable_version,
-        target_min_readable_version,
-        limit,
-    )
-    .unwrap();
+) -> Version {
+    state_pruner
+        .prune_state_store(min_readable_version, target_min_readable_version, limit)
+        .unwrap()
 }
 
 fn verify_value_and_proof(
@@ -240,7 +236,7 @@ fn test_get_values_by_key_prefix() {
 }
 
 #[test]
-fn test_retired_records() {
+fn test_stale_node_index() {
     let key1 = StateKey::Raw(String::from("test_key1").into_bytes());
     let key2 = StateKey::Raw(String::from("test_key2").into_bytes());
     let key3 = StateKey::Raw(String::from("test_key3").into_bytes());
@@ -254,6 +250,7 @@ fn test_retired_records() {
     let tmp_dir = TempPath::new();
     let db = AptosDB::new_for_test(&tmp_dir);
     let store = &db.state_store;
+    let pruner = StateStorePruner::new(Arc::clone(&db.state_merkle_db));
 
     // Update.
     // ```text
     // | batch    | 0      | 1             | 2             |
     // | address1 | value1 |               |               |
     // | address2 | value2 | value2_update |               |
     // | address3 |        | value3        | value3_update |
     // ```
-    let root0 = put_value_set(
+    // The stale node index will have 4 entries in total:
+    // ```
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: 2 } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: d } }
+    // ```
+    // At version 1 there are two entries: one for the change to address2 and one for the root
+    // node. At version 2 there are two entries: one for the change to address3 and one for the
+    // root node.
+
+    let _root0 = put_value_set(
         store,
         vec![(key1.clone(), value1.clone()), (key2.clone(), value2)],
         0, /* version */
@@ -285,23 +292,145 @@ fn test_stale_node_index() {
     );
 
     // Verify.
-    // Prune with limit=0, nothing is gone.
+    // Prune with limit = 2 and target_min_readable_version = 2: the two entries with
+    // stale_since_version = 1 will be pruned, and min_readable_version will be promoted to 1.
     {
-        prune_stale_indices(
-            store, 0, /* min_readable_version */
-            1, /* target_min_readable_version */
-            0, /* limit */
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                0, /* min_readable_version */
+                2, /* target_min_readable_version */
+                2, /* limit */
+            ),
+            1
         );
-        verify_value_and_proof(store, key1.clone(), Some(&value1), 0, root0);
+        assert!(store
+            .get_state_value_with_proof_by_version(&key2, 0)
+            .is_err());
+        // root1 is still there.
+        verify_value_and_proof(store, key1.clone(), Some(&value1), 1, root1);
+        verify_value_and_proof(store, key2.clone(), Some(&value2_update), 1, root1);
+        verify_value_and_proof(store, key3.clone(), Some(&value3), 1, root1);
     }
-    // Prune till version=1.
+    // Prune with limit = 1 and target_min_readable_version = 2: one entry with
+    // stale_since_version = 2 will be pruned. min_readable_version is promoted to 2 even though
+    // one more entry with stale_since_version = 2 remains.
+    {
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                1, /* min_readable_version */
+                2, /* target_min_readable_version */
+                1, /* limit */
+            ),
+            2
+        );
+        // root1 is gone.
+        assert!(store
+            .get_state_value_with_proof_by_version(&key2, 1)
+            .is_err());
+        // root2 is still there.
+        verify_value_and_proof(store, key1.clone(), Some(&value1), 2, root2);
+        verify_value_and_proof(store, key2.clone(), Some(&value2_update), 2, root2);
+        verify_value_and_proof(store, key3.clone(), Some(&value3_update), 2, root2);
+    }
+    // Prune with limit = 1 and target_min_readable_version = 2 again: the remaining entry with
+    // stale_since_version = 2 will be pruned, and min_readable_version stays at 2.
+    {
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                1, /* min_readable_version */
+                2, /* target_min_readable_version */
+                1, /* limit */
+            ),
+            2
+        );
+        // root1 is gone.
+        assert!(store
+            .get_state_value_with_proof_by_version(&key2, 1)
+            .is_err());
+        // root2 is still there.
+        verify_value_and_proof(store, key1, Some(&value1), 2, root2);
+        verify_value_and_proof(store, key2, Some(&value2_update), 2, root2);
+        verify_value_and_proof(store, key3, Some(&value3_update), 2, root2);
+    }
+}
+
+#[test]
+fn test_stale_node_index_with_target_version() {
+    let key1 = StateKey::Raw(String::from("test_key1").into_bytes());
+    let key2 = StateKey::Raw(String::from("test_key2").into_bytes());
+    let key3 = StateKey::Raw(String::from("test_key3").into_bytes());
+
+    let value1 = StateValue::from(String::from("test_val1").into_bytes());
+    let value2 = StateValue::from(String::from("test_val2").into_bytes());
+    let value2_update = StateValue::from(String::from("test_val2_update").into_bytes());
+    let value3 = StateValue::from(String::from("test_val3").into_bytes());
+    let value3_update = StateValue::from(String::from("test_val3_update").into_bytes());
+
+    let tmp_dir = TempPath::new();
+    let db = AptosDB::new_for_test(&tmp_dir);
+    let store = &db.state_store;
+    let pruner = StateStorePruner::new(Arc::clone(&db.state_merkle_db));
+
+    // Update.
+    // ```text
+    // | batch    | 0      | 1             | 2             |
+    // | address1 | value1 |               |               |
+    // | address2 | value2 | value2_update |               |
+    // | address3 |        | value3        | value3_update |
+    // ```
+    // The stale node index will have 4 entries in total:
+    // ```
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: 2 } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: d } }
+    // ```
+    // At version 1 there are two entries: one for the change to address2 and one for the root
+    // node. At version 2 there are two entries: one for the change to address3 and one for the
+    // root node.
+
+    let _root0 = put_value_set(
+        store,
+        vec![(key1.clone(), value1.clone()), (key2.clone(), value2)],
+        0, /* version */
+        None,
+    );
+    let root1 = put_value_set(
+        store,
+        vec![
+            (key2.clone(), value2_update.clone()),
+            (key3.clone(), value3.clone()),
+        ],
+        1, /* version */
+        Some(0),
+    );
+    let root2 = put_value_set(
+        store,
+        vec![(key3.clone(), value3_update.clone())],
+        2, /* version */
+        Some(1),
+    );
+
+    // Verify.
+    // Prune with limit = 2 and target_min_readable_version = 1: the two entries with
+    // stale_since_version = 1 will be pruned, and min_readable_version will be promoted to 1.
     {
-        prune_stale_indices(
-            store, 0, /* min_readable_version */
-            1, /* target_min_readable_version */
-            100, /* limit */
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                0, /* min_readable_version */
+                1, /* target_min_readable_version */
+                2, /* limit */
+            ),
+            1
         );
         // root0 is gone.
         assert!(store
             .get_state_value_with_proof_by_version(&key2, 0)
             .is_err());
@@ -310,14 +439,101 @@ fn test_stale_node_index() {
         verify_value_and_proof(store, key1.clone(), Some(&value1), 1, root1);
         verify_value_and_proof(store, key2.clone(), Some(&value2_update), 1, root1);
         verify_value_and_proof(store, key3.clone(), Some(&value3), 1, root1);
     }
-    // Prune till version=2.
+    // Prune with limit = 1 and target_min_readable_version = 1: the entries with
+    // stale_since_version = 2 will not be pruned.
     {
-        prune_stale_indices(
-            store, 1, /* min_readable_version */
-            2, /* target_min_readable_version */
-            100, /* limit */
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                1, /* min_readable_version */
+                1, /* target_min_readable_version */
+                1, /* limit */
+            ),
+            1
         );
-        // root1 is gone.
+        // root1 is still there.
+        verify_value_and_proof(store, key1.clone(), Some(&value1), 1, root1);
+        verify_value_and_proof(store, key2.clone(), Some(&value2_update), 1, root1);
+        verify_value_and_proof(store, key3.clone(), Some(&value3), 1, root1);
+        // root2 is still there.
+        verify_value_and_proof(store, key1, Some(&value1), 2, root2);
+        verify_value_and_proof(store, key2, Some(&value2_update), 2, root2);
+        verify_value_and_proof(store, key3, Some(&value3_update), 2, root2);
+    }
+}
+
+#[test]
+fn test_stale_node_index_all_at_once() {
+    let key1 = StateKey::Raw(String::from("test_key1").into_bytes());
+    let key2 = StateKey::Raw(String::from("test_key2").into_bytes());
+    let key3 = StateKey::Raw(String::from("test_key3").into_bytes());
+
+    let value1 = StateValue::from(String::from("test_val1").into_bytes());
+    let value2 = StateValue::from(String::from("test_val2").into_bytes());
+    let value2_update = StateValue::from(String::from("test_val2_update").into_bytes());
+    let value3 = StateValue::from(String::from("test_val3").into_bytes());
+    let value3_update = StateValue::from(String::from("test_val3_update").into_bytes());
+
+    let tmp_dir = TempPath::new();
+    let db = AptosDB::new_for_test(&tmp_dir);
+    let store = &db.state_store;
+    let pruner = StateStorePruner::new(Arc::clone(&db.state_merkle_db));
+
+    // Update.
+    // ```text
+    // | batch    | 0      | 1             | 2             |
+    // | address1 | value1 |               |               |
+    // | address2 | value2 | value2_update |               |
+    // | address3 |        | value3        | value3_update |
+    // ```
+    // The stale node index will have 4 entries in total:
+    // ```
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 1, node_key: NodeKey { version: 0, nibble_path: 2 } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: } }
+    // index: StaleNodeIndex { stale_since_version: 2, node_key: NodeKey { version: 1, nibble_path: d } }
+    // ```
+    // At version 1 there are two entries: one for the change to address2 and one for the root
+    // node. At version 2 there are two entries: one for the change to address3 and one for the
+    // root node.
+
+    let _root0 = put_value_set(
+        store,
+        vec![(key1.clone(), value1.clone()), (key2.clone(), value2)],
+        0, /* version */
+        None,
+    );
+    let _root1 = put_value_set(
+        store,
+        vec![
+            (key2.clone(), value2_update.clone()),
+            (key3.clone(), value3),
+        ],
+        1, /* version */
+        Some(0),
+    );
+    let root2 = put_value_set(
+        store,
+        vec![(key3.clone(), value3_update.clone())],
+        2, /* version */
+        Some(1),
+    );
+
+    // Verify.
+    // Prune with limit = 5: there are 4 stale index entries in total, and all of them will be
+    // pruned.
+    {
+        assert_eq!(
+            prune_stale_indices(
+                &pruner,
+                0, /* min_readable_version */
+                2, /* target_min_readable_version */
+                5, /* limit */
+            ),
+            2
+        );
+        // root0 and root1 are gone.
+        assert!(store
+            .get_state_value_with_proof_by_version(&key2, 0)
+            .is_err());
+        assert!(store
            .get_state_value_with_proof_by_version(&key2, 1)
            .is_err());