diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index 41dcca566b7d8d..f8f1a9ec756abd 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -92,6 +92,7 @@ pub struct SchedulerPool<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     // memory increase.
     weak_self: Weak<Self>,
     next_scheduler_id: AtomicSchedulerId,
+    max_usage_queue_count: usize,
     _phantom: PhantomData<TH>,
 }
 
@@ -108,6 +109,21 @@ pub type DefaultSchedulerPool =
 
 const DEFAULT_POOL_CLEANER_INTERVAL: Duration = Duration::from_secs(10);
 const DEFAULT_MAX_POOLING_DURATION: Duration = Duration::from_secs(180);
+// Rough estimate of max UsageQueueLoader size in bytes:
+//   UsageFromTask * UsageQueue's capacity * DEFAULT_MAX_USAGE_QUEUE_COUNT
+//   16 bytes      * 128 items             * 262_144 entries              == 512 MiB
+// It's expected that there will be 2 or 3 pooled schedulers constantly when running against
+// mainnet-beta. That means the total memory consumption for the idle close-to-be-trashed pooled
+// schedulers is set to 1.0 ~ 1.5 GiB. This value is chosen to maximize performance under the
+// normal cluster condition to avoid memory reallocation as much as possible. That said, it's not
+// likely this would allow unbounded memory growth when the cluster is unstable or under some kind
+// of attack. That's because this limit is enforced at every slot and the UsageQueueLoader itself
+// is recreated without any entries at first, needing to repopulate by means of actual use to eat
+// the memory.
+//
+// Along the same lines, this isn't problematic for development settings (= solana-test-validator),
+// because UsageQueueLoader won't grow that much to begin with.
+const DEFAULT_MAX_USAGE_QUEUE_COUNT: usize = 262_144;
 
 impl<S, TH> SchedulerPool<S, TH>
 where
@@ -132,6 +148,7 @@
             prioritization_fee_cache,
             DEFAULT_POOL_CLEANER_INTERVAL,
             DEFAULT_MAX_POOLING_DURATION,
+            DEFAULT_MAX_USAGE_QUEUE_COUNT,
         )
     }
 
@@ -143,6 +160,7 @@
         prioritization_fee_cache: Arc<PrioritizationFeeCache>,
         pool_cleaner_interval: Duration,
        max_pooling_duration: Duration,
+        max_usage_queue_count: usize,
     ) -> Arc<Self> {
         let handler_count = handler_count.unwrap_or(Self::default_handler_count());
         assert!(handler_count >= 1);
@@ -159,6 +177,7 @@
             },
             weak_self: weak_self.clone(),
             next_scheduler_id: AtomicSchedulerId::default(),
+            max_usage_queue_count,
             _phantom: PhantomData,
         });
 
@@ -577,6 +596,10 @@ impl UsageQueueLoader {
     pub fn load(&self, address: Pubkey) -> UsageQueue {
         self.usage_queues.entry(address).or_default().clone()
     }
+
+    fn count(&self) -> usize {
+        self.usage_queues.len()
+    }
 }
 
 // (this is slow needing atomic mem reads. However, this can be turned into a lot faster
@@ -648,6 +671,10 @@ where
     }
 
     fn is_trashed(&self) -> bool {
+        self.is_aborted() || self.is_overgrown()
+    }
+
+    fn is_aborted(&self) -> bool {
         // Schedulers can be regarded as being _trashed_ (thereby will be cleaned up later), if
         // threads are joined. Remember that unified scheduler _doesn't normally join threads_ even
         // across different sessions (i.e. different banks) to avoid thread recreation overhead.
@@ -666,6 +693,10 @@
         // scheduler to the pool, considering is_trashed() is checked immediately before that.
         self.thread_manager.are_threads_joined()
     }
+
+    fn is_overgrown(&self) -> bool {
+        self.usage_queue_loader.count() > self.thread_manager.pool.max_usage_queue_count
+    }
 }
 
 // This type manages the OS threads for scheduling and executing transactions.
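The 512 MiB figure in the new sizing comment follows directly from the quoted per-entry numbers. Below is a standalone sketch of that arithmetic; the constant names are illustrative, and the 16-byte `UsageFromTask` size and 128-item queue capacity are taken from the comment's own rough estimates, not measured here:

```rust
// Sanity check of the sizing comment's arithmetic. The per-entry figures
// below are the comment's rough estimates, not measured values.
const USAGE_FROM_TASK_BYTES: usize = 16; // estimated size of one UsageFromTask
const USAGE_QUEUE_CAPACITY: usize = 128; // estimated items per UsageQueue
const DEFAULT_MAX_USAGE_QUEUE_COUNT: usize = 262_144;

fn main() {
    let bytes_per_loader =
        USAGE_FROM_TASK_BYTES * USAGE_QUEUE_CAPACITY * DEFAULT_MAX_USAGE_QUEUE_COUNT;
    assert_eq!(bytes_per_loader, 512 * 1024 * 1024); // 536_870_912 bytes == 512 MiB

    // With the expected 2 or 3 constantly pooled schedulers, the idle
    // close-to-be-trashed memory tops out at 1.0 ~ 1.5 GiB, as the comment says.
    for schedulers in 2..=3usize {
        let gib = (schedulers * bytes_per_loader) as f64 / (1024.0 * 1024.0 * 1024.0);
        println!("{schedulers} pooled schedulers: {gib} GiB");
    }
}
```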
@@ -1426,6 +1457,7 @@ mod tests {
             ignored_prioritization_fee_cache,
             SHORTENED_POOL_CLEANER_INTERVAL,
             shortened_max_pooling_duration,
+            DEFAULT_MAX_USAGE_QUEUE_COUNT,
         );
         let pool = pool_raw.clone();
         let bank = Arc::new(Bank::default_for_tests());
@@ -1461,6 +1493,77 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_scheduler_drop_overgrown() {
+        solana_logger::setup();
+
+        let _progress = sleepless_testing::setup(&[
+            &TestCheckPoint::BeforeTrashedSchedulerCleaned,
+            &CheckPoint::TrashedSchedulerCleaned(0),
+            &CheckPoint::TrashedSchedulerCleaned(1),
+            &TestCheckPoint::AfterTrashedSchedulerCleaned,
+        ]);
+
+        let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
+        const REDUCED_MAX_USAGE_QUEUE_COUNT: usize = 1;
+        let pool_raw = DefaultSchedulerPool::do_new(
+            None,
+            None,
+            None,
+            None,
+            ignored_prioritization_fee_cache,
+            SHORTENED_POOL_CLEANER_INTERVAL,
+            DEFAULT_MAX_POOLING_DURATION,
+            REDUCED_MAX_USAGE_QUEUE_COUNT,
+        );
+        let pool = pool_raw.clone();
+        let bank = Arc::new(Bank::default_for_tests());
+        let context1 = SchedulingContext::new(bank);
+        let context2 = context1.clone();
+
+        let small_scheduler = pool.do_take_scheduler(context1);
+        let small_scheduler_id = small_scheduler.id();
+        for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT {
+            small_scheduler
+                .inner
+                .usage_queue_loader
+                .load(Pubkey::new_unique());
+        }
+        let big_scheduler = pool.do_take_scheduler(context2);
+        for _ in 0..REDUCED_MAX_USAGE_QUEUE_COUNT + 1 {
+            big_scheduler
+                .inner
+                .usage_queue_loader
+                .load(Pubkey::new_unique());
+        }
+
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 0);
+        assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
+        Box::new(small_scheduler.into_inner().1).return_to_pool();
+        Box::new(big_scheduler.into_inner().1).return_to_pool();
+
+        // Block solScCleaner until we see the trashed scheduler...
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 1);
+        sleepless_testing::at(TestCheckPoint::BeforeTrashedSchedulerCleaned);
+
+        // See the trashed scheduler gone only after solScCleaner did its job...
+        sleepless_testing::at(&TestCheckPoint::AfterTrashedSchedulerCleaned);
+        assert_eq!(pool_raw.scheduler_inners.lock().unwrap().len(), 1);
+        assert_eq!(pool_raw.trashed_scheduler_inners.lock().unwrap().len(), 0);
+        assert_eq!(
+            pool_raw
+                .scheduler_inners
+                .lock()
+                .unwrap()
+                .first()
+                .as_ref()
+                .map(|(inner, _pooled_at)| inner.id())
+                .unwrap(),
+            small_scheduler_id
+        );
+    }
+
     enum AbortCase {
         Unhandled,
         UnhandledWhilePanicking,
@@ -1816,6 +1919,7 @@ mod tests {
             ignored_prioritization_fee_cache,
             SHORTENED_POOL_CLEANER_INTERVAL,
             DEFAULT_MAX_POOLING_DURATION,
+            DEFAULT_MAX_USAGE_QUEUE_COUNT,
         );
         let pool = pool_raw.clone();
         let context = SchedulingContext::new(bank.clone());
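Taken together, `is_trashed()` now fires either on abort (joined threads) or on overgrowth of the per-scheduler `UsageQueueLoader`. A minimal standalone model of that decision, using illustrative stand-in types (`Pool`, `SchedulerInner`) rather than the crate's real ones:

```rust
// Simplified model of the trashing decision introduced by this diff.
// These types are stand-ins; the real ones live in unified-scheduler-pool.
struct Pool {
    max_usage_queue_count: usize,
}

struct SchedulerInner {
    usage_queue_count: usize, // stands in for usage_queue_loader.count()
    threads_joined: bool,     // stands in for thread_manager.are_threads_joined()
}

impl SchedulerInner {
    fn is_aborted(&self) -> bool {
        self.threads_joined
    }

    fn is_overgrown(&self, pool: &Pool) -> bool {
        // Strictly greater-than: a scheduler sitting exactly at the limit is kept.
        self.usage_queue_count > pool.max_usage_queue_count
    }

    fn is_trashed(&self, pool: &Pool) -> bool {
        self.is_aborted() || self.is_overgrown(pool)
    }
}

fn main() {
    let pool = Pool { max_usage_queue_count: 1 };
    let small = SchedulerInner { usage_queue_count: 1, threads_joined: false };
    let big = SchedulerInner { usage_queue_count: 2, threads_joined: false };
    assert!(!small.is_trashed(&pool)); // at the limit: re-pooled
    assert!(big.is_trashed(&pool)); // over the limit: trashed, later cleaned
}
```

The strict `>` comparison is what `test_scheduler_drop_overgrown` exercises at the boundary: the small scheduler loads exactly `REDUCED_MAX_USAGE_QUEUE_COUNT` usage queues and is returned to the pool, while the big one loads one more and is trashed, leaving only `small_scheduler_id` pooled after solScCleaner runs.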