Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,12 +605,13 @@ impl BankingStage {
macro_rules! spawn_scheduler {
($scheduler:ident) => {
let exit = exit.clone();
let shutdown_signal = self.banking_shutdown_signal.clone();
let bank_forks = self.bank_forks.clone();
threads.push(
Builder::new()
.name("solBnkTxSched".to_string())
.spawn(move || {
let scheduler_controller = SchedulerController::new(
let mut scheduler_controller = SchedulerController::new(
Comment thread
OliverNChalk marked this conversation as resolved.
exit,
scheduler_config,
decision_maker,
Expand All @@ -621,8 +622,17 @@ impl BankingStage {
);

match scheduler_controller.run() {
Ok(_) => {}
Err(SchedulerError::DisconnectedRecvChannel(_)) => {}
Ok(_) => info!("Scheduler exiting without error"),
Err(SchedulerError::DisconnectedRecvChannel(_)) => {
info!("Upstream disconnected, shutting down banking");

// NB: We must signal shutdown before dropping the scheduler
// controller, else, the workers may exit with an error and
// trigger a new spawn before we have a chance to issue the
// cancel.
shutdown_signal.cancel();
drop(scheduler_controller);
Comment thread
tao-stones marked this conversation as resolved.
}
Err(SchedulerError::DisconnectedSendChannel(_)) => {
warn!("Unexpected worker disconnect from scheduler")
}
Expand Down Expand Up @@ -684,12 +694,14 @@ impl BankingStage {
let decision_maker = DecisionMaker::from(self.poh_recorder.read().unwrap().deref());

let worker_exit_signal = self.worker_exit_signal.clone();
let shutdown_signal = self.banking_shutdown_signal.clone();
let bank_forks = self.bank_forks.clone();
Builder::new()
.name("solBanknStgVote".to_string())
.spawn(move || {
VoteWorker::new(
worker_exit_signal,
shutdown_signal,
decision_maker,
tpu_receiver,
gossip_receiver,
Expand Down Expand Up @@ -809,6 +821,7 @@ mod external {
// Spawn tpu to pack.
threads.push(tpu_to_pack::spawn(
self.worker_exit_signal.clone(),
self.banking_shutdown_signal.clone(),
tpu_to_pack_receivers,
tpu_to_pack,
));
Expand Down
9 changes: 7 additions & 2 deletions core/src/banking_stage/tpu_to_pack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
thread::JoinHandle,
time::Duration,
},
tokio_util::sync::CancellationToken,
};

pub struct BankingPacketReceivers {
Expand All @@ -29,6 +30,7 @@ pub struct BankingPacketReceivers {
/// Spawns a thread to receive packets from TPU and send them to the external scheduler.
pub fn spawn(
exit: Arc<AtomicBool>,
shutdown_signal: CancellationToken,
receivers: BankingPacketReceivers,
AgaveTpuToPackSession {
allocator,
Expand All @@ -38,13 +40,14 @@ pub fn spawn(
std::thread::Builder::new()
.name("solTpu2Pack".to_string())
.spawn(move || {
tpu_to_pack(exit, receivers, allocator, producer);
tpu_to_pack(exit, shutdown_signal, receivers, allocator, producer);
})
.unwrap()
}

fn tpu_to_pack(
exit: Arc<AtomicBool>,
shutdown_signal: CancellationToken,
receivers: BankingPacketReceivers,
allocator: Allocator,
mut producer: shaq::Producer<TpuToPackMessage>,
Expand All @@ -68,7 +71,9 @@ fn tpu_to_pack(
} {
Ok(packet_batches) => packet_batches,
Err(crossbeam_channel::RecvError) => {
// Senders have been dropped, exit the loop.
// Senders have been dropped, signal shutdown and exit.
shutdown_signal.cancel();

break;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
}
}

pub fn run(mut self) -> Result<(), SchedulerError> {
pub fn run(&mut self) -> Result<(), SchedulerError> {
let mut most_recent_leader_slot = None;
let mut cost_pacer = None;

Expand Down Expand Up @@ -176,9 +176,9 @@ where

self.receive_completed()?;
self.process_transactions(&decision, cost_pacer.as_ref(), &now)?;
if self.receive_and_buffer_packets(&decision).is_err() {
break;
}
self.receive_and_buffer_packets(&decision).map_err(|_| {
SchedulerError::DisconnectedRecvChannel("receive and buffer disconnected")
})?;
// Report metrics only if there is data.
// Reset intervals when appropriate, regardless of report.
let should_report = self.count_metrics.interval_has_data();
Expand Down Expand Up @@ -634,7 +634,7 @@ mod tests {
#[test]
#[should_panic(expected = "batch id 0 is not being tracked")]
fn test_unexpected_batch_id() {
let (test_frame, scheduler_controller) =
let (test_frame, mut scheduler_controller) =
create_test_frame(1, test_create_transaction_view_receive_and_buffer);
let TestFrame {
finished_consume_work_sender,
Expand Down
16 changes: 14 additions & 2 deletions core/src/banking_stage/vote_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use {
},
time::Instant,
},
tokio_util::sync::CancellationToken,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this suitable outside async contexts?

Copy link
Copy Markdown
Author

@OliverNChalk OliverNChalk Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty sure, it's just either an atomic operation if the runtime isnt parked or a syscall to unpark the runtime (should be a write syscall for our single threaded runtime).

Fairly sure this is where we get to on our sync caller side:

    pub fn wake(self) {
        // The actual wakeup call is delegated through a virtual function call
        // to the implementation which is defined by the executor.

        // Don't call `drop` -- the waker will be consumed by `wake`.
        let this = ManuallyDrop::new(self);

        // SAFETY: This is safe because `Waker::from_raw` is the only way
        // to initialize `wake` and `data` requiring the user to acknowledge
        // that the contract of `RawWaker` is upheld.
        unsafe { (this.waker.vtable.wake)(this.waker.data) };
    }

After this the queued waker runs which is type erased. Most likely this is the wake method on the other side:

    pub(crate) fn wake(&self) -> io::Result<()> {
        // The epoll emulation on some illumos systems currently requires
        // the eventfd to be read before an edge-triggered read event is
        // generated.
        // See https://www.illumos.org/issues/16700.
        #[cfg(target_os = "illumos")]
        self.reset()?;

        let buf: [u8; 8] = 1u64.to_ne_bytes();
        match (&self.fd).write(&buf) {
            Ok(_) => Ok(()),
            Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
                // Writing only blocks if the counter is going to overflow.
                // So we'll reset the counter to 0 and wake it again.
                self.reset()?;
                self.wake()
            }
            Err(err) => Err(err),
        }
    }

};

mod transaction {
Expand All @@ -56,6 +57,7 @@ pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 16;

pub struct VoteWorker {
exit: Arc<AtomicBool>,
shutdown_signal: CancellationToken,
Comment thread
apfitzge marked this conversation as resolved.
decision_maker: DecisionMaker,
tpu_receiver: VotePacketReceiver,
gossip_receiver: VotePacketReceiver,
Expand All @@ -67,6 +69,7 @@ pub struct VoteWorker {
impl VoteWorker {
pub fn new(
exit: Arc<AtomicBool>,
shutdown_signal: CancellationToken,
decision_maker: DecisionMaker,
tpu_receiver: VotePacketReceiver,
gossip_receiver: VotePacketReceiver,
Expand All @@ -76,6 +79,7 @@ impl VoteWorker {
) -> Self {
Self {
exit,
shutdown_signal,
decision_maker,
tpu_receiver,
gossip_receiver,
Expand Down Expand Up @@ -110,7 +114,11 @@ impl VoteWorker {
VoteSource::Tpu,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Disconnected) => {
self.shutdown_signal.cancel();

break;
}
}
// Check for new packets from the gossip receiver
match self.gossip_receiver.receive_and_buffer_packets(
Expand All @@ -120,7 +128,11 @@ impl VoteWorker {
VoteSource::Gossip,
) {
Ok(()) | Err(RecvTimeoutError::Timeout) => (),
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Disconnected) => {
self.shutdown_signal.cancel();

break;
}
}
banking_stage_stats.report(1000);
}
Expand Down
Loading