Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 10 additions & 115 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@

use {
crossbeam_channel::{Receiver, RecvTimeoutError},
rand::{thread_rng, Rng},
solana_ledger::{
blockstore::{Blockstore, PurgeType},
blockstore_db::Result as BlockstoreResult,
},
solana_measure::measure::Measure,
solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY},
solana_sdk::clock::Slot,
std::{
string::ToString,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
Expand All @@ -41,13 +40,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
// and starve other blockstore users.
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

// Compacting at a slower interval than purging helps keep IOPS down.
// Once a day should be ample
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;

pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>,
t_compact: JoinHandle<()>,
}

impl LedgerCleanupService {
Expand All @@ -56,25 +50,15 @@ impl LedgerCleanupService {
blockstore: Arc<Blockstore>,
max_ledger_shreds: u64,
exit: &Arc<AtomicBool>,
compaction_interval: Option<u64>,
Comment thread
steviez marked this conversation as resolved.
max_compaction_jitter: Option<u64>,
) -> Self {
let exit = exit.clone();
let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
let mut compaction_jitter = 0;
let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL);
let last_compact_slot = Arc::new(AtomicU64::new(0));
let last_compact_slot2 = last_compact_slot.clone();

info!(
"LedgerCleanupService active. max ledger shreds={}, compaction interval={}",
max_ledger_shreds, compaction_interval,
"LedgerCleanupService active. max ledger shreds={}",
max_ledger_shreds
);

let exit_compact = exit.clone();
let blockstore_compact = blockstore.clone();

let t_cleanup = Builder::new()
.name("solLedgerClean".to_string())
.spawn(move || loop {
Expand All @@ -87,7 +71,6 @@ impl LedgerCleanupService {
max_ledger_shreds,
&mut last_purge_slot,
DEFAULT_PURGE_SLOT_INTERVAL,
&last_compact_slot,
) {
match e {
RecvTimeoutError::Disconnected => break,
Expand All @@ -97,28 +80,7 @@ impl LedgerCleanupService {
})
.unwrap();

let t_compact = Builder::new()
.name("solLedgerComp".to_string())
.spawn(move || loop {
if exit_compact.load(Ordering::Relaxed) {
break;
}
Self::compact_ledger(
&blockstore_compact,
&mut last_compaction_slot,
compaction_interval,
&last_compact_slot2,
&mut compaction_jitter,
max_compaction_jitter,
);
sleep(Duration::from_secs(1));
})
.unwrap();

Self {
t_cleanup,
t_compact,
}
Self { t_cleanup }
}

/// A helper function to `cleanup_ledger` which returns a tuple of the
Expand Down Expand Up @@ -202,10 +164,6 @@ impl LedgerCleanupService {
/// `last_purge_slot` is fewer than `purge_interval`, the function will
/// simply return `Ok` without actually running the ledger cleanup.
/// In this case, `purge_interval` will remain unchanged.
/// - `last_compact_slot`: an output value which indicates the most recent
/// slot which has been cleaned up after this call. If this parameter is
/// updated after this function call, it means the ledger cleanup has
/// been performed.
///
/// Also see `blockstore::purge_slot`.
pub fn cleanup_ledger(
Expand All @@ -214,7 +172,6 @@ impl LedgerCleanupService {
max_ledger_shreds: u64,
last_purge_slot: &mut u64,
purge_interval: u64,
last_compact_slot: &Arc<AtomicU64>,
) -> Result<(), RecvTimeoutError> {
let root = Self::receive_new_roots(new_root_receiver)?;
if root - *last_purge_slot <= purge_interval {
Expand All @@ -236,7 +193,6 @@ impl LedgerCleanupService {
let purge_complete = Arc::new(AtomicBool::new(false));
let blockstore = blockstore.clone();
let purge_complete1 = purge_complete.clone();
let last_compact_slot1 = last_compact_slot.clone();
let _t_purge = Builder::new()
.name("solLedgerPurge".to_string())
.spawn(move || {
Expand Down Expand Up @@ -266,8 +222,6 @@ impl LedgerCleanupService {
purge_time.stop();
info!("{}", purge_time);

last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed);

purge_complete1.store(true, Ordering::Relaxed);
})
.unwrap();
Expand All @@ -287,39 +241,6 @@ impl LedgerCleanupService {
Ok(())
}

pub fn compact_ledger(
Comment thread
steviez marked this conversation as resolved.
blockstore: &Arc<Blockstore>,
last_compaction_slot: &mut u64,
compaction_interval: u64,
highest_compact_slot: &Arc<AtomicU64>,
compaction_jitter: &mut u64,
max_jitter: Option<u64>,
) {
let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed);
if highest_compaction_slot.saturating_sub(*last_compaction_slot)
> (compaction_interval + *compaction_jitter)
{
info!(
"compacting data from slots {} to {}",
*last_compaction_slot, highest_compaction_slot,
);
if let Err(err) =
blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot)
{
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
err, last_compaction_slot, highest_compaction_slot,
);
}
*last_compaction_slot = highest_compaction_slot;
let jitter = max_jitter.unwrap_or(0);
if jitter > 0 {
*compaction_jitter = thread_rng().gen_range(0, jitter);
}
}
}

fn report_disk_metrics(
pre: BlockstoreResult<u64>,
post: BlockstoreResult<u64>,
Expand All @@ -337,8 +258,7 @@ impl LedgerCleanupService {
}

pub fn join(self) -> thread::Result<()> {
self.t_cleanup.join()?;
self.t_compact.join()
self.t_cleanup.join()
}
}
#[cfg(test)]
Expand All @@ -361,38 +281,17 @@ mod tests {

//send a signal to kill all but 5 shreds, which will be in the newest slots
let mut last_purge_slot = 0;
let highest_compaction_slot = Arc::new(AtomicU64::new(0));
sender.send(50).unwrap();
LedgerCleanupService::cleanup_ledger(
&receiver,
&blockstore,
5,
&mut last_purge_slot,
10,
&highest_compaction_slot,
)
.unwrap();
LedgerCleanupService::cleanup_ledger(&receiver, &blockstore, 5, &mut last_purge_slot, 10)
.unwrap();
assert_eq!(last_purge_slot, 50);
assert_eq!(highest_compaction_slot.load(Ordering::Relaxed), 44);

//check that 0-40 don't exist
blockstore
.slot_meta_iterator(0)
.unwrap()
.for_each(|(slot, _)| assert!(slot > 40));

let mut last_compaction_slot = 0;
let mut jitter = 0;
LedgerCleanupService::compact_ledger(
&blockstore,
&mut last_compaction_slot,
10,
&highest_compaction_slot,
&mut jitter,
None,
);
assert_eq!(jitter, 0);

drop(blockstore);
Blockstore::destroy(&blockstore_path).expect("Expected successful database destruction");
}
Expand All @@ -401,9 +300,7 @@ mod tests {
fn test_cleanup_speed() {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_no_compaction(true);
let blockstore = Arc::new(blockstore);
let blockstore = Arc::new(Blockstore::open(&blockstore_path).unwrap());
let (sender, receiver) = unbounded();

let mut first_insert = Measure::start("first_insert");
Expand All @@ -415,7 +312,6 @@ mod tests {
info!("{}", first_insert);

let mut last_purge_slot = 0;
let last_compaction_slot = Arc::new(AtomicU64::new(0));
let mut slot = initial_slots;
let mut num_slots = 6;
for _ in 0..5 {
Expand All @@ -439,7 +335,6 @@ mod tests {
initial_slots,
&mut last_purge_slot,
10,
&last_compaction_slot,
)
.unwrap();
time.stop();
Expand Down
4 changes: 0 additions & 4 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@ pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
pub repair_validators: Option<HashSet<Pubkey>>,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool,
}
Expand Down Expand Up @@ -300,8 +298,6 @@ impl Tvu {
blockstore.clone(),
max_ledger_shreds,
exit,
tvu_config.rocksdb_compaction_interval,
tvu_config.rocksdb_max_compaction_jitter,
)
});

Expand Down
9 changes: 0 additions & 9 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,6 @@ pub struct ValidatorConfig {
pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
pub halt_on_known_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub no_rocksdb_compaction: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
Expand Down Expand Up @@ -207,9 +204,6 @@ impl Default for ValidatorConfig {
gossip_validators: None,
halt_on_known_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0,
no_rocksdb_compaction: false,
rocksdb_compaction_interval: None,
rocksdb_max_compaction_jitter: None,
accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
Expand Down Expand Up @@ -981,8 +975,6 @@ impl Validator {
max_ledger_shreds: config.max_ledger_shreds,
shred_version: node.info.shred_version,
repair_validators: config.repair_validators.clone(),
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently,
},
Expand Down Expand Up @@ -1414,7 +1406,6 @@ fn load_blockstore(
},
)
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
blockstore.shred_timing_point_sender = poh_timing_point_sender;
// following boot sequence (esp BankForks) could set root. so stash the original value
// of blockstore root away here as soon as possible.
Expand Down
Loading