This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
122 changes: 85 additions & 37 deletions core/src/ledger_cleanup_service.rs
@@ -8,7 +8,7 @@ use {
crossbeam_channel::{Receiver, RecvTimeoutError},
solana_ledger::{
blockstore::{Blockstore, PurgeType},
blockstore_db::Result as BlockstoreResult,
blockstore_db::{Result as BlockstoreResult, DATA_SHRED_CF},
},
solana_measure::measure::Measure,
solana_sdk::clock::Slot,
@@ -100,43 +100,74 @@ impl LedgerCleanupService {
root: Slot,
max_ledger_shreds: u64,
) -> (bool, Slot, u64) {
let mut total_slots = Vec::new();
let mut iterate_time = Measure::start("iterate_time");
let mut total_shreds = 0;
for (i, (slot, meta)) in blockstore.slot_meta_iterator(0).unwrap().enumerate() {
if i == 0 {
debug!("purge: searching from slot: {}", slot);
}
// Unrooted slots are not eligible for cleaning
if slot > root {
break;
}
// This method is not exact since non-full slots will have holes
total_shreds += meta.received;
total_slots.push((slot, meta.received));
let data_shred_cf_name = DATA_SHRED_CF.to_string();

let live_files = blockstore
.live_files_metadata()
.expect("Blockstore::live_files_metadata()");
let num_shreds = live_files
.iter()
.filter(|live_file| live_file.column_family_name == data_shred_cf_name)
.map(|file_meta| file_meta.num_entries)
.sum();

// Using the difference between the lowest and highest slot seen will
// result in overestimating the number of slots in the blockstore since
// there are likely to be some missing slots, such as when a leader is
// delinquent for their leader slots.
//
// With the below calculations, we will then end up underestimating the
// mean number of shreds per slot present in the blockstore which will
// result in cleaning more slots than necessary to get us
// below max_ledger_shreds.
//
// Given that the service runs on an interval, this is good because it
// means that we are building some headroom so the peak number of alive
// shreds doesn't get too large before the service's next run.
//
// Finally, we have a check to make sure that we don't purge any slots
// newer than the passed in root. This check is practically only
// relevant when a cluster has extended periods of not rooting slots.
// With healthy cluster operation, the minimum ledger size ensures
// that purged slots will be quite old in relation to the newest root.
let lowest_slot = blockstore.lowest_slot();
let highest_slot = blockstore
.highest_slot()
.expect("Blockstore::highest_slot()")
.unwrap_or(lowest_slot);
if highest_slot < lowest_slot {
error!(
"Skipping cleanup: Blockstore highest slot {} < lowest slot {}",
highest_slot, lowest_slot
);
return (false, 0, num_shreds);
}
iterate_time.stop();
// The + 1 ensures we count the correct number of slots. Additionally,
// it guarantees num_slots >= 1 for the subsequent division.
let num_slots = highest_slot - lowest_slot + 1;
let mean_shreds_per_slot = num_shreds / num_slots;
info!(
"total_slots={} total_shreds={} max_ledger_shreds={}, {}",
total_slots.len(),
total_shreds,
max_ledger_shreds,
iterate_time
"{} alive shreds in slots [{}, {}], mean of {} shreds per slot",
num_shreds, lowest_slot, highest_slot, mean_shreds_per_slot
);
if total_shreds < max_ledger_shreds {
return (false, 0, total_shreds);
}
let mut num_shreds_to_clean = 0;
let mut lowest_cleanup_slot = total_slots[0].0;
for (slot, num_shreds) in total_slots.iter().rev() {
num_shreds_to_clean += *num_shreds;
if num_shreds_to_clean > max_ledger_shreds {
lowest_cleanup_slot = *slot;
break;
}

if num_shreds <= max_ledger_shreds {
return (false, 0, num_shreds);
}

(true, lowest_cleanup_slot, total_shreds)
// Add an extra (mean_shreds_per_slot - 1) in the numerator
// so that our integer division rounds up
let num_slots_to_clean = (num_shreds - max_ledger_shreds + mean_shreds_per_slot - 1)
.checked_div(mean_shreds_per_slot);

if let Some(num_slots_to_clean) = num_slots_to_clean {
// Ensure we don't cleanup anything past the last root we saw
let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root);
(true, lowest_cleanup_slot, num_shreds)
} else {
error!("Skipping cleanup: calculated mean of 0 shreds per slot");
(false, 0, num_shreds)
}
}
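
For illustration, here is a minimal standalone sketch of the estimation arithmetic above; the slot and shred counts are made-up values, not taken from a real ledger:

fn main() {
    let root: u64 = 1_000;
    let max_ledger_shreds: u64 = 5_000;
    // Pretend the SST file metadata reported these values.
    let (lowest_slot, highest_slot, num_shreds): (u64, u64, u64) = (100, 899, 8_000);

    // The + 1 counts slots inclusively and keeps the divisor >= 1.
    let num_slots = highest_slot - lowest_slot + 1; // 800
    let mean_shreds_per_slot = num_shreds / num_slots; // 8_000 / 800 = 10

    // Ceiling division: the extra (mean_shreds_per_slot - 1) in the
    // numerator rounds up so that enough whole slots are cleaned to
    // get back below max_ledger_shreds.
    let num_slots_to_clean =
        (num_shreds - max_ledger_shreds + mean_shreds_per_slot - 1) / mean_shreds_per_slot;
    // (8_000 - 5_000 + 9) / 10 = 300

    // Never purge past the newest root.
    let lowest_cleanup_slot = std::cmp::min(lowest_slot + num_slots_to_clean - 1, root);
    assert_eq!(lowest_cleanup_slot, 399); // slots [100, 399] would be purged
}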

fn receive_new_roots(new_root_receiver: &Receiver<Slot>) -> Result<Slot, RecvTimeoutError> {
@@ -270,13 +301,25 @@ mod tests {
solana_ledger::{blockstore::make_many_slot_entries, get_tmp_ledger_path_auto_delete},
};

fn flush_blockstore_contents_to_disk(blockstore: Blockstore) -> Blockstore {
// The find_slots_to_clean() routine uses a method that queries data
// from RocksDB SST files. On a running validator, these are created
// fairly regularly as new data is coming in and contents of memory are
// pushed to disk. In a unit test environment, we aren't pushing nearly
// enough data for this to happen organically. So, instead open and
// close the Blockstore which will perform the flush to SSTs.
let ledger_path = blockstore.ledger_path().clone();
drop(blockstore);
Blockstore::open(&ledger_path).unwrap()
}
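
As an aside, the flush-on-close behavior this helper relies on can be demonstrated with the rocksdb crate directly: an explicit DB::flush() performs the same memtable-to-SST push that closing the database does. This is a sketch with an assumed scratch path, not the validator's actual wiring:

use rocksdb::DB;

fn main() {
    let path = "/tmp/flush_demo_db"; // hypothetical scratch path
    let db = DB::open_default(path).unwrap();
    db.put(b"key", b"value").unwrap();
    // An explicit flush pushes the memtable into an SST file, so
    // live_files() reports its entries without a close/reopen cycle.
    db.flush().unwrap();
    let live_files = db.live_files().unwrap();
    assert!(!live_files.is_empty());
}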

#[test]
fn test_find_slots_to_clean() {
// LedgerCleanupService::find_slots_to_clean() does not modify the
// Blockstore, so we can make repeated calls on the same slots
solana_logger::setup();
let ledger_path = get_tmp_ledger_path_auto_delete!();
let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let blockstore = Blockstore::open(ledger_path.path()).unwrap();

// Construct and build some shreds for slots [1, 10]
let num_slots: u64 = 10;
@@ -286,6 +329,9 @@
assert!(shreds_per_slot > 1);
blockstore.insert_shreds(shreds, None, false).unwrap();

// Initiate a flush so inserted shreds are found by find_slots_to_clean()
let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore));

// Ensure no cleaning of slots > last_root
let last_root = 0;
let max_ledger_shreds = 0;
@@ -294,11 +340,11 @@
// Slot 0 will exist in blockstore with zero shreds since it is slot
// 1's parent. Thus, slot 0 will be identified for clean.
assert!(should_clean && lowest_purged == 0);
// Now, set max_ledger_shreds to 1 so that slot 0 is left alone
// Now, set max_ledger_shreds to 1, slot 0 still eligible for clean
let max_ledger_shreds = 1;
let (should_clean, lowest_purged, _) =
LedgerCleanupService::find_slots_to_clean(&blockstore, last_root, max_ledger_shreds);
assert!(!should_clean && lowest_purged == 0);
assert!(should_clean && lowest_purged == 0);

// Ensure no cleaning if blockstore contains fewer than max_ledger_shreds
let last_root = num_slots;
@@ -340,7 +386,9 @@
let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let (shreds, _) = make_many_slot_entries(0, 50, 5);
blockstore.insert_shreds(shreds, None, false).unwrap();
let blockstore = Arc::new(blockstore);

// Initiate a flush so inserted shreds are found by find_slots_to_clean()
let blockstore = Arc::new(flush_blockstore_contents_to_disk(blockstore));
let (sender, receiver) = unbounded();

//send a signal to kill all but 5 shreds, which will be in the newest slots
2 changes: 1 addition & 1 deletion ledger/src/blockstore_db.rs
@@ -69,7 +69,7 @@ const ROOT_CF: &str = "root";
/// Column family for indexes
const INDEX_CF: &str = "index";
/// Column family for Data Shreds
const DATA_SHRED_CF: &str = "data_shred";
pub const DATA_SHRED_CF: &str = "data_shred";
/// Column family for Code Shreds
const CODE_SHRED_CF: &str = "code_shred";
/// Column family for Transaction Status