38 changes: 24 additions & 14 deletions unified-scheduler-pool/src/lib.rs
@@ -262,11 +262,11 @@ clone_trait_object!(BankingPacketHandler);
pub struct BankingStageHelper {
usage_queue_loader: UsageQueueLoader,
next_task_id: AtomicUsize,
- new_task_sender: Sender<NewTaskPayload>,
+ new_task_sender: Weak<Sender<NewTaskPayload>>,
@apfitzge (author), Apr 15, 2025:

This is the least invasive change to fix the circular dependency on the sender/receiver connection in the scheduler. We may want to restructure the scheduler thread so that it does not hold the entire HandlerContext.

}

impl BankingStageHelper {
- fn new(new_task_sender: Sender<NewTaskPayload>) -> Self {
+ fn new(new_task_sender: Weak<Sender<NewTaskPayload>>) -> Self {
Self {
usage_queue_loader: UsageQueueLoader::default(),
next_task_id: AtomicUsize::default(),
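
A minimal, self-contained sketch (illustrative names only, not the PR's types) of why the helper now holds a Weak<Sender>: a crossbeam receiver only reports disconnection once every strong Sender has been dropped, so a helper that kept a strong clone of the sender feeding its own scheduler's channel would keep that channel connected forever. With a Weak, dropping the owning Arc disconnects the channel while the helper can still upgrade on demand.

```rust
use std::sync::{Arc, Weak};
use crossbeam_channel::{unbounded, Sender, TryRecvError};

// Illustrative stand-in; not the PR's real BankingStageHelper.
struct Helper {
    new_task_sender: Weak<Sender<u64>>,
}

impl Helper {
    // Mirrors the PR's behavior: upgrade the weak handle and panic if either
    // the upgrade or the send fails.
    fn send_new_task(&self, task: u64) {
        self.new_task_sender.upgrade().unwrap().send(task).unwrap();
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    let sender = Arc::new(sender);

    // The helper only weakly references the sender, so it does not keep the
    // channel connected on its own.
    let helper = Helper {
        new_task_sender: Arc::downgrade(&sender),
    };

    helper.send_new_task(42);
    assert_eq!(receiver.try_recv(), Ok(42));

    // Dropping the last strong reference disconnects the channel even though
    // the helper is still alive; a strong Sender clone here would have kept
    // the channel connected indefinitely.
    drop(sender);
    assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    assert!(helper.new_task_sender.upgrade().is_none());
}
```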
@@ -296,6 +296,8 @@ impl BankingStageHelper {

pub fn send_new_task(&self, task: Task) {
self.new_task_sender
+ .upgrade()
+ .unwrap()
Reviewer: Should we instead just drop the task if the upgrade fails?

@apfitzge (author): I preserved the current behavior of panicking on a failure to send; realistically we probably want to return an error and break whatever loops we're in wherever we send these.

Reviewer: Yeah, I'm okay with following up on that separately.

.send(NewTaskPayload::Payload(task))
.unwrap();
}
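
A sketch of the alternative floated in the thread above (hypothetical names and error type, not the PR's API): return an error on a failed upgrade or send so callers can break out of their submission loops instead of panicking.

```rust
use std::sync::Weak;
use crossbeam_channel::Sender;

// Illustrative stand-ins for the real payload types.
struct Task;
enum NewTaskPayload {
    Payload(Task),
}

#[derive(Debug)]
struct SchedulerDisconnected;

// Instead of `.upgrade().unwrap()` + `.send(..).unwrap()`, surface the failure.
fn send_new_task(
    new_task_sender: &Weak<Sender<NewTaskPayload>>,
    task: Task,
) -> Result<(), SchedulerDisconnected> {
    let sender = new_task_sender.upgrade().ok_or(SchedulerDisconnected)?;
    sender
        .send(NewTaskPayload::Payload(task))
        .map_err(|_| SchedulerDisconnected)
}
```

A caller would then stop submitting as soon as send_new_task returns Err, rather than unwinding the whole thread.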
@@ -619,7 +621,7 @@ where
fn create_handler_context(
&self,
mode: SchedulingMode,
- new_task_sender: &Sender<NewTaskPayload>,
+ new_task_sender: &Arc<Sender<NewTaskPayload>>,
) -> HandlerContext {
let (
thread_count,
@@ -652,7 +654,9 @@ where
handler_context.banking_thread_count,
handler_context.banking_packet_receiver.clone(),
handler_context.banking_packet_handler.clone(),
- Some(Arc::new(BankingStageHelper::new(new_task_sender.clone()))),
+ Some(Arc::new(BankingStageHelper::new(Arc::downgrade(
+ new_task_sender,
+ )))),
Some(handler_context.transaction_recorder.clone()),
)
}
@@ -1207,7 +1211,7 @@ where

// Ensure to initiate thread shutdown via disconnected new_task_receiver by replacing the
// current new_task_sender with a random one...
- self.new_task_sender = crossbeam_channel::unbounded().0;
+ self.new_task_sender = Arc::new(crossbeam_channel::unbounded().0);

self.ensure_join_threads(true);
assert_matches!(self.session_result_with_timings, Some((Ok(_), _)));
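
A standalone sketch of the shutdown trick the comment above relies on (assuming crossbeam-channel; names are illustrative): swapping the held sender for a throwaway one drops the original sender, the worker's recv() starts failing, and its loop exits.

```rust
use std::{sync::Arc, thread};
use crossbeam_channel::unbounded;

fn main() {
    let (sender, receiver) = unbounded::<u64>();
    let mut held_sender = Arc::new(sender);

    let worker = thread::spawn(move || {
        // recv() only fails once every sender for this channel has been dropped.
        while let Ok(task) = receiver.recv() {
            println!("got task {task}");
        }
        println!("new_task_receiver disconnected; shutting down");
    });

    held_sender.send(1).unwrap();

    // Replace the held sender with an unrelated one; the original sender is
    // dropped here, which disconnects the worker's receiver.
    held_sender = Arc::new(unbounded().0);
    drop(held_sender);

    worker.join().unwrap();
}
```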
@@ -1241,7 +1245,11 @@ where
}

fn is_overgrown(&self) -> bool {
- self.usage_queue_loader.count() > self.thread_manager.pool.max_usage_queue_count
+ self.thread_manager
+ .pool
+ .upgrade()
+ .map(|pool| self.usage_queue_loader.count() > pool.max_usage_queue_count)
+ .unwrap_or_default()
@apfitzge (author): AFAICT this value is used by the cleaner to decide whether to drop the scheduler or return it to the pool, so the return value here doesn't matter much if the pool no longer exists.

Reviewer: Might be good to call that out in a comment here or in the function header.

Reviewer: I'm trying to think what we want to have happen here if we lose the handle to the pool... Do we want to consider it overgrown? Maybe this should be unwrap_or(true)?

@apfitzge (author): It's an odd setup, and it just can't really happen AFAICT. Maybe we should just panic?

The current setup:

  • SchedulerPool::return_scheduler
    • is_trashed
      • is_overgrown

so is_overgrown is never called (in non-test code at least) unless we have a SchedulerPool.

This makes me wonder... could we just pass the pool into is_trashed? The only other use of the pool is in return_to_pool; can we just pass it there somehow? Ultimately it seems to be called by timeout listeners, which IIRC are called by the cleaner loop, which has a weak reference to the pool itself and could pass it in.

That's a bit more restructuring, and I'm not 100% sure it would work, but it would certainly simplify the ownership and reference model: ThreadManager would just no longer have a Pool reference at all.

}
}
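
A tiny sketch of the behavior being debated, with made-up types: once the pool is gone, upgrade() returns None, the closure never runs, and unwrap_or_default() answers false (not overgrown); unwrap_or(true) would instead treat a missing pool as overgrown.

```rust
use std::sync::{Arc, Weak};

// Made-up stand-in for SchedulerPool.
struct Pool {
    max_usage_queue_count: usize,
}

fn is_overgrown(pool: &Weak<Pool>, usage_queue_count: usize) -> bool {
    pool.upgrade()
        .map(|pool| usage_queue_count > pool.max_usage_queue_count)
        .unwrap_or_default() // None => false; unwrap_or(true) would say "overgrown"
}

fn main() {
    let pool = Arc::new(Pool {
        max_usage_queue_count: 4,
    });
    let weak = Arc::downgrade(&pool);

    assert!(is_overgrown(&weak, 10));
    drop(pool);
    // With the pool gone the question is moot; the current patch answers "no".
    assert!(!is_overgrown(&weak, 10));
}
```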

@@ -1253,8 +1261,8 @@ where
#[derive(Debug)]
struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
scheduler_id: SchedulerId,
- pool: Arc<SchedulerPool<S, TH>>,
- new_task_sender: Sender<NewTaskPayload>,
+ pool: Weak<SchedulerPool<S, TH>>,
+ new_task_sender: Arc<Sender<NewTaskPayload>>,
new_task_receiver: Option<Receiver<NewTaskPayload>>,
session_result_sender: Sender<ResultWithTimings>,
session_result_receiver: Receiver<ResultWithTimings>,
@@ -1267,14 +1275,14 @@ struct HandlerPanicked;
type HandlerResult = std::result::Result<Box<ExecutedTask>, HandlerPanicked>;

impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
- fn new(pool: Arc<SchedulerPool<S, TH>>) -> Self {
+ fn new(pool: &Arc<SchedulerPool<S, TH>>) -> Self {
let (new_task_sender, new_task_receiver) = crossbeam_channel::unbounded();
let (session_result_sender, session_result_receiver) = crossbeam_channel::unbounded();

Self {
scheduler_id: pool.new_scheduler_id(),
- pool,
- new_task_sender,
+ pool: Arc::downgrade(pool),
+ new_task_sender: Arc::new(new_task_sender),
new_task_receiver: Some(new_task_receiver),
session_result_sender,
session_result_receiver,
@@ -2170,7 +2178,7 @@ impl<TH: TaskHandler> SpawnableScheduler<TH> for PooledScheduler<TH> {
result_with_timings: ResultWithTimings,
) -> Self {
let mut inner = Self::Inner {
- thread_manager: ThreadManager::new(pool.clone()),
+ thread_manager: ThreadManager::new(&pool),
usage_queue_loader: UsageQueueLoader::default(),
};
inner.thread_manager.start_threads(
@@ -2257,7 +2265,9 @@ where
TH: TaskHandler,
{
fn return_to_pool(self: Box<Self>) {
- self.thread_manager.pool.clone().return_scheduler(*self);
+ if let Some(pool) = self.thread_manager.pool.upgrade() {
+ pool.clone().return_scheduler(*self);
+ }
}
}

@@ -3771,7 +3781,7 @@ mod tests {
&task,
&pool.create_handler_context(
BlockVerification,
- &crossbeam_channel::unbounded().0,
+ &Arc::new(crossbeam_channel::unbounded().0),
),
);
(result, timings)