diff --git a/core/src/ledger_cleanup_service.rs b/core/src/ledger_cleanup_service.rs index 160b8721f45..505cc8ebd55 100644 --- a/core/src/ledger_cleanup_service.rs +++ b/core/src/ledger_cleanup_service.rs @@ -6,20 +6,19 @@ use { crossbeam_channel::{Receiver, RecvTimeoutError}, - rand::{thread_rng, Rng}, solana_ledger::{ blockstore::{Blockstore, PurgeType}, blockstore_db::Result as BlockstoreResult, }, solana_measure::measure::Measure, - solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY}, + solana_sdk::clock::Slot, std::{ string::ToString, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, }, - thread::{self, sleep, Builder, JoinHandle}, + thread::{self, Builder, JoinHandle}, time::Duration, }, }; @@ -41,13 +40,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000; // and starve other blockstore users. pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512; -// Compacting at a slower interval than purging helps keep IOPS down. -// Once a day should be ample -const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT; - pub struct LedgerCleanupService { t_cleanup: JoinHandle<()>, - t_compact: JoinHandle<()>, } impl LedgerCleanupService { @@ -56,25 +50,15 @@ impl LedgerCleanupService { blockstore: Arc, max_ledger_shreds: u64, exit: &Arc, - compaction_interval: Option, - max_compaction_jitter: Option, ) -> Self { let exit = exit.clone(); let mut last_purge_slot = 0; - let mut last_compaction_slot = 0; - let mut compaction_jitter = 0; - let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL); - let last_compact_slot = Arc::new(AtomicU64::new(0)); - let last_compact_slot2 = last_compact_slot.clone(); info!( - "LedgerCleanupService active. max ledger shreds={}, compaction interval={}", - max_ledger_shreds, compaction_interval, + "LedgerCleanupService active. max ledger shreds={}", + max_ledger_shreds ); - let exit_compact = exit.clone(); - let blockstore_compact = blockstore.clone(); - let t_cleanup = Builder::new() .name("solLedgerClean".to_string()) .spawn(move || loop { @@ -87,7 +71,6 @@ impl LedgerCleanupService { max_ledger_shreds, &mut last_purge_slot, DEFAULT_PURGE_SLOT_INTERVAL, - &last_compact_slot, ) { match e { RecvTimeoutError::Disconnected => break, @@ -97,28 +80,7 @@ impl LedgerCleanupService { }) .unwrap(); - let t_compact = Builder::new() - .name("solLedgerComp".to_string()) - .spawn(move || loop { - if exit_compact.load(Ordering::Relaxed) { - break; - } - Self::compact_ledger( - &blockstore_compact, - &mut last_compaction_slot, - compaction_interval, - &last_compact_slot2, - &mut compaction_jitter, - max_compaction_jitter, - ); - sleep(Duration::from_secs(1)); - }) - .unwrap(); - - Self { - t_cleanup, - t_compact, - } + Self { t_cleanup } } /// A helper function to `cleanup_ledger` which returns a tuple of the @@ -202,10 +164,6 @@ impl LedgerCleanupService { /// `last_purge_slot` is fewer than `purge_interval`, the function will /// simply return `Ok` without actually running the ledger cleanup. /// In this case, `purge_interval` will remain unchanged. - /// - `last_compact_slot`: an output value which indicates the most recent - /// slot which has been cleaned up after this call. If this parameter is - /// updated after this function call, it means the ledger cleanup has - /// been performed. /// /// Also see `blockstore::purge_slot`. pub fn cleanup_ledger( @@ -214,7 +172,6 @@ impl LedgerCleanupService { max_ledger_shreds: u64, last_purge_slot: &mut u64, purge_interval: u64, - last_compact_slot: &Arc, ) -> Result<(), RecvTimeoutError> { let root = Self::receive_new_roots(new_root_receiver)?; if root - *last_purge_slot <= purge_interval { @@ -236,7 +193,6 @@ impl LedgerCleanupService { let purge_complete = Arc::new(AtomicBool::new(false)); let blockstore = blockstore.clone(); let purge_complete1 = purge_complete.clone(); - let last_compact_slot1 = last_compact_slot.clone(); let _t_purge = Builder::new() .name("solLedgerPurge".to_string()) .spawn(move || { @@ -266,8 +222,6 @@ impl LedgerCleanupService { purge_time.stop(); info!("{}", purge_time); - last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed); - purge_complete1.store(true, Ordering::Relaxed); }) .unwrap(); @@ -287,39 +241,6 @@ impl LedgerCleanupService { Ok(()) } - pub fn compact_ledger( - blockstore: &Arc, - last_compaction_slot: &mut u64, - compaction_interval: u64, - highest_compact_slot: &Arc, - compaction_jitter: &mut u64, - max_jitter: Option, - ) { - let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed); - if highest_compaction_slot.saturating_sub(*last_compaction_slot) - > (compaction_interval + *compaction_jitter) - { - info!( - "compacting data from slots {} to {}", - *last_compaction_slot, highest_compaction_slot, - ); - if let Err(err) = - blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot) - { - // This error is not fatal and indicates an internal error? - error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - err, last_compaction_slot, highest_compaction_slot, - ); - } - *last_compaction_slot = highest_compaction_slot; - let jitter = max_jitter.unwrap_or(0); - if jitter > 0 { - *compaction_jitter = thread_rng().gen_range(0, jitter); - } - } - } - fn report_disk_metrics( pre: BlockstoreResult, post: BlockstoreResult, @@ -337,8 +258,7 @@ impl LedgerCleanupService { } pub fn join(self) -> thread::Result<()> { - self.t_cleanup.join()?; - self.t_compact.join() + self.t_cleanup.join() } } #[cfg(test)] @@ -402,7 +322,6 @@ mod tests { solana_logger::setup(); let blockstore_path = get_tmp_ledger_path!(); let mut blockstore = Blockstore::open(&blockstore_path).unwrap(); - blockstore.set_no_compaction(true); let blockstore = Arc::new(blockstore); let (sender, receiver) = unbounded(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 95a7255b8f7..013b1916425 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -81,8 +81,6 @@ pub struct TvuConfig { pub max_ledger_shreds: Option, pub shred_version: u16, pub repair_validators: Option>, - pub rocksdb_compaction_interval: Option, - pub rocksdb_max_compaction_jitter: Option, pub wait_for_vote_to_start_leader: bool, pub replay_slots_concurrently: bool, } @@ -298,8 +296,6 @@ impl Tvu { blockstore.clone(), max_ledger_shreds, exit, - tvu_config.rocksdb_compaction_interval, - tvu_config.rocksdb_max_compaction_jitter, ) }); diff --git a/core/src/validator.rs b/core/src/validator.rs index a60bbb39169..882ba590074 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -141,9 +141,6 @@ pub struct ValidatorConfig { pub gossip_validators: Option>, // None = gossip with all pub halt_on_known_validators_accounts_hash_mismatch: bool, pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection - pub no_rocksdb_compaction: bool, - pub rocksdb_compaction_interval: Option, - pub rocksdb_max_compaction_jitter: Option, pub accounts_hash_interval_slots: u64, pub max_genesis_archive_unpacked_size: u64, pub wal_recovery_mode: Option, @@ -205,9 +202,6 @@ impl Default for ValidatorConfig { gossip_validators: None, halt_on_known_validators_accounts_hash_mismatch: false, accounts_hash_fault_injection_slots: 0, - no_rocksdb_compaction: false, - rocksdb_compaction_interval: None, - rocksdb_max_compaction_jitter: None, accounts_hash_interval_slots: std::u64::MAX, max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, wal_recovery_mode: None, @@ -984,8 +978,6 @@ impl Validator { max_ledger_shreds: config.max_ledger_shreds, shred_version: node.info.shred_version, repair_validators: config.repair_validators.clone(), - rocksdb_compaction_interval: config.rocksdb_compaction_interval, - rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval, wait_for_vote_to_start_leader, replay_slots_concurrently: config.replay_slots_concurrently, }, @@ -1415,7 +1407,6 @@ fn load_blockstore( }, ) .expect("Failed to open ledger database"); - blockstore.set_no_compaction(config.no_rocksdb_compaction); blockstore.shred_timing_point_sender = poh_timing_point_sender; // following boot sequence (esp BankForks) could set root. so stash the original value // of blockstore root away here as soon as possible. diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 5dbc072f29d..5cc23e77926 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -176,7 +176,6 @@ pub struct Blockstore { completed_slots_senders: Mutex>, pub shred_timing_point_sender: Option, pub lowest_cleanup_slot: RwLock, - no_compaction: bool, pub slots_stats: SlotsStats, } @@ -334,7 +333,6 @@ impl Blockstore { insert_shreds_lock: Mutex::<()>::default(), last_root, lowest_cleanup_slot: RwLock::::default(), - no_compaction: false, slots_stats: SlotsStats::default(), }; if initialize_transaction_status_index { @@ -408,18 +406,6 @@ impl Blockstore { } } - /// Whether to disable compaction in [`Blockstore::compact_storage`], which is used - /// by the ledger cleanup service and `solana_core::validator::backup_and_clear_blockstore`. - /// - /// Note that this setting is not related to the RocksDB's background - /// compaction. - /// - /// To disable RocksDB's background compaction, open the Blockstore - /// with AccessType::PrimaryForMaintenance. - pub fn set_no_compaction(&mut self, no_compaction: bool) { - self.no_compaction = no_compaction; - } - /// Deletes the blockstore at the specified path. /// /// Note that if the `ledger_path` has multiple rocksdb instances, this @@ -1046,7 +1032,8 @@ impl Blockstore { .expect("Couldn't fetch from SlotMeta column family") { // Clear all slot related information - self.run_purge(slot, slot, PurgeType::PrimaryIndex) + // todo: is this new PurgeType correct in this context? + self.run_purge(slot, slot, PurgeType::CompactionFilter) .expect("Purge database operations failed"); // Clear this slot as a next slot from parent @@ -2059,61 +2046,6 @@ impl Blockstore { ) } - /// Toggles the active primary index between `0` and `1`, and clears the - /// stored max-slot of the frozen index in preparation for pruning. - fn toggle_transaction_status_index( - &self, - batch: &mut WriteBatch, - w_active_transaction_status_index: &mut u64, - to_slot: Slot, - ) -> Result> { - let index0 = self.transaction_status_index_cf.get(0)?; - if index0.is_none() { - return Ok(None); - } - let mut index0 = index0.unwrap(); - let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap(); - - if !index0.frozen && !index1.frozen { - index0.frozen = true; - *w_active_transaction_status_index = 1; - batch.put::(0, &index0)?; - Ok(None) - } else { - let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot { - info!( - "Pruning expired primary index 0 up to slot {} (max requested: {})", - index0.max_slot, to_slot - ); - Some(0) - } else if index1.frozen && to_slot > index1.max_slot { - info!( - "Pruning expired primary index 1 up to slot {} (max requested: {})", - index1.max_slot, to_slot - ); - Some(1) - } else { - None - }; - - if let Some(purge_target_primary_index) = purge_target_primary_index { - *w_active_transaction_status_index = purge_target_primary_index; - if index0.frozen { - index0.max_slot = 0 - }; - index0.frozen = !index0.frozen; - batch.put::(0, &index0)?; - if index1.frozen { - index1.max_slot = 0 - }; - index1.frozen = !index1.frozen; - batch.put::(1, &index1)?; - } - - Ok(purge_target_primary_index) - } - } - fn get_primary_index_to_write( &self, slot: Slot, diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index b0875219180..4609339397d 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -13,9 +13,6 @@ pub enum PurgeType { /// A slower but more accurate way to purge slots by also ensuring higher /// level of consistency between data during the clean up process. Exact, - /// A faster approximation of `Exact` where the purge process only takes - /// care of the primary index and does not update the associated entries. - PrimaryIndex, /// The fastest purge mode that relies on the slot-id based TTL /// compaction filter to do the cleanup. CompactionFilter, @@ -77,13 +74,6 @@ impl Blockstore { pub fn purge_and_compact_slots(&self, from_slot: Slot, to_slot: Slot) { self.purge_slots(from_slot, to_slot, PurgeType::Exact); - if let Err(e) = self.compact_storage(from_slot, to_slot) { - // This error is not fatal and indicates an internal error? - error!( - "Error: {:?}; Couldn't compact storage from {:?} to {:?}", - e, from_slot, to_slot - ); - } } /// Ensures that the SlotMeta::next_slots vector for all slots contain no references in the @@ -162,7 +152,7 @@ impl Blockstore { .batch() .expect("Database Error: Failed to get write batch"); let mut delete_range_timer = Measure::start("delete_range"); - let mut columns_purged = self + let columns_purged = self .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok() @@ -222,20 +212,12 @@ impl Blockstore { .db .delete_range_cf::(&mut write_batch, from_slot, to_slot) .is_ok(); - let mut w_active_transaction_status_index = + let w_active_transaction_status_index = self.active_transaction_status_index.write().unwrap(); match purge_type { PurgeType::Exact => { self.purge_special_columns_exact(&mut write_batch, from_slot, to_slot)?; } - PurgeType::PrimaryIndex => { - self.purge_special_columns_with_primary_index( - &mut write_batch, - &mut columns_purged, - &mut w_active_transaction_status_index, - to_slot, - )?; - } PurgeType::CompactionFilter => { // No explicit action is required here because this purge type completely and // indefinitely relies on the proper working of compaction filter for those @@ -346,97 +328,6 @@ impl Blockstore { .is_ok() } - pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result { - if self.no_compaction { - info!("compact_storage: compaction disabled"); - return Ok(false); - } - info!("compact_storage: from {} to {}", from_slot, to_slot); - let mut compact_timer = Measure::start("compact_range"); - let result = self - .meta_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .db - .column::() - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .data_shred_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .code_shred_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .dead_slots_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .duplicate_slots_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .erasure_meta_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .orphans_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .bank_hash_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .index_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .transaction_status_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .address_signatures_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .transaction_status_index_cf - .compact_range(0, 2) - .unwrap_or(false) - && self - .rewards_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .blocktime_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .perf_samples_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .block_height_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false) - && self - .optimistic_slots_cf - .compact_range(from_slot, to_slot) - .unwrap_or(false); - compact_timer.stop(); - if !result { - info!("compact_storage incomplete"); - } - datapoint_info!( - "blockstore-compact", - ("compact_range_us", compact_timer.as_us() as i64, i64), - ); - Ok(result) - } - /// Purges special columns (using a non-Slot primary-index) exactly, by /// deserializing each slot being purged and iterating through all /// transactions to determine the keys of individual records. @@ -487,37 +378,6 @@ impl Blockstore { } Ok(()) } - - /// Purges special columns (using a non-Slot primary-index) by range. Purge - /// occurs if frozen primary index has a max-slot less than the highest slot - /// being purged. - fn purge_special_columns_with_primary_index( - &self, - write_batch: &mut WriteBatch, - columns_purged: &mut bool, - w_active_transaction_status_index: &mut u64, - to_slot: Slot, - ) -> Result<()> { - if let Some(purged_index) = self.toggle_transaction_status_index( - write_batch, - w_active_transaction_status_index, - to_slot + 1, - )? { - *columns_purged &= self - .db - .delete_range_cf::(write_batch, purged_index, purged_index) - .is_ok() - & self - .db - .delete_range_cf::( - write_batch, - purged_index, - purged_index, - ) - .is_ok(); - } - Ok(()) - } } #[cfg(test)] @@ -581,7 +441,8 @@ pub mod tests { .unwrap(); } // Purge to freeze index 0 - blockstore.run_purge(0, 1, PurgeType::PrimaryIndex).unwrap(); + // todo: is this new PurgeType correct in this context? + blockstore.run_purge(0, 1, PurgeType::Exact).unwrap(); for x in max_slot..2 * max_slot { let random_bytes: Vec = (0..64).map(|_| rand::random::()).collect(); @@ -827,13 +688,6 @@ pub mod tests { let mut write_batch = blockstore.db.batch().unwrap(); let mut w_active_transaction_status_index = blockstore.active_transaction_status_index.write().unwrap(); - blockstore - .toggle_transaction_status_index( - &mut write_batch, - &mut w_active_transaction_status_index, - index0_max_slot + 1, - ) - .unwrap(); drop(w_active_transaction_status_index); blockstore.db.write(write_batch).unwrap(); diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs index a0eb6e2842f..fa728b9c2d7 100644 --- a/test-validator/src/lib.rs +++ b/test-validator/src/lib.rs @@ -784,7 +784,6 @@ impl TestValidator { enforce_ulimit_nofile: false, warp_slot: config.warp_slot, validator_exit: config.validator_exit.clone(), - rocksdb_compaction_interval: Some(100), // Compact every 100 slots max_ledger_shreds: config.max_ledger_shreds, no_wait_for_vote_to_start_leader: true, staked_nodes_overrides: config.staked_nodes_overrides.clone(), diff --git a/validator/src/main.rs b/validator/src/main.rs index 630bc4ff743..57cbc480e75 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1210,19 +1210,6 @@ pub fn main() { will not push/pull from from validators outside this set. \ [default: all validators]") ) - .arg( - Arg::with_name("no_rocksdb_compaction") - .long("no-rocksdb-compaction") - .takes_value(false) - .help("Disable manual compaction of the ledger database (this is ignored).") - ) - .arg( - Arg::with_name("rocksdb_compaction_interval") - .long("rocksdb-compaction-interval-slots") - .value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS") - .takes_value(true) - .help("Number of slots between compacting ledger"), - ) .arg( Arg::with_name("tpu_coalesce_ms") .long("tpu-coalesce-ms") @@ -1275,13 +1262,6 @@ pub fn main() { number of QUIC streams permitted from the peer and vote packet sender stage. Format of the file: `staked_map_id: {: }"), ) - .arg( - Arg::with_name("rocksdb_max_compaction_jitter") - .long("rocksdb-max-compaction-jitter-slots") - .value_name("ROCKSDB_MAX_COMPACTION_JITTER_SLOTS") - .takes_value(true) - .help("Introduce jitter into the compaction to offset compaction operation"), - ) .arg( Arg::with_name("bind_address") .long("bind-address") @@ -2360,10 +2340,6 @@ pub fn main() { let private_rpc = matches.is_present("private_rpc"); let do_port_check = !matches.is_present("no_port_check"); - let no_rocksdb_compaction = true; - let rocksdb_compaction_interval = value_t!(matches, "rocksdb_compaction_interval", u64).ok(); - let rocksdb_max_compaction_jitter = - value_t!(matches, "rocksdb_max_compaction_jitter", u64).ok(); let tpu_coalesce_ms = value_t!(matches, "tpu_coalesce_ms", u64).unwrap_or(DEFAULT_TPU_COALESCE_MS); let wal_recovery_mode = matches @@ -2731,9 +2707,6 @@ pub fn main() { known_validators, repair_validators, gossip_validators, - no_rocksdb_compaction, - rocksdb_compaction_interval, - rocksdb_max_compaction_jitter, wal_recovery_mode, poh_verify: !matches.is_present("skip_poh_verify"), debug_keys,