Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -934,8 +934,8 @@ fn backup_and_clear_blockstore(ledger_path: &Path, start_slot: Slot, shred_versi

let end_slot = last_slot.unwrap();
info!("Purging slots {} to {}", start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this order was very broken, when the process is interrupted immediately after purge_slots and before purge_from_next_slots. It leaves super dangerous dangling references in meta....

blockstore.purge_from_next_slots(start_slot, end_slot);
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!("Purging done, compacting db..");
if let Err(e) = blockstore.compact_storage(start_slot, end_slot) {
warn!(
Expand Down
1 change: 1 addition & 0 deletions ledger-tool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ clap = "2.33.1"
futures = "0.3.5"
futures-util = "0.3.5"
histogram = "*"
itertools = "0.9.0"
log = { version = "0.4.8" }
regex = "1"
serde_json = "1.0.56"
Expand Down
89 changes: 81 additions & 8 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg,
ArgMatches, SubCommand,
};
use itertools::Itertools;
use log::*;
use regex::Regex;
use serde_json::json;
Expand Down Expand Up @@ -889,6 +890,11 @@ fn main() {
)
.arg(&allow_dead_slots_arg)
)
.subcommand(
SubCommand::with_name("dead-slots")
.arg(&starting_slot_arg)
.about("Print all of dead slots")
)
.subcommand(
SubCommand::with_name("set-dead-slot")
.about("Mark one or more slots dead")
Expand Down Expand Up @@ -1203,13 +1209,28 @@ fn main() {
.value_name("SLOT")
.help("Ending slot to stop purging (inclusive) [default: the highest slot in the ledger]"),
)
.arg(
Arg::with_name("batch_size")
.long("batch-size")
.value_name("NUM")
.takes_value(true)
.default_value("1000")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, I preferred more large values; but it seems that 10000 (45 slots/sec) worsens the throughput...

not much digging; I'll just settle down on 1000 (measured 50 slots/sec).

.help("Removes at most BATCH_SIZE slots while purging in loop"),
)
.arg(
Arg::with_name("no_compaction")
.long("no-compaction")
.required(false)
.takes_value(false)
.help("Skip ledger compaction after purge")
)
.arg(
Arg::with_name("dead_slots_only")
.long("dead-slots-only")
.required(false)
.takes_value(false)
.help("Limit puring to dead slots only")
)
)
.subcommand(
SubCommand::with_name("list-roots")
Expand Down Expand Up @@ -1445,6 +1466,17 @@ fn main() {
true,
);
}
("dead-slots", Some(arg_matches)) => {
let blockstore = open_blockstore(
&ledger_path,
AccessType::TryPrimaryThenSecondary,
wal_recovery_mode,
);
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
for slot in blockstore.dead_slots_iterator(starting_slot).unwrap() {
println!("{}", slot);
}
}
("set-dead-slot", Some(arg_matches)) => {
let slots = values_t_or_exit!(arg_matches, "slots", Slot);
let blockstore =
Expand Down Expand Up @@ -2045,9 +2077,15 @@ fn main() {
("purge", Some(arg_matches)) => {
let start_slot = value_t_or_exit!(arg_matches, "start_slot", Slot);
let end_slot = value_t!(arg_matches, "end_slot", Slot).ok();
let no_compaction = arg_matches.is_present("no-compaction");
Copy link
Copy Markdown
Contributor Author

@ryoqun ryoqun Oct 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let blockstore =
open_blockstore(&ledger_path, AccessType::PrimaryOnly, wal_recovery_mode);
let no_compaction = arg_matches.is_present("no_compaction");
let dead_slots_only = arg_matches.is_present("dead_slots_only");
let batch_size = value_t_or_exit!(arg_matches, "batch_size", usize);
let access_type = if !no_compaction {
AccessType::PrimaryOnly
} else {
AccessType::PrimaryOnlyForMaintenance
};
let blockstore = open_blockstore(&ledger_path, access_type, wal_recovery_mode);

let end_slot = match end_slot {
Some(end_slot) => end_slot,
Expand All @@ -2074,13 +2112,48 @@ fn main() {
);
exit(1);
}
println!("Purging data from slots {} to {}", start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
info!(
"Purging data from slots {} to {} ({} slots) (skip compaction: {}) (dead slot only: {})",
start_slot,
end_slot,
end_slot - start_slot,
no_compaction,
dead_slots_only,
);
let purge_from_blockstore = |start_slot, end_slot| {
blockstore.purge_from_next_slots(start_slot, end_slot);
if no_compaction {
blockstore.purge_slots(start_slot, end_slot, PurgeType::Exact);
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
}
};
if !dead_slots_only {
let slots_iter = &(start_slot..=end_slot).chunks(batch_size);
for slots in slots_iter {
let slots = slots.collect::<Vec<_>>();
assert!(!slots.is_empty());

let start_slot = *slots.first().unwrap();
let end_slot = *slots.last().unwrap();
info!(
"Purging chunked slots from {} to {} ({} slots)",
start_slot,
end_slot,
end_slot - start_slot
);
purge_from_blockstore(start_slot, end_slot);
}
} else {
blockstore.purge_and_compact_slots(start_slot, end_slot);
let dead_slots_iter = blockstore
.dead_slots_iterator(start_slot)
.unwrap()
.take_while(|s| *s <= end_slot);
for dead_slot in dead_slots_iter {
info!("Purging dead slot {}", dead_slot);
purge_from_blockstore(dead_slot, dead_slot);
}
}
blockstore.purge_from_next_slots(start_slot, end_slot);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this order was very broken, when the process is interrupted immediately after purge_slots and before purge_from_next_slots. It leaves super dangerous dangling references in meta....

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, when I was working around https://github.com/solana-labs/solana/pull/12350/files#diff-5e9f940ea065adfd3066ef0d8ef0cfb5029b5ba478d96fb484b36954e464505cR1609, I just skipped the fact check and corrected new code only.... and now that laziness bit me. ;)

}
("list-roots", Some(arg_matches)) => {
let blockstore = open_blockstore(
Expand Down
6 changes: 5 additions & 1 deletion ledger/src/blockstore/blockstore_purge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl Blockstore {
meta.next_slots
.retain(|slot| *slot < from_slot || *slot > to_slot);
if meta.next_slots.len() != original_len {
info!("purge_from_next_slots: adjusted meta for slot {}", slot);
info!(
"purge_from_next_slots: meta for slot {} no longer refers to slots {:?}",
slot,
from_slot..=to_slot
);
self.put_meta_bytes(
slot,
&bincode::serialize(&meta).expect("couldn't update meta"),
Expand Down
54 changes: 35 additions & 19 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ pub mod columns {

pub enum AccessType {
PrimaryOnly,
PrimaryOnlyForMaintenance, // this indicates no compaction
TryPrimaryThenSecondary,
}

Expand Down Expand Up @@ -217,37 +218,45 @@ impl Rocks {
fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options();
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
warn!("Disabling rocksdb's auto compaction for maintenance bulk ledger update...");
}
let mut db_options = get_db_options(&access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}

// Column family names
let meta_cf_descriptor = ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options());
let meta_cf_descriptor =
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heh, finally rustfmt aligns these nicely. ;)

ColumnFamilyDescriptor::new(SlotMeta::NAME, get_cf_options(&access_type));
let dead_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DeadSlots::NAME, get_cf_options(&access_type));
let duplicate_slots_cf_descriptor =
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options());
ColumnFamilyDescriptor::new(DuplicateSlots::NAME, get_cf_options(&access_type));
let erasure_meta_cf_descriptor =
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options());
let orphans_cf_descriptor = ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options());
let root_cf_descriptor = ColumnFamilyDescriptor::new(Root::NAME, get_cf_options());
let index_cf_descriptor = ColumnFamilyDescriptor::new(Index::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ErasureMeta::NAME, get_cf_options(&access_type));
let orphans_cf_descriptor =
ColumnFamilyDescriptor::new(Orphans::NAME, get_cf_options(&access_type));
let root_cf_descriptor =
ColumnFamilyDescriptor::new(Root::NAME, get_cf_options(&access_type));
let index_cf_descriptor =
ColumnFamilyDescriptor::new(Index::NAME, get_cf_options(&access_type));
let shred_data_cf_descriptor =
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredData::NAME, get_cf_options(&access_type));
let shred_code_cf_descriptor =
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options());
ColumnFamilyDescriptor::new(ShredCode::NAME, get_cf_options(&access_type));
let transaction_status_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatus::NAME, get_cf_options(&access_type));
let address_signatures_cf_descriptor =
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options());
ColumnFamilyDescriptor::new(AddressSignatures::NAME, get_cf_options(&access_type));
let transaction_status_index_cf_descriptor =
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options());
let rewards_cf_descriptor = ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options());
ColumnFamilyDescriptor::new(TransactionStatusIndex::NAME, get_cf_options(&access_type));
let rewards_cf_descriptor =
ColumnFamilyDescriptor::new(Rewards::NAME, get_cf_options(&access_type));
let blocktime_cf_descriptor =
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options());
ColumnFamilyDescriptor::new(Blocktime::NAME, get_cf_options(&access_type));
let perf_samples_cf_descriptor =
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options());
ColumnFamilyDescriptor::new(PerfSamples::NAME, get_cf_options(&access_type));

