95 changes: 7 additions & 88 deletions core/src/ledger_cleanup_service.rs
@@ -6,20 +6,19 @@

use {
crossbeam_channel::{Receiver, RecvTimeoutError},
rand::{thread_rng, Rng},
solana_ledger::{
blockstore::{Blockstore, PurgeType},
blockstore_db::Result as BlockstoreResult,
},
solana_measure::measure::Measure,
solana_sdk::clock::{Slot, DEFAULT_TICKS_PER_SLOT, TICKS_PER_DAY},
solana_sdk::clock::Slot,
std::{
string::ToString,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, sleep, Builder, JoinHandle},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};
@@ -41,13 +40,8 @@ pub const DEFAULT_MIN_MAX_LEDGER_SHREDS: u64 = 50_000_000;
// and starve other blockstore users.
pub const DEFAULT_PURGE_SLOT_INTERVAL: u64 = 512;

// Compacting at a slower interval than purging helps keep IOPS down.
// Once a day should be ample
const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;

pub struct LedgerCleanupService {
t_cleanup: JoinHandle<()>,
t_compact: JoinHandle<()>,
}

impl LedgerCleanupService {
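
For context, the compaction cadence deleted above amounted to roughly one compaction per day. A minimal sketch of the arithmetic, assuming solana_sdk's usual clock defaults (160 ticks per second, 64 ticks per slot), which this diff does not restate:

    // Illustrative only, not part of the diff; constants are assumed solana_sdk defaults.
    const DEFAULT_TICKS_PER_SECOND: u64 = 160;
    const DEFAULT_TICKS_PER_SLOT: u64 = 64;
    const SECONDS_PER_DAY: u64 = 24 * 60 * 60; // 86_400
    const TICKS_PER_DAY: u64 = DEFAULT_TICKS_PER_SECOND * SECONDS_PER_DAY; // 13_824_000
    // 13_824_000 / 64 = 216_000 slots, i.e. about 24 hours at the nominal 400 ms slot time.
    const DEFAULT_COMPACTION_SLOT_INTERVAL: u64 = TICKS_PER_DAY / DEFAULT_TICKS_PER_SLOT;
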
@@ -56,25 +50,15 @@ impl LedgerCleanupService {
blockstore: Arc<Blockstore>,
max_ledger_shreds: u64,
exit: &Arc<AtomicBool>,
compaction_interval: Option<u64>,
max_compaction_jitter: Option<u64>,
) -> Self {
let exit = exit.clone();
let mut last_purge_slot = 0;
let mut last_compaction_slot = 0;
let mut compaction_jitter = 0;
let compaction_interval = compaction_interval.unwrap_or(DEFAULT_COMPACTION_SLOT_INTERVAL);
let last_compact_slot = Arc::new(AtomicU64::new(0));
let last_compact_slot2 = last_compact_slot.clone();

info!(
"LedgerCleanupService active. max ledger shreds={}, compaction interval={}",
max_ledger_shreds, compaction_interval,
"LedgerCleanupService active. max ledger shreds={}",
max_ledger_shreds
);

let exit_compact = exit.clone();
let blockstore_compact = blockstore.clone();

let t_cleanup = Builder::new()
.name("solLedgerClean".to_string())
.spawn(move || loop {
@@ -87,7 +71,6 @@ impl LedgerCleanupService {
max_ledger_shreds,
&mut last_purge_slot,
DEFAULT_PURGE_SLOT_INTERVAL,
&last_compact_slot,
) {
match e {
RecvTimeoutError::Disconnected => break,
@@ -97,28 +80,7 @@ impl LedgerCleanupService {
})
.unwrap();

let t_compact = Builder::new()
.name("solLedgerComp".to_string())
.spawn(move || loop {
if exit_compact.load(Ordering::Relaxed) {
break;
}
Self::compact_ledger(
&blockstore_compact,
&mut last_compaction_slot,
compaction_interval,
&last_compact_slot2,
&mut compaction_jitter,
max_compaction_jitter,
);
sleep(Duration::from_secs(1));
})
.unwrap();

Self {
t_cleanup,
t_compact,
}
Self { t_cleanup }
}

/// A helper function to `cleanup_ledger` which returns a tuple of the
@@ -202,10 +164,6 @@ impl LedgerCleanupService {
/// `last_purge_slot` is fewer than `purge_interval`, the function will
/// simply return `Ok` without actually running the ledger cleanup.
/// In this case, `purge_interval` will remain unchanged.
/// - `last_compact_slot`: an output value which indicates the most recent
/// slot which has been cleaned up after this call. If this parameter is
/// updated after this function call, it means the ledger cleanup has
/// been performed.
///
/// Also see `blockstore::purge_slot`.
pub fn cleanup_ledger(
@@ -214,7 +172,6 @@ impl LedgerCleanupService {
max_ledger_shreds: u64,
last_purge_slot: &mut u64,
purge_interval: u64,
last_compact_slot: &Arc<AtomicU64>,
) -> Result<(), RecvTimeoutError> {
let root = Self::receive_new_roots(new_root_receiver)?;
if root - *last_purge_slot <= purge_interval {
@@ -236,7 +193,6 @@ impl LedgerCleanupService {
let purge_complete = Arc::new(AtomicBool::new(false));
let blockstore = blockstore.clone();
let purge_complete1 = purge_complete.clone();
let last_compact_slot1 = last_compact_slot.clone();
let _t_purge = Builder::new()
.name("solLedgerPurge".to_string())
.spawn(move || {
@@ -266,8 +222,6 @@ impl LedgerCleanupService {
purge_time.stop();
info!("{}", purge_time);

last_compact_slot1.store(lowest_cleanup_slot, Ordering::Relaxed);

purge_complete1.store(true, Ordering::Relaxed);
})
.unwrap();
@@ -287,39 +241,6 @@ impl LedgerCleanupService {
Ok(())
}

pub fn compact_ledger(
blockstore: &Arc<Blockstore>,
last_compaction_slot: &mut u64,
compaction_interval: u64,
highest_compact_slot: &Arc<AtomicU64>,
compaction_jitter: &mut u64,
max_jitter: Option<u64>,
) {
let highest_compaction_slot = highest_compact_slot.load(Ordering::Relaxed);
if highest_compaction_slot.saturating_sub(*last_compaction_slot)
> (compaction_interval + *compaction_jitter)
{
info!(
"compacting data from slots {} to {}",
*last_compaction_slot, highest_compaction_slot,
);
if let Err(err) =
blockstore.compact_storage(*last_compaction_slot, highest_compaction_slot)
{
// This error is not fatal and indicates an internal error?
error!(
"Error: {:?}; Couldn't compact storage from {:?} to {:?}",
err, last_compaction_slot, highest_compaction_slot,
);
}
*last_compaction_slot = highest_compaction_slot;
let jitter = max_jitter.unwrap_or(0);
if jitter > 0 {
*compaction_jitter = thread_rng().gen_range(0, jitter);
}
}
}

fn report_disk_metrics(
pre: BlockstoreResult<u64>,
post: BlockstoreResult<u64>,
Expand All @@ -337,8 +258,7 @@ impl LedgerCleanupService {
}

pub fn join(self) -> thread::Result<()> {
self.t_cleanup.join()?;
self.t_compact.join()
self.t_cleanup.join()
}
}
#[cfg(test)]
@@ -402,7 +322,6 @@ mod tests {
solana_logger::setup();
let blockstore_path = get_tmp_ledger_path!();
let mut blockstore = Blockstore::open(&blockstore_path).unwrap();
blockstore.set_no_compaction(true);
let blockstore = Arc::new(blockstore);
let (sender, receiver) = unbounded();

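Taken together, the ledger_cleanup_service.rs hunks leave a single background thread: the compaction thread, its jitter state, and the shared last_compact_slot atomic are gone, while the purge path is untouched (cleanup_ledger still returns early when the newest root is within DEFAULT_PURGE_SLOT_INTERVAL, i.e. 512 slots, of the last purge). A condensed sketch of the post-change shape, assembled from the hunks above rather than copied verbatim from the file:

    // Condensed sketch; metrics, error-handling details, and tests are elided.
    pub struct LedgerCleanupService {
        t_cleanup: JoinHandle<()>,
    }

    impl LedgerCleanupService {
        pub fn new(
            new_root_receiver: Receiver<Slot>,
            blockstore: Arc<Blockstore>,
            max_ledger_shreds: u64,
            exit: &Arc<AtomicBool>,
        ) -> Self {
            let exit = exit.clone();
            let mut last_purge_slot = 0;
            let t_cleanup = Builder::new()
                .name("solLedgerClean".to_string())
                .spawn(move || loop {
                    if exit.load(Ordering::Relaxed) {
                        break;
                    }
                    // Purges at most once every DEFAULT_PURGE_SLOT_INTERVAL new roots.
                    if let Err(RecvTimeoutError::Disconnected) = Self::cleanup_ledger(
                        &new_root_receiver,
                        &blockstore,
                        max_ledger_shreds,
                        &mut last_purge_slot,
                        DEFAULT_PURGE_SLOT_INTERVAL,
                    ) {
                        break;
                    }
                })
                .unwrap();
            Self { t_cleanup }
        }

        pub fn join(self) -> thread::Result<()> {
            self.t_cleanup.join()
        }
    }
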
4 changes: 0 additions & 4 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
@@ -81,8 +81,6 @@ pub struct TvuConfig {
pub max_ledger_shreds: Option<u64>,
pub shred_version: u16,
pub repair_validators: Option<HashSet<Pubkey>>,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub wait_for_vote_to_start_leader: bool,
pub replay_slots_concurrently: bool,
}
@@ -298,8 +296,6 @@ impl Tvu {
blockstore.clone(),
max_ledger_shreds,
exit,
tvu_config.rocksdb_compaction_interval,
tvu_config.rocksdb_max_compaction_jitter,
)
});

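On the Tvu side, the constructor now receives only the new-root receiver, the blockstore, the shred cap, and the exit flag. A hedged sketch of the updated call site (the receiver's variable name is not visible in this hunk and is assumed):

    // Illustrative; only the argument list mirrors this diff.
    let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
        LedgerCleanupService::new(
            ledger_cleanup_slot_receiver, // assumed name for the new-root receiver
            blockstore.clone(),
            max_ledger_shreds,
            exit,
        )
    });
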
9 changes: 0 additions & 9 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
@@ -141,9 +141,6 @@ pub struct ValidatorConfig {
pub gossip_validators: Option<HashSet<Pubkey>>, // None = gossip with all
pub halt_on_known_validators_accounts_hash_mismatch: bool,
pub accounts_hash_fault_injection_slots: u64, // 0 = no fault injection
pub no_rocksdb_compaction: bool,
pub rocksdb_compaction_interval: Option<u64>,
pub rocksdb_max_compaction_jitter: Option<u64>,
pub accounts_hash_interval_slots: u64,
pub max_genesis_archive_unpacked_size: u64,
pub wal_recovery_mode: Option<BlockstoreRecoveryMode>,
@@ -205,9 +202,6 @@ impl Default for ValidatorConfig {
gossip_validators: None,
halt_on_known_validators_accounts_hash_mismatch: false,
accounts_hash_fault_injection_slots: 0,
no_rocksdb_compaction: false,
rocksdb_compaction_interval: None,
rocksdb_max_compaction_jitter: None,
accounts_hash_interval_slots: std::u64::MAX,
max_genesis_archive_unpacked_size: MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
wal_recovery_mode: None,
@@ -984,8 +978,6 @@ impl Validator {
max_ledger_shreds: config.max_ledger_shreds,
shred_version: node.info.shred_version,
repair_validators: config.repair_validators.clone(),
rocksdb_compaction_interval: config.rocksdb_compaction_interval,
rocksdb_max_compaction_jitter: config.rocksdb_compaction_interval,
wait_for_vote_to_start_leader,
replay_slots_concurrently: config.replay_slots_concurrently,
},
@@ -1415,7 +1407,6 @@ fn load_blockstore(
},
)
.expect("Failed to open ledger database");
blockstore.set_no_compaction(config.no_rocksdb_compaction);
blockstore.shred_timing_point_sender = poh_timing_point_sender;
// following boot sequence (esp BankForks) could set root. so stash the original value
// of blockstore root away here as soon as possible.
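For code that embeds the validator, the net effect of the validator.rs hunks is that ValidatorConfig no longer carries any RocksDB compaction knobs and load_blockstore no longer calls set_no_compaction. A minimal sketch of building a config after this change, using only fields visible in this diff (the value chosen is illustrative):

    // Illustrative sketch, not taken from the diff.
    let config = ValidatorConfig {
        max_ledger_shreds: Some(DEFAULT_MIN_MAX_LEDGER_SHREDS),
        // no_rocksdb_compaction, rocksdb_compaction_interval, and
        // rocksdb_max_compaction_jitter no longer exist here.
        ..ValidatorConfig::default()
    };
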
72 changes: 2 additions & 70 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
@@ -176,7 +176,6 @@ pub struct Blockstore {
completed_slots_senders: Mutex<Vec<CompletedSlotsSender>>,
pub shred_timing_point_sender: Option<PohTimingSender>,
pub lowest_cleanup_slot: RwLock<Slot>,
no_compaction: bool,
pub slots_stats: SlotsStats,
}

@@ -334,7 +333,6 @@ impl Blockstore {
insert_shreds_lock: Mutex::<()>::default(),
last_root,
lowest_cleanup_slot: RwLock::<Slot>::default(),
no_compaction: false,
slots_stats: SlotsStats::default(),
};
if initialize_transaction_status_index {
@@ -408,18 +406,6 @@ impl Blockstore {
}
}

/// Whether to disable compaction in [`Blockstore::compact_storage`], which is used
/// by the ledger cleanup service and `solana_core::validator::backup_and_clear_blockstore`.
///
/// Note that this setting is not related to the RocksDB's background
/// compaction.
///
/// To disable RocksDB's background compaction, open the Blockstore
/// with AccessType::PrimaryForMaintenance.
pub fn set_no_compaction(&mut self, no_compaction: bool) {
self.no_compaction = no_compaction;
}

/// Deletes the blockstore at the specified path.
///
/// Note that if the `ledger_path` has multiple rocksdb instances, this
@@ -1046,7 +1032,8 @@ impl Blockstore {
.expect("Couldn't fetch from SlotMeta column family")
{
// Clear all slot related information
self.run_purge(slot, slot, PurgeType::PrimaryIndex)
// todo: is this new PurgeType correct in this context?
self.run_purge(slot, slot, PurgeType::CompactionFilter)
.expect("Purge database operations failed");

// Clear this slot as a next slot from parent
@@ -2059,61 +2046,6 @@ impl Blockstore {
)
}

/// Toggles the active primary index between `0` and `1`, and clears the
/// stored max-slot of the frozen index in preparation for pruning.
fn toggle_transaction_status_index(
&self,
batch: &mut WriteBatch,
w_active_transaction_status_index: &mut u64,
to_slot: Slot,
) -> Result<Option<u64>> {
let index0 = self.transaction_status_index_cf.get(0)?;
if index0.is_none() {
return Ok(None);
}
let mut index0 = index0.unwrap();
let mut index1 = self.transaction_status_index_cf.get(1)?.unwrap();

if !index0.frozen && !index1.frozen {
index0.frozen = true;
*w_active_transaction_status_index = 1;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
Ok(None)
} else {
let purge_target_primary_index = if index0.frozen && to_slot > index0.max_slot {
info!(
"Pruning expired primary index 0 up to slot {} (max requested: {})",
index0.max_slot, to_slot
);
Some(0)
} else if index1.frozen && to_slot > index1.max_slot {
info!(
"Pruning expired primary index 1 up to slot {} (max requested: {})",
index1.max_slot, to_slot
);
Some(1)
} else {
None
};

if let Some(purge_target_primary_index) = purge_target_primary_index {
*w_active_transaction_status_index = purge_target_primary_index;
if index0.frozen {
index0.max_slot = 0
};
index0.frozen = !index0.frozen;
batch.put::<cf::TransactionStatusIndex>(0, &index0)?;
if index1.frozen {
index1.max_slot = 0
};
index1.frozen = !index1.frozen;
batch.put::<cf::TransactionStatusIndex>(1, &index1)?;
}

Ok(purge_target_primary_index)
}
}

fn get_primary_index_to_write(
&self,
slot: Slot,
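
Within the blockstore.rs hunks shown, the one behavioral change beyond deletions is that clearing a slot's data now purges with PurgeType::CompactionFilter instead of PurgeType::PrimaryIndex (the diff's own todo flags this substitution for review); the removed toggle_transaction_status_index helper appears to belong to the old primary-index rotation scheme that this purge style supersedes. Conceptually, a compaction-filter purge just records a new lower bound and lets RocksDB drop stale keys the next time it rewrites data files, rather than issuing per-key deletes. A toy model of that idea, not Solana's actual filter implementation:

    // Toy model of compaction-filter-style purging; names are illustrative.
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Every slot strictly below this bound is considered purged.
    static OLDEST_RETAINED_SLOT: AtomicU64 = AtomicU64::new(0);

    /// "Purging" only advances the bound; no explicit deletes are issued.
    fn purge_up_to(slot: u64) {
        OLDEST_RETAINED_SLOT.fetch_max(slot + 1, Ordering::Relaxed);
    }

    /// The storage engine calls this for each key it rewrites during compaction
    /// and physically drops the entries we decline to keep.
    fn compaction_filter_keep(key_slot: u64) -> bool {
        key_slot >= OLDEST_RETAINED_SLOT.load(Ordering::Relaxed)
    }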