Commit 117000f

Add lock-free BlockPageResource for ImmixSpace (#506)
This PR adds a lock-free block queue and uses it for Immix's clean-block allocation (as `BlockPageResource`) and recyclable-block allocation (as a reusable block queue). The lock-free block queue has per-thread producer endpoints, and only GC workers can add blocks to it: a worker pushes blocks to its thread-local queue first, then flushes them to the global pool when the local queue fills up. Allocation from this page resource happens only at block granularity. The consumers still share a single endpoint, but on the block-allocation fast path the cost is just a single atomic increment to bump the allocation cursor and pop a block. _However, given the current design of `Space.acquire`, `BlockPageResource` still acquires a lock before calling `pr.get_new_pages`._
1 parent 69b4fe4 commit 117000f
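
To make the design concrete, here is a minimal sketch of the queue topology described above. It is not MMTk's actual `BlockPool` (which stores `Block`s and is hand-rolled over atomics); the `SketchBlockPool` name, the `usize` stand-in for a block address, the lane capacities, and the use of crossbeam's `ArrayQueue` are all illustrative assumptions.

use crossbeam::queue::ArrayQueue;

/// Illustrative sketch of a pool with per-worker producer lanes and a
/// shared consumer endpoint. Not the actual BlockPool API.
pub struct SketchBlockPool {
    /// One bounded lane per GC worker. Only worker `i` pushes to
    /// `lanes[i]`, so producers never contend with each other.
    lanes: Vec<ArrayQueue<usize>>,
    /// The shared global pool that full lanes spill into; consumers pop here.
    global: ArrayQueue<usize>,
}

impl SketchBlockPool {
    pub fn new(num_workers: usize) -> Self {
        Self {
            lanes: (0..num_workers).map(|_| ArrayQueue::new(256)).collect(),
            // Sized generously so this sketch never drops a block on overflow.
            global: ArrayQueue::new(1 << 20),
        }
    }

    /// Producer side: GC worker `ordinal` adds a block (here, an address).
    pub fn push(&self, ordinal: usize, block: usize) {
        if let Err(block) = self.lanes[ordinal].push(block) {
            // The local lane is full: spill it to the global pool, then retry.
            // The retry cannot fail, because this worker is the lane's only producer.
            self.flush_lane(ordinal);
            self.lanes[ordinal].push(block).unwrap();
        }
    }

    /// Drain one worker's lane into the global pool.
    fn flush_lane(&self, ordinal: usize) {
        while let Some(b) = self.lanes[ordinal].pop() {
            self.global.push(b).unwrap();
        }
    }

    /// Publish everything once all producers are done (cf. `flush_all` in this PR).
    pub fn flush_all(&self) {
        for i in 0..self.lanes.len() {
            self.flush_lane(i);
        }
    }

    /// Consumer fast path: a single lock-free pop from the shared pool.
    pub fn pop(&self) -> Option<usize> {
        self.global.pop()
    }
}

The single-producer-per-lane invariant is what makes the retry in `push` infallible and keeps producers from contending with one another; consumers only ever touch `global`.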

10 files changed: +546 -31 lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion

@@ -33,7 +33,7 @@ num_cpus = "1.8"
 enum-map = "=2.1"
 downcast-rs = "1.1.1"
 atomic-traits = "0.2.0"
-atomic = "0.4.6"
+atomic = "0.5.1"
 num-traits = "0.2"
 spin = "0.5.2"
 env_logger = "0.8.2"

src/policy/immix/block.rs

Lines changed: 29 additions & 17 deletions

@@ -3,11 +3,11 @@ use super::defrag::Histogram;
 use super::line::Line;
 use super::ImmixSpace;
 use crate::util::constants::*;
+use crate::util::heap::blockpageresource::BlockPool;
 use crate::util::linear_scan::{Region, RegionIterator};
 use crate::util::metadata::side_metadata::{MetadataByteArrayRef, SideMetadataSpec};
 use crate::util::Address;
 use crate::vm::*;
-use spin::{Mutex, MutexGuard};
 use std::sync::atomic::Ordering;
 
 /// The block allocation state.
@@ -271,39 +271,51 @@ impl Block {
 }
 
 /// A non-block single-linked list to store blocks.
-#[derive(Default)]
-pub struct BlockList {
-    queue: Mutex<Vec<Block>>,
+pub struct ReusableBlockPool {
+    queue: BlockPool<Block>,
+    num_workers: usize,
 }
 
-impl BlockList {
+impl ReusableBlockPool {
+    /// Create empty block list
+    pub fn new(num_workers: usize) -> Self {
+        Self {
+            queue: BlockPool::new(num_workers),
+            num_workers,
+        }
+    }
+
     /// Get number of blocks in this list.
-    #[inline]
+    #[inline(always)]
     pub fn len(&self) -> usize {
-        self.queue.lock().len()
+        self.queue.len()
     }
 
     /// Add a block to the list.
-    #[inline]
+    #[inline(always)]
     pub fn push(&self, block: Block) {
-        self.queue.lock().push(block)
+        self.queue.push(block)
     }
 
     /// Pop a block out of the list.
-    #[inline]
+    #[inline(always)]
     pub fn pop(&self) -> Option<Block> {
-        self.queue.lock().pop()
+        self.queue.pop()
    }
 
     /// Clear the list.
-    #[inline]
-    pub fn reset(&self) {
-        *self.queue.lock() = Vec::new()
+    pub fn reset(&mut self) {
+        self.queue = BlockPool::new(self.num_workers);
    }
 
-    /// Get an array of all reusable blocks stored in this BlockList.
+    /// Iterate all the blocks in the queue. Call the visitor for each reported block.
     #[inline]
-    pub fn get_blocks(&self) -> MutexGuard<Vec<Block>> {
-        self.queue.lock()
+    pub fn iterate_blocks(&self, mut f: impl FnMut(Block)) {
+        self.queue.iterate_blocks(&mut f);
+    }
+
+    /// Flush the block queue
+    pub fn flush_all(&self) {
+        self.queue.flush_all();
     }
 }

src/policy/immix/chunk.rs

Lines changed: 35 additions & 1 deletion

@@ -10,6 +10,8 @@ use crate::{
     MMTK,
 };
 use spin::Mutex;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
 use std::{ops::Range, sync::atomic::Ordering};
 
 /// Data structure to reference a MMTk 4 MB chunk.
@@ -160,7 +162,19 @@ impl ChunkMap {
         space: &'static ImmixSpace<VM>,
     ) -> Vec<Box<dyn GCWork<VM>>> {
         space.defrag.mark_histograms.lock().clear();
-        self.generate_tasks(|chunk| Box::new(SweepChunk { space, chunk }))
+        let epilogue = Arc::new(FlushPageResource {
+            space,
+            counter: AtomicUsize::new(0),
+        });
+        let tasks = self.generate_tasks(|chunk| {
+            Box::new(SweepChunk {
+                space,
+                chunk,
+                epilogue: epilogue.clone(),
+            })
+        });
+        epilogue.counter.store(tasks.len(), Ordering::SeqCst);
+        tasks
     }
 }
 
@@ -174,6 +188,8 @@ impl Default for ChunkMap {
 struct SweepChunk<VM: VMBinding> {
     space: &'static ImmixSpace<VM>,
     chunk: Chunk,
+    /// A destructor invoked when all `SweepChunk` packets are finished.
+    epilogue: Arc<FlushPageResource<VM>>,
 }
 
 impl<VM: VMBinding> GCWork<VM> for SweepChunk<VM> {
@@ -184,5 +200,23 @@ impl<VM: VMBinding> GCWork<VM> for SweepChunk<VM> {
             self.chunk.sweep(self.space, &mut histogram);
         }
         self.space.defrag.add_completed_mark_histogram(histogram);
+        self.epilogue.finish_one_work_packet();
+    }
+}
+
+/// Count the number of remaining work packets, and flush the page resource when all packets have finished.
+struct FlushPageResource<VM: VMBinding> {
+    space: &'static ImmixSpace<VM>,
+    counter: AtomicUsize,
+}
+
+impl<VM: VMBinding> FlushPageResource<VM> {
+    /// Called after a related work packet is finished.
+    fn finish_one_work_packet(&self) {
+        if 1 == self.counter.fetch_sub(1, Ordering::SeqCst) {
+            // We've finished releasing all the dead blocks to the BlockPageResource's thread-local queues.
+            // Now flush the BlockPageResource.
+            self.space.flush_page_resource()
+        }
     }
 }
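
A note on the countdown in `finish_one_work_packet` above: `fetch_sub` returns the counter's previous value, so exactly one packet (the one that observes `1`) runs the flush. Below is a self-contained sketch of the same epilogue idiom, with plain threads standing in for GC work packets; the names are illustrative, not code from this PR.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

struct Epilogue {
    counter: AtomicUsize,
}

impl Epilogue {
    fn finish_one(&self) {
        // fetch_sub returns the PREVIOUS value, so seeing 1 means this caller
        // performed the final decrement and must run the epilogue exactly once.
        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            println!("all packets done: flush the page resource here");
        }
    }
}

fn main() {
    let packets = 4;
    let epilogue = Arc::new(Epilogue { counter: AtomicUsize::new(packets) });
    let handles: Vec<_> = (0..packets)
        .map(|_| {
            let e = epilogue.clone();
            // Each "work packet" reports completion when it finishes.
            thread::spawn(move || e.finish_one())
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
}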

src/policy/immix/defrag.rs

Lines changed: 2 additions & 2 deletions

@@ -139,7 +139,7 @@ impl Defrag {
         spill_avail_histograms: &mut Histogram,
     ) -> usize {
         let mut total_available_lines = 0;
-        for block in space.reusable_blocks.get_blocks().iter() {
+        space.reusable_blocks.iterate_blocks(|block| {
             let bucket = block.get_holes();
             let unavailable_lines = match block.get_state() {
                 BlockState::Reusable { unavailable_lines } => unavailable_lines as usize,
@@ -148,7 +148,7 @@ impl Defrag {
             let available_lines = Block::LINES - unavailable_lines;
             spill_avail_histograms[bucket] += available_lines;
             total_available_lines += available_lines;
-        }
+        });
         total_available_lines
     }

src/policy/immix/immixspace.rs

Lines changed: 25 additions & 10 deletions

@@ -11,6 +11,7 @@ use crate::policy::space::SpaceOptions;
 use crate::policy::space::{CommonSpace, Space};
 use crate::util::copy::*;
 use crate::util::heap::layout::heap_layout::{Mmapper, VMMap};
+use crate::util::heap::BlockPageResource;
 use crate::util::heap::HeapMeta;
 use crate::util::heap::PageResource;
 use crate::util::heap::VMRequest;
@@ -23,10 +24,7 @@ use crate::vm::*;
 use crate::{
     plan::ObjectQueue,
     scheduler::{GCWork, GCWorkScheduler, GCWorker, WorkBucketStage},
-    util::{
-        heap::FreeListPageResource,
-        opaque_pointer::{VMThread, VMWorkerThread},
-    },
+    util::opaque_pointer::{VMThread, VMWorkerThread},
     MMTK,
 };
 use atomic::Ordering;
@@ -37,15 +35,15 @@ pub(crate) const TRACE_KIND_DEFRAG: TraceKind = 1;
 
 pub struct ImmixSpace<VM: VMBinding> {
     common: CommonSpace<VM>,
-    pr: FreeListPageResource<VM>,
+    pr: BlockPageResource<VM, Block>,
     /// Allocation status for all chunks in immix space
     pub chunk_map: ChunkMap,
     /// Current line mark state
     pub line_mark_state: AtomicU8,
     /// Line mark state in previous GC
     line_unavail_state: AtomicU8,
     /// A list of all reusable blocks
-    pub reusable_blocks: BlockList,
+    pub reusable_blocks: ReusableBlockPool,
     /// Defrag utilities
     pub(super) defrag: Defrag,
     /// Object mark state
@@ -216,21 +214,38 @@ impl<VM: VMBinding> ImmixSpace<VM> {
         );
         ImmixSpace {
             pr: if common.vmrequest.is_discontiguous() {
-                FreeListPageResource::new_discontiguous(vm_map)
+                BlockPageResource::new_discontiguous(
+                    Block::LOG_PAGES,
+                    vm_map,
+                    scheduler.num_workers(),
+                )
             } else {
-                FreeListPageResource::new_contiguous(common.start, common.extent, vm_map)
+                BlockPageResource::new_contiguous(
+                    Block::LOG_PAGES,
+                    common.start,
+                    common.extent,
+                    vm_map,
+                    scheduler.num_workers(),
+                )
             },
             common,
             chunk_map: ChunkMap::new(),
             line_mark_state: AtomicU8::new(Line::RESET_MARK_STATE),
             line_unavail_state: AtomicU8::new(Line::RESET_MARK_STATE),
-            reusable_blocks: BlockList::default(),
+            reusable_blocks: ReusableBlockPool::new(scheduler.num_workers()),
             defrag: Defrag::default(),
             mark_state: Self::UNMARKED_STATE,
             scheduler,
         }
     }
 
+    /// Flush the thread-local queues in BlockPageResource
+    pub fn flush_page_resource(&self) {
+        self.reusable_blocks.flush_all();
+        #[cfg(target_pointer_width = "64")]
+        self.pr.flush_all()
+    }
+
     /// Get the number of defrag headroom pages.
     pub fn defrag_headroom_pages(&self) -> usize {
         self.defrag.defrag_headroom_pages(self)
@@ -339,7 +354,7 @@ impl<VM: VMBinding> ImmixSpace<VM> {
     /// Release a block.
     pub fn release_block(&self, block: Block) {
         block.deinit();
-        self.pr.release_pages(block.start());
+        self.pr.release_block(block);
     }
 
     /// Allocate a clean block.

src/scheduler/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -19,6 +19,7 @@ mod work_bucket;
 pub use work_bucket::WorkBucketStage;
 
 mod worker;
+pub(crate) use worker::current_worker_ordinal;
 pub use worker::GCWorker;
 
 mod controller;

src/scheduler/worker.rs

Lines changed: 13 additions & 0 deletions

@@ -5,6 +5,7 @@ use crate::mmtk::MMTK;
 use crate::util::copy::GCWorkerCopyContext;
 use crate::util::opaque_pointer::*;
 use crate::vm::{Collection, GCThreadContext, VMBinding};
+use atomic::Atomic;
 use atomic_refcell::{AtomicRef, AtomicRefCell, AtomicRefMut};
 use crossbeam::deque::{self, Stealer};
 use crossbeam::queue::ArrayQueue;
@@ -15,6 +16,17 @@ use std::sync::{Arc, Mutex};
 /// Represents the ID of a GC worker thread.
 pub type ThreadId = usize;
 
+thread_local! {
+    /// Current worker's ordinal
+    static WORKER_ORDINAL: Atomic<Option<ThreadId>> = Atomic::new(None);
+}
+
+/// Get current worker ordinal. Return `None` if the current thread is not a worker.
+#[inline(always)]
+pub fn current_worker_ordinal() -> Option<ThreadId> {
+    WORKER_ORDINAL.with(|x| x.load(Ordering::Relaxed))
+}
+
 /// The part shared between a GCWorker and the scheduler.
 /// This structure is used for communication, e.g. adding new work packets.
 pub struct GCWorkerShared<VM: VMBinding> {
@@ -173,6 +185,7 @@ impl<VM: VMBinding> GCWorker<VM> {
     /// Entry of the worker thread. Resolve thread affinity, if it has been specified by the user.
     /// Each worker will keep polling and executing work packets in a loop.
     pub fn run(&mut self, tls: VMWorkerThread, mmtk: &'static MMTK<VM>) {
+        WORKER_ORDINAL.with(|x| x.store(Some(self.ordinal), Ordering::SeqCst));
         self.scheduler.resolve_affinity(self.ordinal);
         self.tls = tls;
         self.copy = crate::plan::create_gc_worker_context(tls, mmtk);
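
The thread-local ordinal gives any code running on a worker thread a cheap way to find its own lane (for example, which per-worker producer endpoint of the block pool to push to). Below is a self-contained sketch of the same idiom; it uses `std::cell::Cell` for interior mutability where the PR uses `atomic::Atomic`, and the names are illustrative.

use std::cell::Cell;
use std::thread;

thread_local! {
    // Each worker thread records its own ordinal here at startup.
    static WORKER_ORDINAL: Cell<Option<usize>> = Cell::new(None);
}

/// Returns the calling thread's worker ordinal, or None on non-worker threads.
fn current_worker_ordinal() -> Option<usize> {
    WORKER_ORDINAL.with(|x| x.get())
}

fn main() {
    let handles: Vec<_> = (0..4usize)
        .map(|ordinal| {
            thread::spawn(move || {
                // Equivalent to the store at the top of GCWorker::run.
                WORKER_ORDINAL.with(|x| x.set(Some(ordinal)));
                // From here on, code on this thread can look up its ordinal
                // without being handed a &GCWorker, e.g. to pick a queue lane.
                assert_eq!(current_worker_ordinal(), Some(ordinal));
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Threads that never registered (like main) see None.
    assert_eq!(current_worker_ordinal(), None);
}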
