-
-
Notifications
You must be signed in to change notification settings - Fork 861
feat(allocator): introduce CapacityLimit for FixedSizeAllocatorPool #17014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,7 +3,7 @@ use std::{ | |||||||||||||||||||||
| mem::{self, ManuallyDrop}, | ||||||||||||||||||||||
| ptr::NonNull, | ||||||||||||||||||||||
| sync::{ | ||||||||||||||||||||||
| Mutex, | ||||||||||||||||||||||
| Condvar, Mutex, | ||||||||||||||||||||||
| atomic::{AtomicBool, AtomicU32, Ordering}, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
|
|
@@ -18,43 +18,80 @@ use crate::{ | |||||||||||||||||||||
/// 2 GiB.
// NOTE(review): presumably size bounds for the fixed-size allocators — the
// consuming code is outside this chunk; confirm against the rest of the file.
const TWO_GIB: usize = 1 << 31;
/// 4 GiB.
const FOUR_GIB: usize = 1 << 32;
|
|
||||||||||||||||||||||
/// Capacity limiter for a pool that blocks when the maximum capacity is reached.
struct CapacityLimit {
    /// Maximum number of items that can be created
    max_count: u32,
    /// Condition variable to signal when an item is returned to the pool
    available: Condvar,
}

impl CapacityLimit {
    /// Create a new [`CapacityLimit`] with the given maximum count.
    fn new(max_count: u32) -> Self {
        Self { max_count, available: Condvar::new() }
    }

    /// Check if the pool is at capacity.
    fn is_at_capacity(&self, current_count: u32) -> bool {
        current_count >= self.max_count
    }

    /// Wait for an item to become available and return it.
    ///
    /// This method blocks until an item is present in `items`.
    ///
    /// Uses [`Condvar::wait_while`], which checks the predicate *before* the
    /// first wait: if `items` is already non-empty on entry this returns
    /// immediately. The previous wait-first loop would block until the next
    /// notification in that case, deadlocking if no further notification
    /// arrived. Spurious wakeups are handled by `wait_while` re-checking the
    /// predicate.
    ///
    /// # Panics
    ///
    /// Panics if the mutex protecting `items` has been poisoned by a panic on
    /// another thread.
    fn wait_for_item<T>(&self, items: std::sync::MutexGuard<'_, Vec<T>>) -> T {
        let mut items = self.available.wait_while(items, |items| items.is_empty()).unwrap();
        // `wait_while` only returns once the vec is non-empty, so `pop` must succeed
        items.pop().expect("`wait_while` guarantees a non-empty vec")
    }

    /// Notify one waiting thread that an item is available.
    fn notify_available(&self) {
        self.available.notify_one();
    }
}
|
|
||||||||||||||||||||||
| /// A thread-safe pool for reusing [`Allocator`] instances, that uses fixed-size allocators, | ||||||||||||||||||||||
| /// suitable for use with raw transfer. | ||||||||||||||||||||||
| pub struct FixedSizeAllocatorPool { | ||||||||||||||||||||||
| /// Allocators in the pool | ||||||||||||||||||||||
| allocators: Mutex<Vec<FixedSizeAllocator>>, | ||||||||||||||||||||||
| /// ID to assign to next `Allocator` that's created | ||||||||||||||||||||||
| next_id: AtomicU32, | ||||||||||||||||||||||
| /// Capacity limiter. `None` means no limit (default). | ||||||||||||||||||||||
| capacity_limit: Option<CapacityLimit>, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
impl FixedSizeAllocatorPool {
    /// Create a new [`FixedSizeAllocatorPool`] for use across the specified number of threads.
    ///
    /// If `max_count` is `Some` and greater than zero, the pool will block when trying to get
    /// an allocator if the maximum number of allocators has been reached, waiting until one
    /// is returned. If `max_count` is `None` (or `Some(0)`), there is no limit on the number
    /// of allocators.
    pub fn new(thread_count: usize, max_count: Option<u32>) -> FixedSizeAllocatorPool {
        // Each allocator consumes a large block of memory, so create them on demand instead of upfront,
        // in case not all threads end up being used (e.g. language server without `import` plugin)
        let allocators = Vec::with_capacity(thread_count);
        FixedSizeAllocatorPool {
            allocators: Mutex::new(allocators),
            next_id: AtomicU32::new(0),
            // `Some(0)` would make `is_at_capacity` always true, blocking every
            // acquisition forever with nothing to ever return — treat it as "no limit"
            capacity_limit: max_count.filter(|&count| count > 0).map(CapacityLimit::new),
        }
    }
||||||||||||||||||||||
| capacity_limit: max_count.map(CapacityLimit::new), | |
| capacity_limit: max_count.filter(|&count| count > 0).map(CapacityLimit::new), |
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a race condition between checking capacity and incrementing next_id. Multiple threads could simultaneously:
- Check that allocators.pop() returns None (line 110)
- Load current_count and see it's below max_count (line 116)
- All pass the capacity check (line 117)
- All increment next_id with fetch_add (line 126)
This allows the pool to create more allocators than max_count allows. The capacity check needs to happen atomically with the ID increment, or use a different synchronization approach.
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new capacity limiting behavior lacks test coverage. Consider adding tests to verify: (1) blocking behavior when capacity is reached, (2) correct unblocking when allocators are returned, (3) race condition handling when multiple threads try to acquire allocators simultaneously, and (4) correct behavior when max_count is None.
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mutex guard is dropped before notifying waiting threads. This creates a window where a thread waiting in wait_for_item could be notified but another thread could acquire the lock first and pop the allocator, causing the notified thread to wait again unnecessarily. Consider moving the notify_available call inside the mutex-protected block, or document why this ordering is intentional.
| } | |
| // Notify one waiting thread that an allocator is available (if capacity is limited) | |
| if let Some(capacity_limit) = &self.capacity_limit { | |
| capacity_limit.notify_available(); | |
| // Notify one waiting thread that an allocator is available (if capacity is limited) | |
| if let Some(capacity_limit) = &self.capacity_limit { | |
| capacity_limit.notify_available(); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The unwrap() in wait_for_item could panic if the mutex is poisoned, but this is not documented. The panic behavior should either be documented in the method's docstring or the error should be propagated to the caller.