Merged
104 changes: 104 additions & 0 deletions unified-scheduler-pool/src/lib.rs
@@ -92,6 +92,7 @@ pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
// memory increase.
weak_self: Weak<Self>,
next_scheduler_id: AtomicSchedulerId,
max_usage_queue_count: usize,
_phantom: PhantomData<TH>,
}

@@ -108,6 +109,21 @@ pub type DefaultSchedulerPool =

const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);
// Rough estimate of the max UsageQueueLoader size in bytes:
// UsageFromTask * UsageQueue's capacity * DEFAULT_MAX_USAGE_QUEUE_COUNT
// 16 bytes * 128 items * 262_144 entries == 512 MiB
// It's expected that there will be 2 or 3 pooled schedulers at any given time when running
// against mainnet-beta, so the total memory consumption of these idle, close-to-be-trashed
// pooled schedulers is capped at 1.0 ~ 1.5 GiB. This value is chosen to maximize performance
// under normal cluster conditions by avoiding memory reallocation as much as possible. That
// said, it's unlikely to allow unbounded memory growth when the cluster is unstable or under
// some kind of attack, because the limit is enforced at every slot and a recreated
// UsageQueueLoader starts without any entries, needing to be repopulated through actual use
// before it consumes memory again.
//
// Along the same lines, this isn't problematic for development settings (= solana-test-validator),
// because UsageQueueLoader won't grow that much to begin with.
const DEFAULT_MAX_USAGE_QUEUE_COUNT: usize = 262_144;
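To make the arithmetic in the comment above concrete, here is a minimal standalone sketch; the 16-byte UsageFromTask size and the 128-item queue capacity are taken from the comment, not measured, and the names are illustrative:

const USAGE_FROM_TASK_SIZE_BYTES: usize = 16; // per the comment above
const USAGE_QUEUE_CAPACITY: usize = 128; // items, per the comment above
const MAX_USAGE_QUEUE_COUNT: usize = 262_144;

fn main() {
    let max_bytes = USAGE_FROM_TASK_SIZE_BYTES * USAGE_QUEUE_CAPACITY * MAX_USAGE_QUEUE_COUNT;
    assert_eq!(max_bytes, 512 * 1024 * 1024); // 512 MiB per loader
    // With 2 or 3 idle pooled schedulers, the total is ~1.0 - 1.5 GiB, as noted above.
    println!("{} MiB", max_bytes / (1024 * 1024));
}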

impl<S, TH> SchedulerPool<S, TH>
where
@@ -132,6 +148,7 @@ where
prioritization_fee_cache,
DEFAULT_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
DEFAULT_MAX_USAGE_QUEUE_COUNT,
)
}

@@ -143,6 +160,7 @@ where
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
pool_cleaner_interval: Duration,
max_pooling_duration: Duration,
max_usage_queue_count: usize,
) -> Arc<Self> {
let handler_count = handler_count.unwrap_or(Self::default_handler_count());
assert!(handler_count >= 1);
@@ -159,6 +177,7 @@ },
},
weak_self: weak_self.clone(),
next_scheduler_id: AtomicSchedulerId::default(),
max_usage_queue_count,
_phantom: PhantomData,
});

@@ -577,6 +596,10 @@ impl UsageQueueLoader {
pub fn load(&self, address: Pubkey) -> UsageQueue {
self.usage_queues.entry(address).or_default().clone()
}

fn count(&self) -> usize {
self.usage_queues.len()
}
}
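For readers outside this diff, a self-contained sketch of the loader pattern the two methods above rely on, assuming usage_queues is a concurrent map such as DashMap; the Pubkey and UsageQueue stand-ins here are simplifications, not the real types:

use dashmap::DashMap; // assumed dependency for this sketch
use std::sync::Arc;

type Pubkey = [u8; 32]; // stand-in for solana_sdk::pubkey::Pubkey
type UsageQueue = Arc<Vec<u64>>; // stand-in; the real type wraps lock usage state

#[derive(Default)]
struct UsageQueueLoader {
    usage_queues: DashMap<Pubkey, UsageQueue>,
}

impl UsageQueueLoader {
    // Returns the queue for an address, lazily creating an empty one.
    fn load(&self, address: Pubkey) -> UsageQueue {
        self.usage_queues.entry(address).or_default().clone()
    }

    // Distinct addresses seen so far; is_overgrown() compares this
    // against the pool's max_usage_queue_count.
    fn count(&self) -> usize {
        self.usage_queues.len()
    }
}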

// (this is slow needing atomic mem reads. However, this can be turned into a lot faster
@@ -648,6 +671,10 @@ where
}

fn is_trashed(&self) -> bool {
Reviewer comment:
So I want to make a clarification here in the review; it doesn't necessarily need a code comment.

This is only called after the scheduler is terminated due to abort or successful completion of the slot, is that right?
I want to make sure that the limits from is_overgrown will not cause us to skip a slot if such a large one comes through.

@ryoqun (Member, Author) replied on Jun 11, 2024:

> This is only called after the scheduler is terminated due to abort or successful completion of the slot, is that right?

yes. it's only called by ::return_to_pool().

> I want to make sure that the limits from is_overgrown will not cause us to skip a slot if such a large one comes through.

this is true, again.

self.is_aborted() || self.is_overgrown()
}

fn is_aborted(&self) -> bool {
// Schedulers can be regarded as being _trashed_ (thereby will be cleaned up later), if
// threads are joined. Remember that unified scheduler _doesn't normally join threads_ even
// across different sessions (i.e. different banks) to avoid thread recreation overhead.
@@ -666,6 +693,10 @@
// scheduler to the pool, considering is_trashed() is checked immediately before that.
self.thread_manager.are_threads_joined()
}

fn is_overgrown(&self) -> bool {
self.usage_queue_loader.count() > self.thread_manager.pool.max_usage_queue_count
}
}
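Tying the pieces together: as confirmed in the review thread above, is_trashed() is only consulted from ::return_to_pool(), after a session has terminated (abort or successful end of slot), so an overgrown scheduler is never evicted mid-slot. A hedged sketch of that decision, with simplified stand-in types (return_to_pool's real body is not shown in this diff):

use std::time::Instant;

// Simplified stand-ins for the real scheduler-inner and pool types.
struct Inner { aborted: bool, usage_queue_count: usize, max_usage_queue_count: usize }

impl Inner {
    fn is_aborted(&self) -> bool { self.aborted }
    fn is_overgrown(&self) -> bool { self.usage_queue_count > self.max_usage_queue_count }
    fn is_trashed(&self) -> bool { self.is_aborted() || self.is_overgrown() }
}

struct Pool { pooled: Vec<(Inner, Instant)>, trashed: Vec<Inner> }

impl Pool {
    // Mirrors ::return_to_pool(): the sole call site of is_trashed(),
    // reached only after the session ends, never while a slot is running.
    fn return_to_pool(&mut self, inner: Inner) {
        if inner.is_trashed() {
            self.trashed.push(inner); // reclaimed later by the solScCleaner thread
        } else {
            self.pooled.push((inner, Instant::now()));
        }
    }
}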

// This type manages the OS threads for scheduling and executing transactions. The term
@@ -1426,6 +1457,7 @@ mod tests {
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
shortened_max_pooling_duration,
DEFAULT_MAX_USAGE_QUEUE_COUNT,
);
let pool = pool_raw.clone();
let bank = Arc::new(Bank::default_for_tests());
@@ -1461,6 +1493,77 @@
);
}

#[test]
fn test_scheduler_drop_overgrown() {
solana_logger::setup();

let _progress = sleepless_testing::setup(&[
&TestCheckPoint::BeforeTrashedSchedulerCleaned,
&CheckPoint::TrashedSchedulerCleaned(0),
&CheckPoint::TrashedSchedulerCleaned(1),
&TestCheckPoint::AfterTrashedSchedulerCleaned,
]);

let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 1;
let pool_raw = DefaultSchedulerPool::do_new(
None,
None,
None,
None,
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
REDUCED_MAX_USAGE_QUEUE_COUNT,
);
let pool = pool_raw.clone();
let bank = Arc::new(Bank::default_for_tests());
let context1 = SchedulingContext::new(bank);
let context2 = context1.clone();

let small_scheduler = pool.do_take_scheduler(context1);
let small_scheduler_id = small_scheduler.id();
for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT {
small_scheduler
.inner
.usage_queue_loader
.load(Pubkey::new_unique());
}
let big_scheduler = pool.do_take_scheduler(context2);
for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT + 1 {
big_scheduler
.inner
.usage_queue_loader
.load(Pubkey::new_unique());
}

assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
Box::new(small_scheduler.into_inner().1).return_to_pool();
Box::new(big_scheduler.into_inner().1).return_to_pool();

// Block solScCleaner until we see the trashed scheduler...
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);

// See the trashed scheduler gone only after solScCleaner did its job...
sleepless_testing::at(&TestCheckPoint::AfterTrashedSchedulerCleaned);
assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
assert_eq!(
pool_raw
.scheduler_inners
.lock()
.unwrap()
.first()
.as_ref()
.map(|(inner, _pooled_at)| inner.id())
.unwrap(),
small_scheduler_id
);
}

enum AbortCase {
Unhandled,
UnhandledWhilePanicking,
@@ -1816,6 +1919,7 @@ mod tests {
ignored_prioritization_fee_cache,
SHORTENED_POOL_CLEANER_INTERVAL,
DEFAULT_MAX_POOLING_DURATION,
DEFAULT_MAX_USAGE_QUEUE_COUNT,
);
let pool = pool_raw.clone();
let context = SchedulingContext::new(bank.clone());