Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 5 additions & 19 deletions core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ use {
},
solana_runtime::{
accounts_background_service::{
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, SnapshotRequestHandler,
AbsRequestHandler, AbsRequestSender, AccountsBackgroundService, DroppedSlotsReceiver,
SnapshotRequestHandler,
},
accounts_db::AccountShrinkThreshold,
bank_forks::BankForks,
Expand All @@ -57,7 +58,6 @@ use {
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
std::{
boxed::Box,
collections::HashSet,
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
Expand Down Expand Up @@ -148,6 +148,7 @@ impl Tvu {
last_full_snapshot_slot: Option<Slot>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
wait_to_vote_slot: Option<Slot>,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> Self {
let TvuSockets {
repair: repair_socket,
Expand Down Expand Up @@ -248,23 +249,6 @@ impl Tvu {
}
};

let (pruned_banks_sender, pruned_banks_receiver) = unbounded();

// Before replay starts, set the callbacks in each of the banks in BankForks
// Note after this callback is created, only the AccountsBackgroundService should be calling
// AccountsDb::purge_slot() to clean up dropped banks.
let callback = bank_forks
.read()
.unwrap()
.root_bank()
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
for bank in bank_forks.read().unwrap().banks().values() {
bank.set_callback(Some(Box::new(callback.clone())));
}

let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);

let accounts_background_request_handler = AbsRequestHandler {
Expand Down Expand Up @@ -465,6 +449,7 @@ pub mod tests {
let tower = Tower::default();
let accounts_package_channel = unbounded();
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
Expand Down Expand Up @@ -514,6 +499,7 @@ pub mod tests {
None,
None,
None,
pruned_banks_receiver,
);
exit.store(true, Ordering::Relaxed);
tvu.join().unwrap();
Expand Down
39 changes: 26 additions & 13 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ use {
transaction_status_service::TransactionStatusService,
},
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -503,6 +504,7 @@ impl Validator {
},
blockstore_process_options,
blockstore_root_scan,
pruned_banks_receiver,
) = load_blockstore(
config,
ledger_path,
Expand All @@ -522,6 +524,7 @@ impl Validator {
config.snapshot_config.as_ref(),
accounts_package_channel.0.clone(),
blockstore_root_scan,
pruned_banks_receiver.clone(),
);
let last_full_snapshot_slot =
last_full_snapshot_slot.or_else(|| starting_snapshot_hashes.map(|x| x.full.hash.0));
Expand Down Expand Up @@ -927,6 +930,7 @@ impl Validator {
last_full_snapshot_slot,
block_metadata_notifier,
config.wait_to_vote_slot,
pruned_banks_receiver,
);

let tpu = Tpu::new(
Expand Down Expand Up @@ -1261,6 +1265,7 @@ fn load_blockstore(
TransactionHistoryServices,
blockstore_processor::ProcessOptions,
BlockstoreRootScan,
DroppedSlotsReceiver,
) {
info!("loading ledger from {:?}...", ledger_path);
*start_progress.write().unwrap() = ValidatorStartProgress::LoadingLedger;
Expand Down Expand Up @@ -1339,19 +1344,23 @@ fn load_blockstore(
TransactionHistoryServices::default()
};

let (mut bank_forks, mut leader_schedule_cache, starting_snapshot_hashes) =
bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_update_notifier,
);
let (
mut bank_forks,
mut leader_schedule_cache,
starting_snapshot_hashes,
pruned_banks_receiver,
) = bank_forks_utils::load_bank_forks(
&genesis_config,
&blockstore,
config.account_paths.clone(),
config.account_shrink_paths.clone(),
config.snapshot_config.as_ref(),
&process_options,
transaction_history_services
.cache_block_meta_sender
.as_ref(),
accounts_update_notifier,
);

leader_schedule_cache.set_fixed_leader_schedule(config.fixed_leader_schedule.clone());
bank_forks.set_snapshot_config(config.snapshot_config.clone());
Expand All @@ -1373,9 +1382,11 @@ fn load_blockstore(
transaction_history_services,
process_options,
blockstore_root_scan,
pruned_banks_receiver,
)
}

#[allow(clippy::too_many_arguments)]
fn process_blockstore(
blockstore: &Blockstore,
bank_forks: &mut BankForks,
Expand All @@ -1386,6 +1397,7 @@ fn process_blockstore(
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
blockstore_root_scan: BlockstoreRootScan,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> Option<Slot> {
let last_full_snapshot_slot = blockstore_processor::process_blockstore_from_root(
blockstore,
Expand All @@ -1396,6 +1408,7 @@ fn process_blockstore(
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap_or_else(|err| {
error!("Failed to load ledger: {:?}", err);
Expand Down
54 changes: 41 additions & 13 deletions ledger/src/bank_forks_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use {
},
leader_schedule_cache::LeaderScheduleCache,
},
crossbeam_channel::unbounded,
log::*,
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_update_notifier_interface::AccountsUpdateNotifier,
bank_forks::BankForks,
snapshot_archive_info::SnapshotArchiveInfoGetter,
Expand Down Expand Up @@ -47,16 +49,17 @@ pub fn load(
accounts_package_sender: AccountsPackageSender,
accounts_update_notifier: Option<AccountsUpdateNotifier>,
) -> LoadResult {
let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes) = load_bank_forks(
genesis_config,
blockstore,
account_paths,
shrink_paths,
snapshot_config,
&process_options,
cache_block_meta_sender,
accounts_update_notifier,
);
let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) =
load_bank_forks(
genesis_config,
blockstore,
account_paths,
shrink_paths,
snapshot_config,
&process_options,
cache_block_meta_sender,
accounts_update_notifier,
);

blockstore_processor::process_blockstore_from_root(
blockstore,
Expand All @@ -67,6 +70,7 @@ pub fn load(
cache_block_meta_sender,
snapshot_config,
accounts_package_sender,
pruned_banks_receiver,
)
.map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes))
}
Expand All @@ -85,6 +89,7 @@ pub fn load_bank_forks(
BankForks,
LeaderScheduleCache,
Option<StartingSnapshotHashes>,
DroppedSlotsReceiver,
) {
let snapshot_present = if let Some(snapshot_config) = snapshot_config {
info!(
Expand Down Expand Up @@ -144,12 +149,30 @@ pub fn load_bank_forks(
)
};

let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank_forks.root_bank());
// Before replay starts, set the callbacks in each of the banks in BankForks so that
// all dropped banks come through the `pruned_banks_receiver` channel. This way all bank
// drop behavior can be safely synchronized with any other ongoing accounts activity like
// cache flush, clean, shrink, as long as the same thread performing those activities also
// is processing the dropped banks from the `pruned_banks_receiver` channel.

// There should only be one bank, the root bank in BankForks. Thus all banks added to
// BankForks from now on will be descended from the root bank and thus will inherit
// the bank drop callback.
assert_eq!(bank_forks.banks().len(), 1);
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let root_bank = bank_forks.root_bank();
let callback = root_bank
.rc
.accounts
.accounts_db
.create_drop_bank_callback(pruned_banks_sender);
root_bank.set_callback(Some(Box::new(callback)));

let mut leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
if process_options.full_leader_cache {
leader_schedule_cache.set_max_schedules(std::usize::MAX);
}

assert_eq!(bank_forks.banks().len(), 1);
if let Some(ref new_hard_forks) = process_options.new_hard_forks {
let root_bank = bank_forks.root_bank();
let hard_forks = root_bank.hard_forks();
Expand All @@ -166,7 +189,12 @@ pub fn load_bank_forks(
}
}

(bank_forks, leader_schedule_cache, starting_snapshot_hashes)
(
bank_forks,
leader_schedule_cache,
starting_snapshot_hashes,
pruned_banks_receiver,
)
}

#[allow(clippy::too_many_arguments)]
Expand Down
41 changes: 31 additions & 10 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use {
solana_program_runtime::timings::{ExecuteTimingType, ExecuteTimings},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{
accounts_background_service::DroppedSlotsReceiver,
accounts_db::{AccountShrinkThreshold, AccountsDbConfig},
accounts_index::AccountSecondaryIndexes,
accounts_update_notifier_interface::AccountsUpdateNotifier,
Expand Down Expand Up @@ -566,16 +567,17 @@ pub fn test_process_blockstore(
blockstore: &Blockstore,
opts: ProcessOptions,
) -> (BankForks, LeaderScheduleCache) {
let (mut bank_forks, leader_schedule_cache, ..) = crate::bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
Vec::new(),
None,
None,
&opts,
None,
None,
);
let (mut bank_forks, leader_schedule_cache, .., pruned_banks_receiver) =
crate::bank_forks_utils::load_bank_forks(
genesis_config,
blockstore,
Vec::new(),
None,
None,
&opts,
None,
None,
);
let (accounts_package_sender, _) = unbounded();
process_blockstore_from_root(
blockstore,
Expand All @@ -586,6 +588,7 @@ pub fn test_process_blockstore(
None,
None,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap();
(bank_forks, leader_schedule_cache)
Expand Down Expand Up @@ -636,6 +639,7 @@ pub fn process_blockstore_from_root(
cache_block_meta_sender: Option<&CacheBlockMetaSender>,
snapshot_config: Option<&SnapshotConfig>,
accounts_package_sender: AccountsPackageSender,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> result::Result<Option<Slot>, BlockstoreProcessorError> {
if let Some(num_threads) = opts.override_num_threads {
PAR_THREAD_POOL.with(|pool| {
Expand Down Expand Up @@ -695,6 +699,7 @@ pub fn process_blockstore_from_root(
accounts_package_sender,
&mut timing,
&mut last_full_snapshot_slot,
pruned_banks_receiver,
)?;
} else {
// If there's no meta for the input `start_slot`, then we started from a snapshot
Expand Down Expand Up @@ -1116,6 +1121,7 @@ fn load_frozen_forks(
accounts_package_sender: AccountsPackageSender,
timing: &mut ExecuteTimings,
last_full_snapshot_slot: &mut Option<Slot>,
pruned_banks_receiver: DroppedSlotsReceiver,
) -> result::Result<(), BlockstoreProcessorError> {
let recyclers = VerifyRecyclers::default();
let mut all_banks = HashMap::new();
Expand Down Expand Up @@ -1284,6 +1290,17 @@ fn load_frozen_forks(
}

if last_free.elapsed() > Duration::from_secs(10) {
// Purge account state for all dropped banks
for (pruned_slot, pruned_bank_id) in pruned_banks_receiver.try_iter() {
// Simulate this purge being from the AccountsBackgroundService
let is_from_abs = true;
new_root_bank.rc.accounts.purge_slot(
pruned_slot,
pruned_bank_id,
is_from_abs,
);
}

// Must be called after `squash()`, so that AccountsDb knows what
// the roots are for the cache flushing in exhaustively_free_unused_resource().
// This could take few secs; so update last_free later
Expand Down Expand Up @@ -3143,6 +3160,7 @@ pub mod tests {

// Test process_blockstore_from_root() from slot 1 onwards
let (accounts_package_sender, _) = unbounded();
let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
process_blockstore_from_root(
&blockstore,
&mut bank_forks,
Expand All @@ -3152,6 +3170,7 @@ pub mod tests {
None,
None,
accounts_package_sender,
pruned_banks_receiver,
)
.unwrap();

Expand Down Expand Up @@ -3252,6 +3271,7 @@ pub mod tests {
let (accounts_package_sender, accounts_package_receiver) = unbounded();
let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank);

let (_pruned_banks_sender, pruned_banks_receiver) = unbounded();
process_blockstore_from_root(
&blockstore,
&mut bank_forks,
Expand All @@ -3261,6 +3281,7 @@ pub mod tests {
None,
Some(&snapshot_config),
accounts_package_sender.clone(),
pruned_banks_receiver,
)
.unwrap();

Expand Down
Loading