Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,8 +830,8 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi

let end_slot = last_slot.unwrap();
info!("Purging slots {} to {}", start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
blockstore.purge_from_next_slots(start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!("Purging done, compacting db..");
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
warn!(
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = "2.33.1"
futures = "0.3.5"
futures-util = "0.3.5"
histogram = "*"
itertools = "0.9.0"
log = { version = "0.4.8" }
regex = "1"
serde_json = "1.0.56"
Expand Down
89 changes: 81 additions & 8 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
ArgMatches, SubCommand,
};
use itertools::Itertools;
use log::*;
use regex::Regex;
use serde_json::json;
Expand Down Expand Up @@ -889,6 +890,11 @@ fn main() {
)
.arg(&allow_dead_slots_arg)
)
.subcommand(
SubCommand::with_name("dead-slots")
.arg(&starting_slot_arg)
.about("Print all of dead slots")
)
.subcommand(
SubCommand::with_name("set-dead-slot")
.about("Mark one or more slots dead")
Expand Down Expand Up @@ -1203,13 +1209,28 @@ fn main() {
.value_name("SLOT")
.help("Ending slot to stop purging (inclusive) [default: the highest slot in the ledger]"),
)
.arg(
Arg::with_name("batch_size")
.long("batch-size")
.value_name("NUM")
.takes_value(true)
.default_value("1000")
.help("Removes at most BATCH_SIZE slots while purging in loop"),
)
.arg(
Arg::with_name("no_compaction")
.long("no-compaction")
.required(false)
.takes_value(false)
.help("Skip ledger compaction after purge")
)
.arg(
Arg::with_name("dead_slots_only")
.long("dead-slots-only")
.required(false)
.takes_value(false)
.help("Limit puring to dead slots only")
)
)
.subcommand(
SubCommand::with_name("list-roots")
Expand Down Expand Up @@ -1445,6 +1466,17 @@ fn main() {
true,
);
}
("dead-slots", Some(arg_matches)) => {
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
for slot in blockstore.dead_slots_iterator(starting_slot).unwrap() {
println!("{}", slot);
}
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore =
Expand Down Expand Up @@ -2045,9 +2077,15 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t!(arg_matches, "end_slot", Slot).ok();
let no_compaction = arg_matches.is_present("no-compaction");
let blockstore =
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
let no_compaction = arg_matches.is_present("no_compaction");
let dead_slots_only = arg_matches.is_present("dead_slots_only");
let batch_size = value_t_or_exit!(arg_matches, "batch_size", usize);
let access_type = if !no_compaction {
AccessType::PrimaryOnly
} else {
AccessType::PrimaryOnlyForMaintenance
};
let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode);

let end_slot = match end_slot {
Some(end_slot) => end_slot,
Expand All @@ -2074,13 +2112,48 @@ fn main() {
);
exit(1);
}
println!("Purging data from slots {} to {}", start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!(
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
start_slot,
end_slot,
end_slot - start_slot,
no_compaction,
dead_slots_only,
);
let purge_from_blockstore = |start_slot, end_slot| {
blockstore.purge_from_next_slots(start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
}
};
if !dead_slots_only {
let slots_iter = &(start_slot..=end_slot).chunks(batch_size);
for slots in slots_iter {
let slots = slots.collect::<Vec<_>>();
assert!(!slots.is_empty());

let start_slot = *slots.first().unwrap();
let end_slot = *slots.last().unwrap();
info!(
"Purging chunked slots from {} to {} ({} slots)",
start_slot,
end_slot,
end_slot - start_slot
);
purge_from_blockstore(start_slot, end_slot);
}
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
let dead_slots_iter = blockstore
.dead_slots_iterator(start_slot)
.unwrap()
.take_while(|s| *s <= end_slot);
for dead_slot in dead_slots_iter {
info!("Purging dead slot {}", dead_slot);
purge_from_blockstore(dead_slot, dead_slot);
}
}
blockstore.purge_from_next_slots(start_slot, end_slot);
}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(
Expand Down
6 changes: 5 additions & 1 deletion ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl Blockstore {
meta.next_slots
.retain(|slot| *slot < from_slot || *slot > to_slot);
if meta.next_slots.len() != original_len {
info!("purge_from_next_slots: adjusted meta for slot {}", slot);
info!(
"purge_from_next_slots: meta for slot {} no longer refers to slots {:?}",
slot,
from_slot..=to_slot
);
self.put_meta_bytes(
slot,
&bincode::serialize(&meta).expect("couldn't update meta"),
Expand Down
54 changes: 35 additions & 19 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ pub mod columns {

pub enum AccessType {
PrimaryOnly,
PrimaryOnlyForMaintenance, // this indicates no compaction
TryPrimaryThenSecondary,
}

Expand Down Expand Up @@ -213,37 +214,45 @@ impl Rocks {
fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options();
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
}
let mut db_options = get_db_options(&access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}

// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let meta_cf_descriptor =
ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
let duplicate_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
let shred_data_cf_descriptor =
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
let shred_code_cf_descriptor =
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
let transaction_status_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
let address_signatures_cf_descriptor =
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options());
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
let transaction_status_index_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
let rewards_cf_descriptor =
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));

let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
Expand All @@ -268,7 +277,7 @@ impl Rocks {

// Open the database
let db = match access_type {
AccessType::PrimaryOnly => Rocks(
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
),
Expand Down Expand Up @@ -961,7 +970,7 @@ impl<'a> WriteBatch<'a> {
}
}

fn get_cf_options() -> Options {
fn get_cf_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8);
Expand All @@ -975,10 +984,14 @@ fn get_cf_options() -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}

fn get_db_options() -> Options {
fn get_db_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
Expand All @@ -987,6 +1000,9 @@ fn get_db_options() -> Options {

// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}