Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 12 additions & 22 deletions core/src/ledger_cleanup_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,25 @@ impl LedgerCleanupService {
/// A helper function to `cleanup_ledger` which returns a tuple of the
/// following four elements suggesting whether to clean up the ledger:
///
/// Return value (bool, Slot, Slot, u64):
/// Return value (bool, Slot, u64):
/// - `slots_to_clean` (bool): a boolean value indicating whether there
/// are any slots to clean. If true, then `cleanup_ledger` function
/// will then proceed with the ledger cleanup.
/// - `first_slot_to_purge` (Slot): the first slot to purge.
/// - `lowest_slot_to_puerge` (Slot): the lowest slot to purge. Together
/// with `first_slot_to_purge`, the two Slot values represent the
/// range of the clean up.
/// - `lowest_slot_to_purge` (Slot): the lowest slot to purge. Any
/// slot which is older or equal to `lowest_slot_to_purge` will be
/// cleaned up.
/// - `total_shreds` (u64): the total estimated number of shreds before the
/// `root`.
fn find_slots_to_clean(
blockstore: &Arc<Blockstore>,
root: Slot,
max_ledger_shreds: u64,
) -> (bool, Slot, Slot, u64) {
) -> (bool, Slot, u64) {
let mut total_slots = Vec::new();
let mut iterate_time = Measure::start("iterate_time");
let mut total_shreds = 0;
let mut first_slot = 0;
for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
if i == 0 {
first_slot = slot;
debug!("purge: searching from slot: {}", slot);
}
// Not exact since non-full slots will have holes
Expand All @@ -153,15 +150,14 @@ impl LedgerCleanupService {
}
iterate_time.stop();
info!(
"first_slot={} total_slots={} total_shreds={} max_ledger_shreds={}, {}",
first_slot,
"total_slots={} total_shreds={} max_ledger_shreds={}, {}",
total_slots.len(),
total_shreds,
max_ledger_shreds,
iterate_time
);
if (total_shreds as u64) < max_ledger_shreds {
return (false, 0, 0, total_shreds);
return (false, 0, total_shreds);
}
let mut num_shreds_to_clean = 0;
let mut lowest_cleanup_slot = total_slots[0].0;
Expand All @@ -173,7 +169,7 @@ impl LedgerCleanupService {
}
}

(true, first_slot, lowest_cleanup_slot, total_shreds)
(true, lowest_cleanup_slot, total_shreds)
}

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
Expand Down Expand Up @@ -229,7 +225,7 @@ impl LedgerCleanupService {

*last_purge_slot = root;

let (slots_to_clean, purge_first_slot, lowest_cleanup_slot, total_shreds) =
let (slots_to_clean, lowest_cleanup_slot, total_shreds) =
Self::find_slots_to_clean(blockstore, root, max_ledger_shreds);

if slots_to_clean {
Expand All @@ -244,18 +240,12 @@ impl LedgerCleanupService {
*blockstore.lowest_cleanup_slot.write().unwrap() = lowest_cleanup_slot;
slot_update_time.stop();

info!(
"purging data from slots {} to {}",
purge_first_slot, lowest_cleanup_slot
);
info!("purging data older than {}", lowest_cleanup_slot);

let mut purge_time = Measure::start("purge_slots");

blockstore.purge_slots(
purge_first_slot,
lowest_cleanup_slot,
PurgeType::CompactionFilter,
);
// purge any slots older than lowest_cleanup_slot.
blockstore.purge_slots(0, lowest_cleanup_slot, PurgeType::CompactionFilter);
// Update only after purge operation.
// Safety: This value can be used by compaction_filters shared via Arc<AtomicU64>.
// Compactions are async and run as a multi-threaded background job. However, this
Expand Down
94 changes: 93 additions & 1 deletion ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {super::*, solana_sdk::message::AccountKeys, std::time::Instant};
pub struct PurgeStats {
delete_range: u64,
write_batch: u64,
delete_files_in_range: u64,
}

impl Blockstore {
Expand Down Expand Up @@ -32,7 +33,12 @@ impl Blockstore {
("from_slot", from_slot as i64, i64),
("to_slot", to_slot as i64, i64),
("delete_range_us", purge_stats.delete_range as i64, i64),
("write_batch_us", purge_stats.write_batch as i64, i64)
("write_batch_us", purge_stats.write_batch as i64, i64),
(
"delete_files_in_range_us",
purge_stats.delete_files_in_range as i64,
i64
)
);
if let Err(e) = purge_result {
error!(
Expand Down Expand Up @@ -127,6 +133,9 @@ impl Blockstore {

/// A helper function to `purge_slots` that executes the ledger clean up
/// from `from_slot` to `to_slot`.
///
/// When `from_slot` is 0, any sst-file with a key-range completely older
/// than `to_slot` will also be deleted.
pub(crate) fn run_purge_with_stats(
&self,
from_slot: Slot,
Expand Down Expand Up @@ -225,6 +234,7 @@ impl Blockstore {
}
}
delete_range_timer.stop();

let mut write_timer = Measure::start("write_batch");
if let Err(e) = self.db.write(write_batch) {
error!(
Expand All @@ -234,15 +244,97 @@ impl Blockstore {
return Err(e);
}
write_timer.stop();

let mut purge_files_in_range_timer = Measure::start("delete_file_in_range");
// purge_files_in_range delete any files whose slot range is within
// [from_slot, to_slot]. When from_slot is 0, it is safe to run
// purge_files_in_range because if purge_files_in_range deletes any
// sst file that contains any range-deletion tombstone, the deletion
// range of that tombstone will be completely covered by the new
// range-delete tombstone (0, to_slot) issued above.
//
// On the other hand, purge_files_in_range is more effective and
// efficient than the compaction filter (which runs key-by-key)
// because all the sst files that have key range below to_slot
// can be deleted immediately.
if columns_purged && from_slot == 0 {
self.purge_files_in_range(from_slot, to_slot);
}
purge_files_in_range_timer.stop();

purge_stats.delete_range += delete_range_timer.as_us();
purge_stats.write_batch += write_timer.as_us();
purge_stats.delete_files_in_range += purge_files_in_range_timer.as_us();

// only drop w_active_transaction_status_index after we do db.write(write_batch);
// otherwise, readers might be confused with inconsistent state between
// self.active_transaction_status_index and RockDb's TransactionStatusIndex contents
drop(w_active_transaction_status_index);
Ok(columns_purged)
}

fn purge_files_in_range(&self, from_slot: Slot, to_slot: Slot) -> bool {
self.db
.delete_file_in_range_cf::<cf::SlotMeta>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::BankHash>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Root>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ShredData>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ShredCode>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::DeadSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::DuplicateSlots>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::ErasureMeta>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Orphans>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Index>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Rewards>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::Blocktime>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::PerfSamples>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::BlockHeight>(from_slot, to_slot)
.is_ok()
& self
.db
.delete_file_in_range_cf::<cf::OptimisticSlots>(from_slot, to_slot)
.is_ok()
}

pub fn compact_storage(&self, from_slot: Slot, to_slot: Slot) -> Result<bool> {
if self.no_compaction {
info!("compact_storage: compaction disabled");
Expand Down
21 changes: 21 additions & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,16 @@ impl Rocks {
Ok(())
}

/// Thin wrapper over RocksDB's `delete_file_in_range_cf` for the given
/// column family handle: asks RocksDB to drop whole sst-files whose key
/// range falls entirely within `[from_key, to_key]`, converting the
/// backend error into this crate's `Result` error type.
fn delete_file_in_range_cf(
    &self,
    cf: &ColumnFamily,
    from_key: &[u8],
    to_key: &[u8],
) -> Result<()> {
    self.0
        .delete_file_in_range_cf(cf, from_key, to_key)
        .map_err(Into::into)
}

fn iterator_cf<C>(&self, cf: &ColumnFamily, iterator_mode: IteratorMode<C::Index>) -> DBIterator
where
C: Column,
Expand Down Expand Up @@ -1174,6 +1184,17 @@ impl Database {
batch.delete_range_cf::<C>(cf, from_index, to_index)
}

/// Deletes any sst-file of column family `C` whose entire key range is
/// within slots `[from, to]` (inclusive).
///
/// The slot bounds are first translated into `C`'s key encoding via
/// `C::as_index` / `C::key` before being handed to the backend.
pub fn delete_file_in_range_cf<C>(&self, from: Slot, to: Slot) -> Result<()>
where
    C: Column + ColumnName,
{
    let from_key = C::key(C::as_index(from));
    let to_key = C::key(C::as_index(to));
    self.backend
        .delete_file_in_range_cf(self.cf_handle::<C>(), &from_key, &to_key)
}

pub fn is_primary_access(&self) -> bool {
self.backend.is_primary_access()
}
Expand Down