Skip to content

Commit

Permalink
Use a lock-free stack to implement the global reference pool to penal…
Browse files Browse the repository at this point in the history
…ize producers versus consumers
  • Loading branch information
adamreichold committed May 18, 2024
1 parent fe79f54 commit e2ce401
Showing 1 changed file with 69 additions and 66 deletions.
135 changes: 69 additions & 66 deletions src/gil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ use std::cell::Cell;
use std::cell::RefCell;
#[cfg(not(debug_assertions))]
use std::cell::UnsafeCell;
use std::{mem, ptr::NonNull, sync};
#[cfg(not(pyo3_disable_reference_pool))]
use std::ptr::null_mut;
#[cfg(not(pyo3_disable_reference_pool))]
use std::sync::atomic::{AtomicPtr, Ordering};
use std::{ptr::NonNull, sync};

static START: sync::Once = sync::Once::new();

// Vector of PyObject
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

std::thread_local! {
/// This is an internal counter in pyo3 monitoring whether this thread has the GIL.
///
Expand Down Expand Up @@ -233,7 +240,7 @@ impl GILGuard {
increment_gil_count();
// Update counts of PyObjects / Py that have been cloned or dropped since last acquisition
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(unsafe { Python::assume_gil_acquired() });
update_counts(unsafe { Python::assume_gil_acquired() });

GILGuard::Ensured { gstate }
}
Expand Down Expand Up @@ -272,47 +279,46 @@ impl Drop for GILGuard {
}
}

// Vector of PyObject
type PyObjVec = Vec<NonNull<ffi::PyObject>>;

#[cfg(not(pyo3_disable_reference_pool))]
/// Thread-safe storage for objects which were dec_ref while the GIL was not held.
struct ReferencePool {
pending_decrefs: sync::Mutex<PyObjVec>,
struct PendingDecRef {
obj: NonNull<ffi::PyObject>,
next: *mut PendingDecRef,
}

#[cfg(not(pyo3_disable_reference_pool))]
impl ReferencePool {
const fn new() -> Self {
Self {
pending_decrefs: sync::Mutex::new(Vec::new()),
}
}
static POOL: AtomicPtr<PendingDecRef> = AtomicPtr::new(null_mut());

fn register_decref(&self, obj: NonNull<ffi::PyObject>) {
self.pending_decrefs.lock().unwrap().push(obj);
}
#[cfg(not(pyo3_disable_reference_pool))]
fn enqueue_decref(obj: NonNull<ffi::PyObject>) {
let val = PendingDecRef {
obj,
next: null_mut(),
};

fn update_counts(&self, _py: Python<'_>) {
let mut pending_decrefs = self.pending_decrefs.lock().unwrap();
if pending_decrefs.is_empty() {
return;
}
let top = Box::into_raw(Box::new(val));

let decrefs = mem::take(&mut *pending_decrefs);
drop(pending_decrefs);
let next = unsafe { &mut (*top).next };

for ptr in decrefs {
unsafe { ffi::Py_DECREF(ptr.as_ptr()) };
}
}
POOL.fetch_update(Ordering::AcqRel, Ordering::Relaxed, |new_top| {
*next = new_top;
Some(top)
})
.unwrap();
}

#[cfg(not(pyo3_disable_reference_pool))]
unsafe impl Sync for ReferencePool {}
fn update_counts(_py: Python<'_>) {
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);

#[cfg(not(pyo3_disable_reference_pool))]
static POOL: ReferencePool = ReferencePool::new();
while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

unsafe { ffi::Py_DECREF(val.obj.as_ptr()) };

top = val.next;
}
}