let cfs = vec![
(SlotMeta::NAME, meta_cf_descriptor),
Expand All @@ -272,7 +281,7 @@ impl Rocks {

// Open the database
let db = match access_type {
AccessType::PrimaryOnly => Rocks(
AccessType::PrimaryOnly | AccessType::PrimaryOnlyForMaintenance => Rocks(
DB::open_cf_descriptors(&db_options, path, cfs.into_iter().map(|c| c.1))?,
ActualAccessType::Primary,
),
Expand Down Expand Up @@ -1003,7 +1012,7 @@ impl<'a> WriteBatch<'a> {
}
}

fn get_cf_options() -> Options {
fn get_cf_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
// 256 * 8 = 2GB. 6 of these columns should take at most 12GB of RAM
options.set_max_write_buffer_number(8);
Expand All @@ -1017,10 +1026,14 @@ fn get_cf_options() -> Options {
options.set_level_zero_file_num_compaction_trigger(file_num_compaction_trigger as i32);
options.set_max_bytes_for_level_base(total_size_base);
options.set_target_file_size_base(file_size_base);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}

fn get_db_options() -> Options {
fn get_db_options(access_type: &AccessType) -> Options {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
Expand All @@ -1029,6 +1042,9 @@ fn get_db_options() -> Options {

// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);
if matches!(access_type, AccessType::PrimaryOnlyForMaintenance) {
options.set_disable_auto_compactions(true);
}

options
}