Merged
28 changes: 24 additions & 4 deletions core/src/banking_stage/decision_maker.rs
@@ -7,7 +7,7 @@ use {
solana_pubkey::Pubkey,
solana_unified_scheduler_pool::{BankingStageMonitor, BankingStageStatus},
std::{
sync::{Arc, RwLock},
sync::{atomic::{AtomicBool, Ordering::Relaxed}, Arc, RwLock},
time::{Duration, Instant},
},
};
@@ -136,10 +136,30 @@ impl DecisionMaker {
}
}

impl BankingStageMonitor for DecisionMaker {
#[derive(Debug)]
pub(crate) struct DecisionMakerWrapper {
is_exited: Arc<AtomicBool>,
decision_maker: DecisionMaker,
}

impl DecisionMakerWrapper {
pub(crate) fn new(decision_maker: DecisionMaker) -> Self {
// Clone off beforehand to avoid lock contention.
let is_exited = decision_maker.poh_recorder.read().unwrap().is_exited.clone();

Self {
is_exited,
decision_maker,
}
}
}

impl BankingStageMonitor for DecisionMakerWrapper {
fn status(&mut self) -> BankingStageStatus {
if matches!(
self.make_consume_or_forward_decision(),
if self.is_exited.load(Relaxed) {
BankingStageStatus::Exited
} else if matches!(
self.decision_maker.make_consume_or_forward_decision(),
BufferedPacketsDecision::Forward,
) {
BankingStageStatus::Inactive
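The point of the wrapper is that `is_exited` is cloned out from behind the `PohRecorder` `RwLock` exactly once, so every subsequent `status()` poll is a lock-free atomic load. A minimal sketch of that pattern, with hypothetical `Recorder`/`Monitor` types standing in for `PohRecorder` and `DecisionMakerWrapper`:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering::Relaxed},
    Arc, RwLock,
};

struct Recorder {
    is_exited: Arc<AtomicBool>, // stands in for PohRecorder::is_exited
}

struct Monitor {
    // Cloned once at construction; polling never touches the RwLock again.
    is_exited: Arc<AtomicBool>,
}

impl Monitor {
    fn new(recorder: &Arc<RwLock<Recorder>>) -> Self {
        // The only lock acquisition: clone the flag out up front.
        let is_exited = recorder.read().unwrap().is_exited.clone();
        Self { is_exited }
    }

    fn exited(&self) -> bool {
        self.is_exited.load(Relaxed) // lock-free poll
    }
}

fn main() {
    let recorder = Arc::new(RwLock::new(Recorder {
        is_exited: Arc::new(AtomicBool::new(false)),
    }));
    let monitor = Monitor::new(&recorder);
    assert!(!monitor.exited());
    recorder.read().unwrap().is_exited.store(true, Relaxed);
    assert!(monitor.exited()); // observed without re-locking the recorder
}
```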
4 changes: 2 additions & 2 deletions core/src/banking_stage/unified_scheduler.rs
@@ -30,7 +30,7 @@
use qualifier_attr::qualifiers;
use {
super::{
decision_maker::{BufferedPacketsDecision, DecisionMaker},
decision_maker::{BufferedPacketsDecision, DecisionMaker, DecisionMakerWrapper},
packet_deserializer::PacketDeserializer,
LikeClusterInfo,
},
@@ -56,7 +56,7 @@ pub(crate) fn ensure_banking_stage_setup(
let mut root_bank_cache = RootBankCache::new(bank_forks.clone());
let unified_receiver = channels.unified_receiver().clone();
let mut decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let banking_stage_monitor = Box::new(decision_maker.clone());
let banking_stage_monitor = Box::new(DecisionMakerWrapper::new(decision_maker.clone()));

let banking_packet_handler = Box::new(
move |helper: &BankingStageHelper, batches: BankingPacketBatch| {
12 changes: 12 additions & 0 deletions runtime/src/bank_forks.rs
@@ -649,6 +649,18 @@ impl ForkGraph for BankForks {
}
}

impl Drop for BankForks {
fn drop(&mut self) {
info!("BankForks::drop(): started...");
self.banks.clear();

if let Some(scheduler_pool) = self.scheduler_pool.take() {
scheduler_pool.uninstalled_from_bank_forks();
}
info!("BankForks::drop(): ...finished");
}
}

#[cfg(test)]
mod tests {
use {
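The `Drop` impl makes the teardown order explicit rather than relying on field declaration order: banks are cleared first, and only then is the scheduler pool told it has been uninstalled. A minimal sketch of that idiom, with hypothetical `Forks`/`Pool` types:

```rust
use std::collections::HashMap;

#[derive(Debug)]
struct Pool;

impl Pool {
    fn uninstalled(&self) {
        println!("pool: shutting down its own threads...");
    }
}

struct Forks {
    banks: HashMap<u64, String>,
    pool: Option<Pool>,
}

impl Drop for Forks {
    fn drop(&mut self) {
        // Clear the banks first, so nothing referencing the pool's
        // schedulers survives past this point.
        self.banks.clear();
        // Only then notify the pool that it has been uninstalled.
        if let Some(pool) = self.pool.take() {
            pool.uninstalled();
        }
    }
}

fn main() {
    let forks = Forks {
        banks: HashMap::from([(0, "genesis".to_owned())]),
        pool: Some(Pool),
    };
    drop(forks); // banks cleared, then the pool notification fires
}
```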
2 changes: 2 additions & 0 deletions runtime/src/installed_scheduler_pool.rs
@@ -67,6 +67,8 @@ pub trait InstalledSchedulerPool: Send + Sync + Debug {
/// timing of scheduler returning to reduce latency of the normal block-verification code-path,
/// relying on eventual stale listener clean-up by `solScCleaner`.
fn register_timeout_listener(&self, timeout_listener: TimeoutListener);

fn uninstalled_from_bank_forks(self: Arc<Self>);
}

#[derive(Debug)]
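The new method takes `self: Arc<Self>`, so the caller hands over a strong reference that the implementor can later try to unwrap into exclusive ownership. A minimal sketch of that receiver pattern, with hypothetical names:

```rust
use std::sync::Arc;

trait Uninstallable {
    // The receiver consumes one strong reference to the implementor.
    fn uninstall(self: Arc<Self>);
}

#[derive(Debug)]
struct Pool {
    name: String,
}

impl Uninstallable for Pool {
    fn uninstall(self: Arc<Self>) {
        // If this was the last strong reference, we now own the pool and
        // can tear it down by value (join handles, etc.).
        match Arc::try_unwrap(self) {
            Ok(pool) => println!("tearing down {} by value", pool.name),
            Err(shared) => println!("{} is still shared; deferring", shared.name),
        }
    }
}

fn main() {
    let pool = Arc::new(Pool { name: "demo".to_owned() });
    pool.uninstall(); // last strong reference: takes the by-value branch
}
```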
165 changes: 137 additions & 28 deletions unified-scheduler-pool/src/lib.rs
@@ -17,7 +17,9 @@ use qualifier_attr::qualifiers;
use {
agave_banking_stage_ingress_types::{BankingPacketBatch, BankingPacketReceiver},
assert_matches::assert_matches,
crossbeam_channel::{self, never, select_biased, Receiver, RecvError, SendError, Sender},
crossbeam_channel::{
self, never, select_biased, Receiver, RecvError, RecvTimeoutError, SendError, Sender,
},
dashmap::DashMap,
derive_where::derive_where,
dyn_clone::{clone_trait_object, DynClone},
@@ -130,6 +132,8 @@ pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
weak_self: Weak<Self>,
next_scheduler_id: AtomicSchedulerId,
max_usage_queue_count: usize,
scheduler_pool_sender: Sender<Weak<Self>>,
cleaner_thread: JoinHandle<()>,
_phantom: PhantomData<TH>,
}

@@ -231,6 +235,20 @@ impl HandlerContext {
fn banking_stage_helper(&self) -> &BankingStageHelper {
self.banking_stage_helper.as_ref().unwrap()
}

fn clone_for_scheduler_thread(&self) -> Self {
let mut context = self.clone();
if self.banking_stage_helper.is_some() {
context.disable_banking_packet_handler();
}
context
}

fn disable_banking_packet_handler(&mut self) {
self.banking_packet_receiver = never();
self.banking_packet_handler =
Box::new(|_, _| unreachable!("paired with never() receiver, this cannot be called"));
}
}
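`disable_banking_packet_handler` relies on the guarantee that a `crossbeam_channel::never()` receiver can never yield a message, so the handler paired with it may safely be an `unreachable!()`. A runnable sketch of the idiom (channel names are illustrative):

```rust
use crossbeam_channel::{never, select_biased, unbounded, Receiver};

fn main() {
    let (live_sender, live_receiver) = unbounded::<&str>();
    // A receiver that never yields; the arm paired with it cannot fire.
    let disabled: Receiver<&str> = never();
    live_sender.send("packet").unwrap();

    select_biased! {
        recv(disabled) -> _ => unreachable!("paired with never(), this arm cannot fire"),
        recv(live_receiver) -> msg => println!("got {msg:?}"),
    }
}
```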

#[derive(Debug, Clone)]
@@ -433,32 +451,22 @@
max_usage_queue_count: usize,
timeout_duration: Duration,
) -> Arc<Self> {
let scheduler_pool = Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
block_production_scheduler_inner: Mutex::default(),
trashed_scheduler_inners: Mutex::default(),
timeout_listeners: Mutex::default(),
common_handler_context: CommonHandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
},
block_verification_handler_count,
banking_stage_handler_context: Mutex::default(),
weak_self: weak_self.clone(),
next_scheduler_id: AtomicSchedulerId::default(),
max_usage_queue_count,
_phantom: PhantomData,
});
let (scheduler_pool_sender, scheduler_pool_receiver) = crossbeam_channel::bounded(1);

let cleaner_main_loop = {
let weak_scheduler_pool = Arc::downgrade(&scheduler_pool);
let mut exiting = false;
let cleaner_main_loop = move || {
info!("cleaner_main_loop: started...");

move || loop {
sleep(pool_cleaner_interval);
let weak_scheduler_pool: Weak<Self> = scheduler_pool_receiver.recv().unwrap();
loop {
match scheduler_pool_receiver.recv_timeout(pool_cleaner_interval) {
Ok(_) => unreachable!(),
Err(RecvTimeoutError::Disconnected | RecvTimeoutError::Timeout) => (),
}

let Some(scheduler_pool) = weak_scheduler_pool.upgrade() else {
// This is the only safe termination point of cleaner_main_loop; all other
// `break`s are due to poisoned locks.
break;
};
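The `recv_timeout(pool_cleaner_interval)` call doubles as an interruptible sleep: normally it times out once per interval, but dropping the sender disconnects the channel and wakes the thread immediately, which is what `uninstalled_from_bank_forks` later exploits to speed up joining. A standalone sketch of the idiom (this sketch exits on disconnect directly, whereas the cleaner instead re-checks its `Weak` upgrade):

```rust
use crossbeam_channel::{bounded, RecvTimeoutError};
use std::{thread, time::Duration};

fn main() {
    let (wakeup_sender, wakeup_receiver) = bounded::<()>(1);
    let worker = thread::spawn(move || {
        for tick in 0.. {
            // Sleeps for the interval unless the sender side goes away first.
            match wakeup_receiver.recv_timeout(Duration::from_millis(200)) {
                Ok(()) => unreachable!("nothing is ever sent on this channel"),
                Err(RecvTimeoutError::Timeout) => println!("tick {tick}"),
                Err(RecvTimeoutError::Disconnected) => {
                    println!("sender dropped; waking up early");
                    break;
                }
            }
        }
    });
    thread::sleep(Duration::from_millis(500));
    drop(wakeup_sender); // interrupts the worker mid-interval
    worker.join().unwrap();
}
```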

@@ -490,6 +498,10 @@
};

let banking_stage_status = scheduler_pool.banking_stage_status();
if !exiting && matches!(banking_stage_status, Some(BankingStageStatus::Exited)) {
exiting = true;
scheduler_pool.unregister_banking_stage();
}

if matches!(banking_stage_status, Some(BankingStageStatus::Inactive)) {
let Ok(mut inner) = scheduler_pool.block_production_scheduler_inner.lock()
@@ -595,14 +607,39 @@
triggered_timeout_listener_count,
));
}
info!("cleaner_main_loop: ...finished");
};

// No need to join; the spawned main loop will gracefully exit.
thread::Builder::new()
let cleaner_thread = thread::Builder::new()
.name("solScCleaner".to_owned())
.spawn_tracked(cleaner_main_loop)
.unwrap();

let scheduler_pool = Arc::new_cyclic(|weak_self| Self {
scheduler_inners: Mutex::default(),
block_production_scheduler_inner: Mutex::default(),
trashed_scheduler_inners: Mutex::default(),
timeout_listeners: Mutex::default(),
common_handler_context: CommonHandlerContext {
log_messages_bytes_limit,
transaction_status_sender,
replay_vote_sender,
prioritization_fee_cache,
},
block_verification_handler_count,
banking_stage_handler_context: Mutex::default(),
weak_self: weak_self.clone(),
next_scheduler_id: AtomicSchedulerId::default(),
max_usage_queue_count,
scheduler_pool_sender: scheduler_pool_sender.clone(),
cleaner_thread,
_phantom: PhantomData,
});

scheduler_pool_sender
.send(Arc::downgrade(&scheduler_pool))
.unwrap();

scheduler_pool
}
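The restructuring above spawns `solScCleaner` before the pool exists, then hands it a `Weak` through a `bounded(1)` channel once `Arc::new_cyclic` has produced the pool, so the cleaner never holds a strong reference and can detect the pool's death via a failed `upgrade()`. A minimal sketch of that handshake, with hypothetical names:

```rust
use crossbeam_channel::bounded;
use std::{
    sync::{Arc, Weak},
    thread,
    time::Duration,
};

#[derive(Debug)]
struct Pool {
    weak_self: Weak<Pool>,
}

fn main() {
    let (sender, receiver) = bounded::<Weak<Pool>>(1);

    // Spawn the cleaner first; it blocks until it is handed a Weak.
    let cleaner = thread::spawn(move || {
        let weak: Weak<Pool> = receiver.recv().unwrap();
        loop {
            thread::sleep(Duration::from_millis(10));
            // Holding only a Weak, the cleaner observes the pool's death.
            let Some(_pool) = weak.upgrade() else { break };
        }
    });

    // Build the pool with a self-referential Weak, then complete the handshake.
    let pool = Arc::new_cyclic(|weak_self| Pool {
        weak_self: weak_self.clone(),
    });
    assert!(pool.weak_self.upgrade().is_some());
    sender.send(Arc::downgrade(&pool)).unwrap();

    drop(pool); // last strong reference gone; the cleaner exits on its own
    cleaner.join().unwrap();
}
```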

@@ -751,6 +788,19 @@
);
}

fn unregister_banking_stage(&self) {
let handler_context = &mut self.banking_stage_handler_context.lock().unwrap();
let handler_context = handler_context.as_mut().unwrap();
// Replace with dummy ones to unblock validator shutdown.
// Note that replacing banking_stage_handler_context with None altogether would create a
// very short race-condition window due to untimely spawning of a block production
// scheduler.
handler_context.banking_packet_receiver = never();
handler_context.banking_packet_handler =
Box::new(|_, _| unreachable!("paired with never() receiver, this cannot be called"));
handler_context.banking_stage_monitor = Box::new(ExitedBankingMonitor);
}

fn banking_stage_status(&self) -> Option<BankingStageStatus> {
self.banking_stage_handler_context
.lock()
@@ -782,7 +832,9 @@
self.block_verification_handler_count,
// Return various type-specific no-op values.
never(),
Box::new(|_, _| {}),
Box::new(|_, _| {
unreachable!("paired with never() receiver, this cannot be called")
}),
None,
None,
)
@@ -884,6 +936,48 @@
.unwrap()
.push((timeout_listener, Instant::now()));
}

fn uninstalled_from_bank_forks(self: Arc<Self>) {
info!("SchedulerPool::uninstalled_from_bank_forks(): started...");

// Forcibly return all taken schedulers back to this scheduler pool.
for (listener, _registered_at) in mem::take(&mut *self.timeout_listeners.lock().unwrap()) {
listener.trigger(self.clone());
}

// Then, drop all schedulers in the pool.
mem::take(&mut *self.scheduler_inners.lock().unwrap());
mem::take(&mut *self.block_production_scheduler_inner.lock().unwrap());
mem::take(&mut *self.trashed_scheduler_inners.lock().unwrap());

// At this point, all circular references to this pool have been cut, and there should be
// only one strong reference unless the cleaner thread is active right now.

// So, wait a bit to finally unwrap the pool out of the sinful Arc here. Note that we
// can't resort to the Drop impl, because we need to take ownership of the cleaner
// thread's join handle...
let mut this = self;
let mut this: Self = loop {
match Arc::try_unwrap(this) {
Ok(pool) => {
break pool;
}
Err(that) => {
// It seems solScCleaner is active... retry later
this = that;
sleep(Duration::from_millis(100));
// Yes, this is an indefinite loop, but the situation isn't so different from the
// following join(), which waits indefinitely as well.
continue;
}
}
};
// Accelerate cleaner thread joining by disconnecting the channel
this.scheduler_pool_sender = crossbeam_channel::bounded(1).0;
this.cleaner_thread.join().unwrap();

info!("SchedulerPool::uninstalled_from_bank_forks(): ...finished");
}
}
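The unwrap-retry loop is the crux: `Arc::try_unwrap` only succeeds when the caller holds the last strong reference, so the method spins until the cleaner thread drops its temporary upgrade, then takes the join handle by value. A reduced sketch of the pattern, with hypothetical names:

```rust
use std::{sync::Arc, thread, time::Duration};

struct Owner {
    worker: Option<thread::JoinHandle<()>>,
}

fn reclaim_and_join(mut shared: Arc<Owner>) {
    // Spin until we hold the only strong reference, as the pool does above.
    let mut owner = loop {
        match Arc::try_unwrap(shared) {
            Ok(owner) => break owner,
            Err(still_shared) => {
                // Someone else (e.g. a cleaner) still holds a reference; retry.
                shared = still_shared;
                thread::sleep(Duration::from_millis(100));
            }
        }
    };
    // Now we own the contents and can consume the join handle by value.
    owner.worker.take().unwrap().join().unwrap();
}

fn main() {
    let shared = Arc::new(Owner {
        worker: Some(thread::spawn(|| println!("worker done"))),
    });
    reclaim_and_join(shared);
}
```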

pub trait TaskHandler: Send + Sync + Debug + Sized + 'static {
@@ -1828,7 +1922,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 5. the handler thread reply back to the scheduler thread as an executed task.
// 6. the scheduler thread post-processes the executed task.
let scheduler_main_loop = {
let handler_context = handler_context.clone();
let handler_context = handler_context.clone_for_scheduler_thread();
let session_result_sender = self.session_result_sender.clone();
// Taking new_task_receiver here is important to ensure there's a single receiver. In
// this way, the replay stage will get .send() failures reliably, after this scheduler
@@ -2167,7 +2261,12 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {

let Ok(banking_packet) = banking_packet else {
info!("disconnected banking_packet_receiver");
break;
// Don't break here; handler threads are always expected to outlive their
// associated scheduler thread. So, disable the banking packet
// handler and continue, so the thread is cleaned up properly later, much like
// the block verification handler threads.
handler_context.disable_banking_packet_handler();
continue;
};
banking_packet_handler(banking_stage_helper, banking_packet);
continue;
Expand Down Expand Up @@ -2461,12 +2560,22 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
pub enum BankingStageStatus {
Active,
Inactive,
Exited,
}

pub trait BankingStageMonitor: Send + Debug {
fn status(&mut self) -> BankingStageStatus;
}

#[derive(Debug)]
struct ExitedBankingMonitor;

impl BankingStageMonitor for ExitedBankingMonitor {
fn status(&mut self) -> BankingStageStatus {
BankingStageStatus::Exited
}
}

impl<TH: TaskHandler> InstalledScheduler for PooledScheduler<TH> {
fn id(&self) -> SchedulerId {
self.inner.id()
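For illustration, here is how the `ExitedBankingMonitor` swap performed by `unregister_banking_stage` plays out from the cleaner's perspective: after the first `Exited` observation, the boxed monitor is replaced so later polls report `Exited` without touching the real `DecisionMaker`. The types below are simplified stand-ins, not the pool's real ones:

```rust
use std::fmt::Debug;

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Status {
    Active,
    Exited,
}

trait Monitor: Send + Debug {
    fn status(&mut self) -> Status;
}

#[derive(Debug)]
struct RealMonitor; // stands in for DecisionMakerWrapper

impl Monitor for RealMonitor {
    fn status(&mut self) -> Status {
        Status::Exited // pretend PoH has already signaled exit
    }
}

#[derive(Debug)]
struct ExitedMonitor; // stands in for ExitedBankingMonitor

impl Monitor for ExitedMonitor {
    fn status(&mut self) -> Status {
        Status::Exited
    }
}

fn main() {
    let mut monitor: Box<dyn Monitor> = Box::new(RealMonitor);
    let mut exiting = false;
    for _ in 0..3 {
        // Mirrors the cleaner loop: unregister exactly once, then keep
        // polling the cheap stand-in monitor.
        if !exiting && monitor.status() == Status::Exited {
            exiting = true;
            monitor = Box::new(ExitedMonitor);
        }
    }
    assert!(exiting);
}
```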