/// A guard which can be used to temporarily release the GIL and restore on `Drop`.
pub(crate) struct SuspendGIL {
Expand All @@ -337,7 +343,7 @@ impl Drop for SuspendGIL {

// Update counts of PyObjects / Py that were cloned or dropped while the GIL was released.
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
}
}
}
Expand Down Expand Up @@ -410,7 +416,7 @@ impl GILPool {
pub unsafe fn new() -> GILPool {
// Update counts of PyObjects / Py that have been cloned or dropped since last acquisition
#[cfg(not(pyo3_disable_reference_pool))]
POOL.update_counts(Python::assume_gil_acquired());
update_counts(Python::assume_gil_acquired());
GILPool {
start: OWNED_OBJECTS
.try_with(|owned_objects| {
Expand Down Expand Up @@ -489,7 +495,7 @@ pub unsafe fn register_decref(obj: NonNull<ffi::PyObject>) {
ffi::Py_DECREF(obj.as_ptr())
} else {
#[cfg(not(pyo3_disable_reference_pool))]
POOL.register_decref(obj);
enqueue_decref(obj);
#[cfg(all(
pyo3_disable_reference_pool,
not(pyo3_leak_on_drop_without_reference_pool)
Expand Down Expand Up @@ -549,16 +555,9 @@ fn decrement_gil_count() {

#[cfg(test)]
mod tests {
#[cfg(not(pyo3_disable_reference_pool))]
use super::{gil_is_acquired, POOL};
#[cfg(feature = "gil-refs")]
#[allow(deprecated)]
use super::{GILPool, GIL_COUNT, OWNED_OBJECTS};
use crate::types::any::PyAnyMethods;
#[cfg(feature = "gil-refs")]
use crate::{ffi, gil};
use crate::{PyObject, Python};
use std::ptr::NonNull;
use super::*;

use crate::{types::any::PyAnyMethods, PyObject};

fn get_object(py: Python<'_>) -> PyObject {
py.eval_bound("object()", None, None).unwrap().unbind()
Expand All @@ -573,21 +572,25 @@ mod tests {
len
}

#[cfg(not(pyo3_disable_reference_pool))]
fn pool_dec_refs_does_not_contain(obj: &PyObject) -> bool {
!POOL
.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
}

#[cfg(all(not(pyo3_disable_reference_pool), not(target_arch = "wasm32")))]
fn pool_dec_refs_contains(obj: &PyObject) -> bool {
POOL.pending_decrefs
.lock()
.unwrap()
.contains(&unsafe { NonNull::new_unchecked(obj.as_ptr()) })
let mut top = POOL.swap(null_mut(), Ordering::AcqRel);
let mut found = false;

while !top.is_null() {
// SAFETY: Was enqueued using `Box::into_raw`.
let val = unsafe { Box::from_raw(top) };

if val.obj.as_ptr() == obj.as_ptr() {
found = true;
}

unsafe { ffi::Py_DECREF(val.obj.as_ptr()) };

top = val.next;
}

found
}

#[test]
Expand All @@ -603,7 +606,7 @@ mod tests {
unsafe {
{
let pool = py.new_pool();
gil::register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));
register_owned(pool.python(), NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
Expand Down Expand Up @@ -632,14 +635,14 @@ mod tests {
let _pool = py.new_pool();
assert_eq!(owned_object_count(), 0);

gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));

assert_eq!(owned_object_count(), 1);
assert_eq!(ffi::Py_REFCNT(obj_ptr), 2);
{
let _pool = py.new_pool();
let obj = get_object(py);
gil::register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
register_owned(py, NonNull::new_unchecked(obj.into_ptr()));
assert_eq!(owned_object_count(), 2);
}
assert_eq!(owned_object_count(), 1);
Expand All @@ -662,14 +665,14 @@ mod tests {

assert_eq!(obj.get_refcnt(py), 2);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// With the GIL held, reference count will be decreased immediately.
drop(reference);

assert_eq!(obj.get_refcnt(py), 1);
#[cfg(not(pyo3_disable_reference_pool))]
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand All @@ -682,7 +685,7 @@ mod tests {
let reference = obj.clone_ref(py);

assert_eq!(obj.get_refcnt(py), 2);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));

// Drop reference in a separate thread which doesn't have the GIL.
std::thread::spawn(move || drop(reference)).join().unwrap();
Expand All @@ -697,7 +700,7 @@ mod tests {
// Next time the GIL is acquired, the reference is released
Python::with_gil(|py| {
assert_eq!(obj.get_refcnt(py), 1);
assert!(pool_dec_refs_does_not_contain(&obj));
assert!(!pool_dec_refs_contains(&obj));
});
}

Expand Down Expand Up @@ -820,10 +823,10 @@ mod tests {
let capsule =
unsafe { ffi::PyCapsule_New(ptr as _, std::ptr::null(), Some(capsule_drop)) };

POOL.register_decref(NonNull::new(capsule).unwrap());
super::enqueue_decref(NonNull::new(capsule).unwrap());

// Updating the counts will call decref on the capsule, which calls capsule_drop
POOL.update_counts(py);
super::update_counts(py);
})
}
}

0 comments on commit e2ce401

Please sign in to comment.