From eee9a08376f9424a2da7689103f9025e14b19156 Mon Sep 17 00:00:00 2001 From: Tyera Eulberg Date: Tue, 2 Jun 2020 19:49:31 -0600 Subject: [PATCH] Purge TransactionStatus and AddressSignatures exactly from ledger-tool (#10358) * Add failing test * Add execution path to purge primary-index columns exactly * Fail gracefully if older TransactionStatus rocksdb keys are present * Remove columns_empty check for special columns * Move blockstore purge methods to submodule * Remove unused column empty check --- core/src/ledger_cleanup_service.rs | 3 +- ledger/src/blockstore.rs | 644 ++---------- ledger/src/blockstore/blockstore_purge.rs | 1111 +++++++++++++++++++++ ledger/src/blockstore_db.rs | 26 +- 4 files changed, 1210 insertions(+), 574 deletions(-) create mode 100644 ledger/src/blockstore/blockstore_purge.rs diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 50345745e089ca..ffcb7facb52ae5 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -1,6 +1,6 @@ //! The `ledger_cleanup_service` drops older ledger data to limit disk space usage -use solana_ledger::blockstore::Blockstore; +use solana_ledger::blockstore::{Blockstore, PurgeType}; use solana_ledger::blockstore_db::Result as BlockstoreResult; use solana_measure::measure::Measure; use solana_sdk::clock::Slot; @@ -172,6 +172,7 @@ impl LedgerCleanupService { first_slot, lowest_cleanup_slot, delay_between_purges, + PurgeType::PrimaryIndex, ); purge_time.stop(); info!("{}", purge_time); diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f838c61c4438ea..71390f5068f7f1 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -56,6 +56,8 @@ use std::{ time::Duration, }; +pub mod blockstore_purge; + pub const BLOCKSTORE_DIRECTORY: &str = "rocksdb"; thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() @@ -64,6 +66,12 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon:: .build() .unwrap())); +thread_local!(static PAR_THREAD_POOL_ALL_CPUS: RefCell = RefCell::new(rayon::ThreadPoolBuilder::new() + .num_threads(num_cpus::get()) + .thread_name(|ix| format!("blockstore_{}", ix)) + .build() + .unwrap())); + pub const MAX_COMPLETED_SLOTS_IN_CHANNEL: usize = 100_000; pub const MAX_TURBINE_PROPAGATION_IN_MS: u64 = 100; pub const MAX_TURBINE_DELAY_IN_TICKS: u64 = MAX_TURBINE_PROPAGATION_IN_MS / MS_PER_TICK; @@ -75,6 +83,13 @@ const TIMESTAMP_SLOT_RANGE: usize = 16; pub const MAX_DATA_SHREDS_PER_SLOT: usize = 32_768; pub type CompletedSlotsReceiver = Receiver>; +type CompletedRanges = Vec<(u32, u32)>; + +#[derive(Clone, Copy)] +pub enum PurgeType { + Exact, + PrimaryIndex, +} // ledger window pub struct Blockstore { @@ -301,235 +316,6 @@ impl Blockstore { false } - /// Silently deletes all blockstore column families in the range [from_slot,to_slot] - /// Dangerous; Use with care: - /// Does not check for integrity and does not update slot metas that refer to deleted slots - /// Modifies multiple column families simultaneously - pub fn purge_slots_with_delay( - &self, - from_slot: Slot, - to_slot: Slot, - delay_between_purges: Option, - ) { - // if there's no upper bound, split the purge request into batches of 1000 slots - const PURGE_BATCH_SIZE: u64 = 1000; - let mut batch_start = from_slot; - while batch_start < to_slot { - let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot); - match self.run_purge(batch_start, batch_end) { - Ok(_all_columns_purged) => { - batch_start = batch_end; - - if let Some(ref duration) = delay_between_purges { - // Cooperate with other blockstore users - std::thread::sleep(*duration); - } - } - Err(e) => { - error!( - "Error: {:?}; Purge failed in range {:?} to {:?}", - e, batch_start, batch_end - ); - break; - } - } - } - - if !self.no_compaction { - if let Err(e) = self.compact_storage(from_slot, to_slot) { - // This error is not fatal and indicates an internal error - error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - e, from_slot, to_slot - ); - } - } - } - - pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) { - self.purge_slots_with_delay(from_slot, to_slot, None) - } - - /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the - /// [from_slot,to_slot] range - /// - /// Dangerous; Use with care - pub fn purge_from_next_slots(&self, from_slot: Slot, to_slot: Slot) { - for (slot, mut meta) in self - .slot_meta_iterator(0) - .expect("unable to iterate over meta") - { - if slot > to_slot { - break; - } - - let original_len = meta.next_slots.len(); - meta.next_slots - .retain(|slot| *slot < from_slot || *slot > to_slot); - if meta.next_slots.len() != original_len { - info!("purge_from_next_slots: adjusted meta for slot {}", slot); - self.put_meta_bytes( - slot, - &bincode::serialize(&meta).expect("couldn't update meta"), - ) - .expect("couldn't update meta"); - } - } - } - - // Returns whether or not all columns successfully purged the slot range - fn run_purge(&self, from_slot: Slot, to_slot: Slot) -> Result { - let mut write_batch = self - .db - .batch() - .expect("Database Error: Failed to get write batch"); - // delete range cf is not inclusive - let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX); - - let mut delete_range_timer = Measure::start("delete_range"); - let mut columns_empty = self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, from_slot, to_slot) - .unwrap_or(false); - let mut w_active_transaction_status_index = - self.active_transaction_status_index.write().unwrap(); - if let Some(index) = self.toggle_transaction_status_index( - &mut write_batch, - &mut w_active_transaction_status_index, - to_slot, - )? { - columns_empty &= self - .db - .delete_range_cf::(&mut write_batch, index, index + 1) - .unwrap_or(false) - & self - .db - .delete_range_cf::(&mut write_batch, index, index + 1) - .unwrap_or(false); - } - delete_range_timer.stop(); - let mut write_timer = Measure::start("write_batch"); - if let Err(e) = self.db.write(write_batch) { - error!( - "Error: {:?} while submitting write batch for slot {:?} retrying...", - e, from_slot - ); - return Err(e); - } - write_timer.stop(); - datapoint_info!( - "blockstore-purge", - ("from_slot", from_slot as i64, i64), - ("to_slot", to_slot as i64, i64), - ("delete_range_us", delete_range_timer.as_us() as i64, i64), - ("write_batch_us", write_timer.as_us() as i64, i64) - ); - Ok(columns_empty) - } - - pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result { - info!("compact_storage: from {} to {}", from_slot, to_slot); - let mut compact_timer = Measure::start("compact_range"); - let result = self - .meta_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .db - .column::() - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .data_shred_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .code_shred_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .dead_slots_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .duplicate_slots_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .erasure_meta_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .orphans_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .index_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .transaction_status_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .address_signatures_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .transaction_status_index_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .rewards_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false); - compact_timer.stop(); - if !result { - info!("compact_storage incomplete"); - } - datapoint_info!( - "blockstore-compact", - ("compact_range_us", compact_timer.as_us() as i64, i64), - ); - Ok(result) - } - pub fn erasure_meta(&self, slot: Slot, set_index: u64) -> Result> { self.erasure_meta_cf.get((slot, set_index)) } @@ -987,7 +773,7 @@ impl Blockstore { .expect("Couldn't fetch from SlotMeta column family") { // Clear all slot related information - self.run_purge(slot, slot) + self.run_purge(slot, slot, PurgeType::PrimaryIndex) .expect("Purge database operations failed"); // Reinsert parts of `slot_meta` that are important to retain, like the `next_slots` @@ -2083,29 +1869,11 @@ impl Blockstore { return Err(BlockstoreError::DeadSlot); } - // lowest_cleanup_slot is the last slot that was not cleaned up by - // LedgerCleanupService - let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); - if *lowest_cleanup_slot > 0 && *lowest_cleanup_slot >= slot { - return Err(BlockstoreError::SlotCleanedUp); - } - - let slot_meta_cf = self.db.column::(); - let slot_meta = slot_meta_cf.get(slot)?; - if slot_meta.is_none() { - return Ok((vec![], 0, false)); - } - - let slot_meta = slot_meta.unwrap(); - // Find all the ranges for the completed data blocks - let completed_ranges = Self::get_completed_data_ranges( - start_index as u32, - &slot_meta.completed_data_indexes[..], - slot_meta.consumed as u32, - ); + let (completed_ranges, slot_meta) = self.get_completed_ranges(slot, start_index)?; if completed_ranges.is_empty() { return Ok((vec![], 0, false)); } + let slot_meta = slot_meta.unwrap(); let num_shreds = completed_ranges .last() .map(|(_, end_index)| u64::from(*end_index) - start_index + 1) @@ -2126,12 +1894,41 @@ impl Blockstore { Ok((entries, num_shreds, slot_meta.is_full())) } + fn get_completed_ranges( + &self, + slot: Slot, + start_index: u64, + ) -> Result<(CompletedRanges, Option)> { + // lowest_cleanup_slot is the last slot that was not cleaned up by + // LedgerCleanupService + let lowest_cleanup_slot = self.lowest_cleanup_slot.read().unwrap(); + if *lowest_cleanup_slot > slot { + return Err(BlockstoreError::SlotCleanedUp); + } + + let slot_meta_cf = self.db.column::(); + let slot_meta = slot_meta_cf.get(slot)?; + if slot_meta.is_none() { + return Ok((vec![], slot_meta)); + } + + let slot_meta = slot_meta.unwrap(); + // Find all the ranges for the completed data blocks + let completed_ranges = Self::get_completed_data_ranges( + start_index as u32, + &slot_meta.completed_data_indexes[..], + slot_meta.consumed as u32, + ); + + Ok((completed_ranges, Some(slot_meta))) + } + // Get the range of indexes [start_index, end_index] of every completed data block fn get_completed_data_ranges( mut start_index: u32, completed_data_end_indexes: &[u32], consumed: u32, - ) -> Vec<(u32, u32)> { + ) -> CompletedRanges { let mut completed_data_ranges = vec![]; let floor = completed_data_end_indexes .iter() @@ -2213,6 +2010,30 @@ impl Blockstore { }) } + fn get_any_valid_slot_entries(&self, slot: Slot, start_index: u64) -> Vec { + let (completed_ranges, slot_meta) = self + .get_completed_ranges(slot, start_index) + .unwrap_or_default(); + if completed_ranges.is_empty() { + return vec![]; + } + let slot_meta = slot_meta.unwrap(); + + let entries: Vec> = PAR_THREAD_POOL_ALL_CPUS.with(|thread_pool| { + thread_pool.borrow().install(|| { + completed_ranges + .par_iter() + .map(|(start_index, end_index)| { + self.get_entries_in_data_block(slot, *start_index, *end_index, &slot_meta) + .unwrap_or_default() + }) + .collect() + }) + }); + + entries.into_iter().flatten().collect() + } + // Returns slots connecting to any element of the list `slots`. pub fn get_slots_since(&self, slots: &[u64]) -> Result>> { // Return error if there was a database error during lookup of any of the @@ -3098,7 +2919,7 @@ pub mod tests { use std::{iter::FromIterator, time::Duration}; // used for tests only - fn make_slot_entries_with_transactions(num_entries: u64) -> Vec { + pub(crate) fn make_slot_entries_with_transactions(num_entries: u64) -> Vec { let mut entries: Vec = Vec::new(); for x in 0..num_entries { let transaction = Transaction::new_with_compiled_instructions( @@ -3115,99 +2936,6 @@ pub mod tests { entries } - // check that all columns are either empty or start at `min_slot` - fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) { - let condition_met = blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|((slot, _), _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|((slot, _), _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|((slot, _), _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|((primary_index, _, slot), _)| { - slot >= min_slot || (primary_index == 2 && slot == 0) - }) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|((primary_index, _, slot, _), _)| { - slot >= min_slot || (primary_index == 2 && slot == 0) - }) - .unwrap_or(true) - & blockstore - .db - .iter::(IteratorMode::Start) - .unwrap() - .next() - .map(|(slot, _)| slot >= min_slot) - .unwrap_or(true); - assert!(condition_met); - } - #[test] fn test_create_new_ledger() { let mint_total = 1_000_000_000_000; @@ -3478,7 +3206,9 @@ pub mod tests { assert!(ledger.get_data_shreds(slot, 0, 1, &mut buf).is_ok()); let max_purge_slot = 1; - ledger.run_purge(0, max_purge_slot).unwrap(); + ledger + .run_purge(0, max_purge_slot, PurgeType::PrimaryIndex) + .unwrap(); *ledger.lowest_cleanup_slot.write().unwrap() = max_purge_slot; let mut buf = [0; 4096]; @@ -5034,49 +4764,6 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } - #[test] - fn test_purge_slots() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let (shreds, _) = make_many_slot_entries(0, 50, 5); - blockstore.insert_shreds(shreds, None, false).unwrap(); - - blockstore.purge_slots(0, 5); - - test_all_empty_or_min(&blockstore, 6); - - blockstore.purge_slots(0, 50); - - // min slot shouldn't matter, blockstore should be empty - test_all_empty_or_min(&blockstore, 100); - test_all_empty_or_min(&blockstore, 0); - - blockstore - .slot_meta_iterator(0) - .unwrap() - .for_each(|(_, _)| { - panic!(); - }); - - drop(blockstore); - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); - } - - #[test] - fn test_purge_huge() { - let blockstore_path = get_tmp_ledger_path!(); - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let (shreds, _) = make_many_slot_entries(0, 5000, 10); - blockstore.insert_shreds(shreds, None, false).unwrap(); - - blockstore.purge_slots(0, 4999); - - test_all_empty_or_min(&blockstore, 5000); - - drop(blockstore); - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); - } - #[test] fn test_iter_bounds() { let blockstore_path = get_tmp_ledger_path!(); @@ -5791,7 +5478,7 @@ pub mod tests { assert_eq!(first_address_entry.0, 0); assert_eq!(first_address_entry.2, slot0); - blockstore.run_purge(0, 8).unwrap(); + blockstore.run_purge(0, 8, PurgeType::PrimaryIndex).unwrap(); // First successful prune freezes index 0 assert_eq!( transaction_status_index_cf.get(0).unwrap().unwrap(), @@ -5886,7 +5573,9 @@ pub mod tests { assert_eq!(index1_first_address_entry.0, 1); assert_eq!(index1_first_address_entry.2, slot1); - blockstore.run_purge(0, 18).unwrap(); + blockstore + .run_purge(0, 18, PurgeType::PrimaryIndex) + .unwrap(); // Successful prune toggles TransactionStatusIndex assert_eq!( transaction_status_index_cf.get(0).unwrap().unwrap(), @@ -5932,169 +5621,6 @@ pub mod tests { Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); } - #[test] - #[allow(clippy::cognitive_complexity)] - fn test_purge_transaction_status() { - let blockstore_path = get_tmp_ledger_path!(); - { - let blockstore = Blockstore::open(&blockstore_path).unwrap(); - let transaction_status_index_cf = blockstore.db.column::(); - let slot = 10; - for _ in 0..5 { - let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); - blockstore - .write_transaction_status( - slot, - Signature::new(&random_bytes), - vec![&Pubkey::new(&random_bytes[0..32])], - vec![&Pubkey::new(&random_bytes[32..])], - &TransactionStatusMeta::default(), - ) - .unwrap(); - } - // Purge to freeze index 0 - blockstore.run_purge(0, 1).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - (0, Pubkey::default(), 0, Signature::default()), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - - // Low purge should not affect state - blockstore.run_purge(0, 5).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - - // Test boundary conditions: < slot should not purge statuses; <= slot should - blockstore.run_purge(0, 9).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..5 { - let entry = status_entry_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - for _ in 0..10 { - let entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(entry.0, 0); - assert_eq!(entry.2, slot); - } - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 10, - frozen: true, - } - ); - - blockstore.run_purge(0, 10).unwrap(); - let mut status_entry_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::TransactionStatus::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - let padding_entry = status_entry_iterator.next().unwrap().0; - assert_eq!(padding_entry.0, 2); - assert_eq!(padding_entry.2, 0); - assert!(status_entry_iterator.next().is_none()); - let mut address_transactions_iterator = blockstore - .db - .iter::(IteratorMode::From( - cf::AddressSignatures::as_index(0), - IteratorDirection::Forward, - )) - .unwrap(); - let padding_entry = address_transactions_iterator.next().unwrap().0; - assert_eq!(padding_entry.0, 2); - assert_eq!(padding_entry.2, 0); - assert!(address_transactions_iterator.next().is_none()); - assert_eq!( - transaction_status_index_cf.get(0).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: false, - } - ); - assert_eq!( - transaction_status_index_cf.get(1).unwrap().unwrap(), - TransactionStatusIndexMeta { - max_slot: 0, - frozen: true, - } - ); - } - Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); - } - #[test] fn test_get_transaction_status() { let blockstore_path = get_tmp_ledger_path!(); @@ -6286,7 +5812,7 @@ pub mod tests { ); } - blockstore.run_purge(0, 2).unwrap(); + blockstore.run_purge(0, 2, PurgeType::PrimaryIndex).unwrap(); *blockstore.lowest_cleanup_slot.write().unwrap() = slot; for (transaction, _) in expected_transactions { let signature = transaction.signatures[0]; @@ -6322,7 +5848,7 @@ pub mod tests { .unwrap(); } // Purge to freeze index 0 - blockstore.run_purge(0, 1).unwrap(); + blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); let slot1 = 20; for x in 5..9 { let signature = Signature::new(&[x; 64]); @@ -6382,7 +5908,9 @@ pub mod tests { } // Purge index 0 - blockstore.run_purge(0, 10).unwrap(); + blockstore + .run_purge(0, 10, PurgeType::PrimaryIndex) + .unwrap(); assert_eq!( blockstore .get_confirmed_signatures_for_address(address0, 0, 50) @@ -6519,7 +6047,7 @@ pub mod tests { blockstore.insert_shreds(shreds, None, false).unwrap(); } assert_eq!(blockstore.lowest_slot(), 1); - blockstore.run_purge(0, 5).unwrap(); + blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap(); assert_eq!(blockstore.lowest_slot(), 6); } Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs new file mode 100644 index 00000000000000..7e705195f937f6 --- /dev/null +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -0,0 +1,1111 @@ +use super::*; + +impl Blockstore { + /// Silently deletes all blockstore column families in the range [from_slot,to_slot] + /// Dangerous; Use with care: + /// Does not check for integrity and does not update slot metas that refer to deleted slots + /// Modifies multiple column families simultaneously + pub fn purge_slots_with_delay( + &self, + from_slot: Slot, + to_slot: Slot, + delay_between_purges: Option, + purge_type: PurgeType, + ) { + // if there's no upper bound, split the purge request into batches of 1000 slots + const PURGE_BATCH_SIZE: u64 = 1000; + let mut batch_start = from_slot; + while batch_start < to_slot { + let batch_end = (batch_start + PURGE_BATCH_SIZE).min(to_slot); + match self.run_purge(batch_start, batch_end, purge_type) { + Ok(_all_columns_purged) => { + batch_start = batch_end; + + if let Some(ref duration) = delay_between_purges { + // Cooperate with other blockstore users + std::thread::sleep(*duration); + } + } + Err(e) => { + error!( + "Error: {:?}; Purge failed in range {:?} to {:?}", + e, batch_start, batch_end + ); + break; + } + } + } + + if !self.no_compaction { + if let Err(e) = self.compact_storage(from_slot, to_slot) { + // This error is not fatal and indicates an internal error + error!( + "Error: {:?}; Couldn't compact storage from {:?} to {:?}", + e, from_slot, to_slot + ); + } + } + } + + pub fn purge_slots(&self, from_slot: Slot, to_slot: Slot) { + self.purge_slots_with_delay(from_slot, to_slot, None, PurgeType::Exact) + } + + /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the + /// [from_slot,to_slot] range + /// + /// Dangerous; Use with care + pub fn purge_from_next_slots(&self, from_slot: Slot, to_slot: Slot) { + for (slot, mut meta) in self + .slot_meta_iterator(0) + .expect("unable to iterate over meta") + { + if slot > to_slot { + break; + } + + let original_len = meta.next_slots.len(); + meta.next_slots + .retain(|slot| *slot < from_slot || *slot > to_slot); + if meta.next_slots.len() != original_len { + info!("purge_from_next_slots: adjusted meta for slot {}", slot); + self.put_meta_bytes( + slot, + &bincode::serialize(&meta).expect("couldn't update meta"), + ) + .expect("couldn't update meta"); + } + } + } + + // Returns whether or not all columns successfully purged the slot range + pub(crate) fn run_purge( + &self, + from_slot: Slot, + to_slot: Slot, + purge_type: PurgeType, + ) -> Result { + let mut write_batch = self + .db + .batch() + .expect("Database Error: Failed to get write batch"); + // delete range cf is not inclusive + let to_slot = to_slot.checked_add(1).unwrap_or_else(|| std::u64::MAX); + + let mut delete_range_timer = Measure::start("delete_range"); + let mut columns_purged = self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok() + & self + .db + .delete_range_cf::(&mut write_batch, from_slot, to_slot) + .is_ok(); + let mut w_active_transaction_status_index = + self.active_transaction_status_index.write().unwrap(); + match purge_type { + PurgeType::Exact => { + self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; + } + PurgeType::PrimaryIndex => { + self.purge_special_columns_with_primary_index( + &mut write_batch, + &mut columns_purged, + &mut w_active_transaction_status_index, + to_slot, + )?; + } + } + delete_range_timer.stop(); + let mut write_timer = Measure::start("write_batch"); + if let Err(e) = self.db.write(write_batch) { + error!( + "Error: {:?} while submitting write batch for slot {:?} retrying...", + e, from_slot + ); + return Err(e); + } + write_timer.stop(); + datapoint_info!( + "blockstore-purge", + ("from_slot", from_slot as i64, i64), + ("to_slot", to_slot as i64, i64), + ("delete_range_us", delete_range_timer.as_us() as i64, i64), + ("write_batch_us", write_timer.as_us() as i64, i64) + ); + Ok(columns_purged) + } + + pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result { + info!("compact_storage: from {} to {}", from_slot, to_slot); + let mut compact_timer = Measure::start("compact_range"); + let result = self + .meta_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .db + .column::() + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .data_shred_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .code_shred_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .dead_slots_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .duplicate_slots_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .erasure_meta_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .orphans_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .index_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false) + && self + .transaction_status_cf + .compact_range(0, 2) + .unwrap_or(false) + && self + .address_signatures_cf + .compact_range(0, 2) + .unwrap_or(false) + && self + .transaction_status_index_cf + .compact_range(0, 2) + .unwrap_or(false) + && self + .rewards_cf + .compact_range(from_slot, to_slot) + .unwrap_or(false); + compact_timer.stop(); + if !result { + info!("compact_storage incomplete"); + } + datapoint_info!( + "blockstore-compact", + ("compact_range_us", compact_timer.as_us() as i64, i64), + ); + Ok(result) + } + + fn purge_special_columns_exact( + &self, + batch: &mut WriteBatch, + from_slot: Slot, + to_slot: Slot, // Exclusive + ) -> Result<()> { + let mut index0 = self.transaction_status_index_cf.get(0)?.unwrap_or_default(); + let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap_or_default(); + for slot in from_slot..to_slot { + let slot_entries = self.get_any_valid_slot_entries(slot, 0); + for transaction in slot_entries + .iter() + .cloned() + .flat_map(|entry| entry.transactions) + { + batch.delete::((0, transaction.signatures[0], slot))?; + batch.delete::((1, transaction.signatures[0], slot))?; + for pubkey in transaction.message.account_keys { + batch.delete::(( + 0, + pubkey, + slot, + transaction.signatures[0], + ))?; + batch.delete::(( + 1, + pubkey, + slot, + transaction.signatures[0], + ))?; + } + } + } + if index0.max_slot >= from_slot && index0.max_slot <= to_slot { + index0.max_slot = from_slot.saturating_sub(1); + batch.put::(0, &index0)?; + } + if index1.max_slot >= from_slot && index1.max_slot <= to_slot { + index1.max_slot = from_slot.saturating_sub(1); + batch.put::(1, &index1)?; + } + Ok(()) + } + + fn purge_special_columns_with_primary_index( + &self, + write_batch: &mut WriteBatch, + columns_purged: &mut bool, + w_active_transaction_status_index: &mut u64, + to_slot: Slot, + ) -> Result<()> { + if let Some(index) = self.toggle_transaction_status_index( + write_batch, + w_active_transaction_status_index, + to_slot, + )? { + *columns_purged &= self + .db + .delete_range_cf::(write_batch, index, index + 1) + .is_ok() + & self + .db + .delete_range_cf::(write_batch, index, index + 1) + .is_ok(); + } + Ok(()) + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + use crate::{blockstore::tests::make_slot_entries_with_transactions, get_tmp_ledger_path}; + + // check that all columns are either empty or start at `min_slot` + fn test_all_empty_or_min(blockstore: &Blockstore, min_slot: Slot) { + let condition_met = blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|((slot, _), _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|((slot, _), _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|((slot, _), _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|((primary_index, _, slot), _)| { + slot >= min_slot || (primary_index == 2 && slot == 0) + }) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|((primary_index, _, slot, _), _)| { + slot >= min_slot || (primary_index == 2 && slot == 0) + }) + .unwrap_or(true) + & blockstore + .db + .iter::(IteratorMode::Start) + .unwrap() + .next() + .map(|(slot, _)| slot >= min_slot) + .unwrap_or(true); + assert!(condition_met); + } + + #[test] + fn test_purge_slots() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let (shreds, _) = make_many_slot_entries(0, 50, 5); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + blockstore.purge_slots(0, 5); + + test_all_empty_or_min(&blockstore, 6); + + blockstore.purge_slots(0, 50); + + // min slot shouldn't matter, blockstore should be empty + test_all_empty_or_min(&blockstore, 100); + test_all_empty_or_min(&blockstore, 0); + + blockstore + .slot_meta_iterator(0) + .unwrap() + .for_each(|(_, _)| { + panic!(); + }); + + drop(blockstore); + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_purge_huge() { + let blockstore_path = get_tmp_ledger_path!(); + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let (shreds, _) = make_many_slot_entries(0, 5000, 10); + blockstore.insert_shreds(shreds, None, false).unwrap(); + + blockstore.purge_slots(0, 4999); + + test_all_empty_or_min(&blockstore, 5000); + + drop(blockstore); + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + fn test_purge_front_of_ledger() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let max_slot = 10; + for x in 0..max_slot { + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + blockstore + .write_transaction_status( + x, + Signature::new(&random_bytes), + vec![&Pubkey::new(&random_bytes[0..32])], + vec![&Pubkey::new(&random_bytes[32..])], + &TransactionStatusMeta::default(), + ) + .unwrap(); + } + // Purge to freeze index 0 + blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); + + for x in max_slot..2 * max_slot { + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + blockstore + .write_transaction_status( + x, + Signature::new(&random_bytes), + vec![&Pubkey::new(&random_bytes[0..32])], + vec![&Pubkey::new(&random_bytes[32..])], + &TransactionStatusMeta::default(), + ) + .unwrap(); + } + + // Purging range outside of TransactionStatus max slots should not affect TransactionStatus data + blockstore.run_purge(20, 30, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_purge_transaction_status() { + let blockstore_path = get_tmp_ledger_path!(); + { + let blockstore = Blockstore::open(&blockstore_path).unwrap(); + let transaction_status_index_cf = blockstore.db.column::(); + let slot = 10; + for _ in 0..5 { + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + blockstore + .write_transaction_status( + slot, + Signature::new(&random_bytes), + vec![&Pubkey::new(&random_bytes[0..32])], + vec![&Pubkey::new(&random_bytes[32..])], + &TransactionStatusMeta::default(), + ) + .unwrap(); + } + // Purge to freeze index 0 + blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..5 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + let mut address_transactions_iterator = blockstore + .db + .iter::(IteratorMode::From( + (0, Pubkey::default(), 0, Signature::default()), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..10 { + let entry = address_transactions_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + assert_eq!( + transaction_status_index_cf.get(0).unwrap().unwrap(), + TransactionStatusIndexMeta { + max_slot: 10, + frozen: true, + } + ); + + // Low purge should not affect state + blockstore.run_purge(0, 5, PurgeType::PrimaryIndex).unwrap(); + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..5 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + let mut address_transactions_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::AddressSignatures::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..10 { + let entry = address_transactions_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + assert_eq!( + transaction_status_index_cf.get(0).unwrap().unwrap(), + TransactionStatusIndexMeta { + max_slot: 10, + frozen: true, + } + ); + + // Test boundary conditions: < slot should not purge statuses; <= slot should + blockstore.run_purge(0, 9, PurgeType::PrimaryIndex).unwrap(); + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..5 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + let mut address_transactions_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::AddressSignatures::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + for _ in 0..10 { + let entry = address_transactions_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert_eq!(entry.2, slot); + } + assert_eq!( + transaction_status_index_cf.get(0).unwrap().unwrap(), + TransactionStatusIndexMeta { + max_slot: 10, + frozen: true, + } + ); + + blockstore + .run_purge(0, 10, PurgeType::PrimaryIndex) + .unwrap(); + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + let padding_entry = status_entry_iterator.next().unwrap().0; + assert_eq!(padding_entry.0, 2); + assert_eq!(padding_entry.2, 0); + assert!(status_entry_iterator.next().is_none()); + let mut address_transactions_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::AddressSignatures::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + let padding_entry = address_transactions_iterator.next().unwrap().0; + assert_eq!(padding_entry.0, 2); + assert_eq!(padding_entry.2, 0); + assert!(address_transactions_iterator.next().is_none()); + assert_eq!( + transaction_status_index_cf.get(0).unwrap().unwrap(), + TransactionStatusIndexMeta { + max_slot: 0, + frozen: false, + } + ); + assert_eq!( + transaction_status_index_cf.get(1).unwrap().unwrap(), + TransactionStatusIndexMeta { + max_slot: 0, + frozen: true, + } + ); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } + + fn clear_and_repopulate_transaction_statuses( + blockstore: &mut Blockstore, + index0_max_slot: u64, + index1_max_slot: u64, + ) { + assert!(index1_max_slot > index0_max_slot); + let mut write_batch = blockstore.db.batch().unwrap(); + blockstore + .run_purge(0, index1_max_slot, PurgeType::PrimaryIndex) + .unwrap(); + blockstore + .db + .delete_range_cf::(&mut write_batch, 0, 3) + .unwrap(); + blockstore + .db + .delete_range_cf::(&mut write_batch, 0, 3) + .unwrap(); + blockstore.db.write(write_batch).unwrap(); + blockstore.initialize_transaction_status_index().unwrap(); + *blockstore.active_transaction_status_index.write().unwrap() = 0; + + for x in 0..index0_max_slot + 1 { + let entries = make_slot_entries_with_transactions(1); + let shreds = entries_to_test_shreds(entries.clone(), x, x.saturating_sub(1), true, 0); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let signature = entries + .iter() + .cloned() + .filter(|entry| !entry.is_tick()) + .flat_map(|entry| entry.transactions) + .map(|transaction| transaction.signatures[0]) + .collect::>()[0]; + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + blockstore + .write_transaction_status( + x, + signature, + vec![&Pubkey::new(&random_bytes[0..32])], + vec![&Pubkey::new(&random_bytes[32..])], + &TransactionStatusMeta::default(), + ) + .unwrap(); + } + // Freeze index 0 + let mut write_batch = blockstore.db.batch().unwrap(); + let mut w_active_transaction_status_index = + blockstore.active_transaction_status_index.write().unwrap(); + blockstore + .toggle_transaction_status_index( + &mut write_batch, + &mut w_active_transaction_status_index, + index0_max_slot + 1, + ) + .unwrap(); + drop(w_active_transaction_status_index); + blockstore.db.write(write_batch).unwrap(); + + for x in index0_max_slot + 1..index1_max_slot + 1 { + let entries = make_slot_entries_with_transactions(1); + let shreds = entries_to_test_shreds(entries.clone(), x, x.saturating_sub(1), true, 0); + blockstore.insert_shreds(shreds, None, false).unwrap(); + let signature: Signature = entries + .iter() + .cloned() + .filter(|entry| !entry.is_tick()) + .flat_map(|entry| entry.transactions) + .map(|transaction| transaction.signatures[0]) + .collect::>()[0]; + let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); + blockstore + .write_transaction_status( + x, + signature, + vec![&Pubkey::new(&random_bytes[0..32])], + vec![&Pubkey::new(&random_bytes[32..])], + &TransactionStatusMeta::default(), + ) + .unwrap(); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index0_max_slot, + frozen: true, + } + ); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + } + + #[test] + #[allow(clippy::cognitive_complexity)] + fn test_purge_transaction_status_exact() { + let blockstore_path = get_tmp_ledger_path!(); + { + let mut blockstore = Blockstore::open(&blockstore_path).unwrap(); + let index0_max_slot = 9; + let index1_max_slot = 19; + + // Test purge outside bounds + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(20, 22, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index0_max_slot, + frozen: true, + } + ); + for _ in 0..index0_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + for _ in index0_max_slot + 1..index1_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 1); + } + drop(status_entry_iterator); + + // Test purge inside index 0 + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(2, 4, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index0_max_slot, + frozen: true, + } + ); + for _ in 0..7 { + // 7 entries remaining + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert!(entry.2 < 2 || entry.2 > 4); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + for _ in index0_max_slot + 1..index1_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 1); + } + drop(status_entry_iterator); + + // Test purge inside index 0 at upper boundary + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore + .run_purge(7, index0_max_slot, PurgeType::Exact) + .unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 6, + frozen: true, + } + ); + for _ in 0..7 { + // 7 entries remaining + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert!(entry.2 < 7); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + for _ in index0_max_slot + 1..index1_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 1); + } + drop(status_entry_iterator); + + // Test purge inside index 1 at lower boundary + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(10, 12, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index0_max_slot, + frozen: true, + } + ); + for _ in 0..index0_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + for _ in 13..index1_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 1); + assert!(entry.2 > 12); + } + drop(status_entry_iterator); + + // Test purge across index boundaries + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(7, 12, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 6, + frozen: true, + } + ); + for _ in 0..7 { + // 7 entries remaining + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert!(entry.2 < 7); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: index1_max_slot, + frozen: false, + } + ); + for _ in 13..index1_max_slot + 1 { + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 1); + assert!(entry.2 > 12); + } + drop(status_entry_iterator); + + // Test purge include complete index 1 + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(7, 22, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 6, + frozen: true, + } + ); + for _ in 0..7 { + // 7 entries remaining + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 0); + assert!(entry.2 < 7); + } + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 6, + frozen: false, + } + ); + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 2); // Buffer entry, no index 1 entries remaining + drop(status_entry_iterator); + + // Test purge all + clear_and_repopulate_transaction_statuses( + &mut blockstore, + index0_max_slot, + index1_max_slot, + ); + blockstore.run_purge(0, 22, PurgeType::Exact).unwrap(); + + let mut status_entry_iterator = blockstore + .db + .iter::(IteratorMode::From( + cf::TransactionStatus::as_index(0), + IteratorDirection::Forward, + )) + .unwrap(); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(0) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 0, + frozen: true, + } + ); + assert_eq!( + blockstore + .transaction_status_index_cf + .get(1) + .unwrap() + .unwrap(), + TransactionStatusIndexMeta { + max_slot: 0, + frozen: false, + } + ); + let entry = status_entry_iterator.next().unwrap().0; + assert_eq!(entry.0, 2); // Buffer entry, no index 0 or index 1 entries remaining + drop(status_entry_iterator); + } + Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction"); + } +} diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 58e9c7378b748a..22e0c97d1a7269 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -337,10 +337,14 @@ impl Column for columns::TransactionStatus { } fn index(key: &[u8]) -> (u64, Signature, Slot) { - let index = BigEndian::read_u64(&key[0..8]); - let signature = Signature::new(&key[8..72]); - let slot = BigEndian::read_u64(&key[72..80]); - (index, signature, slot) + if key.len() != 80 { + Self::as_index(0) + } else { + let index = BigEndian::read_u64(&key[0..8]); + let signature = Signature::new(&key[8..72]); + let slot = BigEndian::read_u64(&key[72..80]); + (index, signature, slot) + } } fn primary_index(index: Self::Index) -> u64 { @@ -660,23 +664,15 @@ impl Database { Ok(fs_extra::dir::get_size(&self.path)?) } - // Adds a range to delete to the given write batch and returns whether or not the column has reached - // its end - pub fn delete_range_cf(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result + // Adds a range to delete to the given write batch + pub fn delete_range_cf(&self, batch: &mut WriteBatch, from: Slot, to: Slot) -> Result<()> where C: Column + ColumnName, { let cf = self.cf_handle::(); let from_index = C::as_index(from); let to_index = C::as_index(to); - let result = batch.delete_range_cf::(cf, from_index, to_index); - let max_slot = self - .iter::(IteratorMode::End)? - .next() - .map(|(i, _)| C::primary_index(i)) - .unwrap_or(0); - let end = max_slot <= to; - result.map(|_| end) + batch.delete_range_cf::(cf, from_index, to_index) } }