From ebf62ae0bca1c3c5655f7f77f86750c4c5bc3c5b Mon Sep 17 00:00:00 2001 From: OliverNChalk <11343499+OliverNChalk@users.noreply.github.com> Date: Tue, 20 Jan 2026 17:51:25 +0100 Subject: [PATCH] fix(banking-stage): shutdown log spam --- core/src/banking_stage.rs | 19 ++++++++++++++++--- core/src/banking_stage/tpu_to_pack.rs | 9 +++++++-- .../scheduler_controller.rs | 10 +++++----- core/src/banking_stage/vote_worker.rs | 16 ++++++++++++++-- 4 files changed, 42 insertions(+), 12 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 0caa0b34c57c31..aaaf03c8d39b00 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -605,12 +605,13 @@ impl BankingStage { macro_rules! spawn_scheduler { ($scheduler:ident) => { let exit = exit.clone(); + let shutdown_signal = self.banking_shutdown_signal.clone(); let bank_forks = self.bank_forks.clone(); threads.push( Builder::new() .name("solBnkTxSched".to_string()) .spawn(move || { - let scheduler_controller = SchedulerController::new( + let mut scheduler_controller = SchedulerController::new( exit, scheduler_config, decision_maker, @@ -621,8 +622,17 @@ impl BankingStage { ); match scheduler_controller.run() { - Ok(_) => {} - Err(SchedulerError::DisconnectedRecvChannel(_)) => {} + Ok(_) => info!("Scheduler exiting without error"), + Err(SchedulerError::DisconnectedRecvChannel(_)) => { + info!("Upstream disconnected, shutting down banking"); + + // NB: We must signal shutdown before dropping the scheduler + // controller, else, the workers may exit with an error and + // trigger a new spawn before we have a chance to issue the + // cancel. + shutdown_signal.cancel(); + drop(scheduler_controller); + } Err(SchedulerError::DisconnectedSendChannel(_)) => { warn!("Unexpected worker disconnect from scheduler") } @@ -684,12 +694,14 @@ impl BankingStage { let decision_maker = DecisionMaker::from(self.poh_recorder.read().unwrap().deref()); let worker_exit_signal = self.worker_exit_signal.clone(); + let shutdown_signal = self.banking_shutdown_signal.clone(); let bank_forks = self.bank_forks.clone(); Builder::new() .name("solBanknStgVote".to_string()) .spawn(move || { VoteWorker::new( worker_exit_signal, + shutdown_signal, decision_maker, tpu_receiver, gossip_receiver, @@ -809,6 +821,7 @@ mod external { // Spawn tpu to pack. threads.push(tpu_to_pack::spawn( self.worker_exit_signal.clone(), + self.banking_shutdown_signal.clone(), tpu_to_pack_receivers, tpu_to_pack, )); diff --git a/core/src/banking_stage/tpu_to_pack.rs b/core/src/banking_stage/tpu_to_pack.rs index 50864cdbf6a8a4..9c2a1ddd56d65a 100644 --- a/core/src/banking_stage/tpu_to_pack.rs +++ b/core/src/banking_stage/tpu_to_pack.rs @@ -18,6 +18,7 @@ use { thread::JoinHandle, time::Duration, }, + tokio_util::sync::CancellationToken, }; pub struct BankingPacketReceivers { @@ -29,6 +30,7 @@ pub struct BankingPacketReceivers { /// Spawns a thread to receive packets from TPU and send them to the external scheduler. pub fn spawn( exit: Arc, + shutdown_signal: CancellationToken, receivers: BankingPacketReceivers, AgaveTpuToPackSession { allocator, @@ -38,13 +40,14 @@ pub fn spawn( std::thread::Builder::new() .name("solTpu2Pack".to_string()) .spawn(move || { - tpu_to_pack(exit, receivers, allocator, producer); + tpu_to_pack(exit, shutdown_signal, receivers, allocator, producer); }) .unwrap() } fn tpu_to_pack( exit: Arc, + shutdown_signal: CancellationToken, receivers: BankingPacketReceivers, allocator: Allocator, mut producer: shaq::Producer, @@ -68,7 +71,9 @@ fn tpu_to_pack( } { Ok(packet_batches) => packet_batches, Err(crossbeam_channel::RecvError) => { - // Senders have been dropped, exit the loop. + // Senders have been dropped, signal shutdown and exit. + shutdown_signal.cancel(); + break; } }; diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index 7eb7de23b0ee7a..d98bb8aefeec37 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -112,7 +112,7 @@ where } } - pub fn run(mut self) -> Result<(), SchedulerError> { + pub fn run(&mut self) -> Result<(), SchedulerError> { let mut most_recent_leader_slot = None; let mut cost_pacer = None; @@ -176,9 +176,9 @@ where self.receive_completed()?; self.process_transactions(&decision, cost_pacer.as_ref(), &now)?; - if self.receive_and_buffer_packets(&decision).is_err() { - break; - } + self.receive_and_buffer_packets(&decision).map_err(|_| { + SchedulerError::DisconnectedRecvChannel("receive and buffer disconnected") + })?; // Report metrics only if there is data. // Reset intervals when appropriate, regardless of report. let should_report = self.count_metrics.interval_has_data(); @@ -634,7 +634,7 @@ mod tests { #[test] #[should_panic(expected = "batch id 0 is not being tracked")] fn test_unexpected_batch_id() { - let (test_frame, scheduler_controller) = + let (test_frame, mut scheduler_controller) = create_test_frame(1, test_create_transaction_view_receive_and_buffer); let TestFrame { finished_consume_work_sender, diff --git a/core/src/banking_stage/vote_worker.rs b/core/src/banking_stage/vote_worker.rs index b5ad62b7a6bd32..ce13ae112cedab 100644 --- a/core/src/banking_stage/vote_worker.rs +++ b/core/src/banking_stage/vote_worker.rs @@ -43,6 +43,7 @@ use { }, time::Instant, }, + tokio_util::sync::CancellationToken, }; mod transaction { @@ -56,6 +57,7 @@ pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 16; pub struct VoteWorker { exit: Arc, + shutdown_signal: CancellationToken, decision_maker: DecisionMaker, tpu_receiver: VotePacketReceiver, gossip_receiver: VotePacketReceiver, @@ -67,6 +69,7 @@ pub struct VoteWorker { impl VoteWorker { pub fn new( exit: Arc, + shutdown_signal: CancellationToken, decision_maker: DecisionMaker, tpu_receiver: VotePacketReceiver, gossip_receiver: VotePacketReceiver, @@ -76,6 +79,7 @@ impl VoteWorker { ) -> Self { Self { exit, + shutdown_signal, decision_maker, tpu_receiver, gossip_receiver, @@ -110,7 +114,11 @@ impl VoteWorker { VoteSource::Tpu, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Disconnected) => { + self.shutdown_signal.cancel(); + + break; + } } // Check for new packets from the gossip receiver match self.gossip_receiver.receive_and_buffer_packets( @@ -120,7 +128,11 @@ impl VoteWorker { VoteSource::Gossip, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), - Err(RecvTimeoutError::Disconnected) => break, + Err(RecvTimeoutError::Disconnected) => { + self.shutdown_signal.cancel(); + + break; + } } banking_stage_stats.report(1000); }