better faster stronger reentrancy support
teh-cmc committed Apr 8, 2024
1 parent ef3d2dc commit 0004af7
Showing 2 changed files with 91 additions and 65 deletions.
1 change: 0 additions & 1 deletion crates/re_query_cache2/src/range/query.rs
@@ -163,7 +163,6 @@ impl RangeCache {
pending_invalidation: _,
} = self;

// A plain old `write()` (as opposed to a `try_write()`) here _should_ be fine.
let mut per_data_time = per_data_time.write();

if let Some(query_front) = per_data_time.compute_front_query(query) {
155 changes: 91 additions & 64 deletions crates/re_query_cache2/src/range/results.rs
@@ -1,4 +1,5 @@
use std::{
cell::RefCell,
collections::VecDeque,
ops::Range,
sync::{Arc, OnceLock},
@@ -146,6 +147,20 @@ pub struct CachedRangeData<'a, T> {

front_status: (TimeInt, PromiseResult<()>),
back_status: (TimeInt, PromiseResult<()>),

/// Keeps track of reentrancy counts for the current thread.
///
/// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
/// environments such as Rayon.
reentering: &'static std::thread::LocalKey<RefCell<u32>>,
}

impl<'a, T> Drop for CachedRangeData<'a, T> {
#[inline]
fn drop(&mut self) {
self.reentering
.with_borrow_mut(|reentering| *reentering = reentering.saturating_sub(1));
}
}

impl<'a, T> CachedRangeData<'a, T> {
@@ -244,44 +259,49 @@ impl CachedRangeComponentResults {
pub fn to_dense<C: Component>(&self, resolver: &PromiseResolver) -> CachedRangeData<'_, C> {
// --- Step 1: try and upsert pending data (write lock) ---

// # Multithreading semantics
//
// There is only one situation where this `try_write()` might fail: there is another task that
// is already in the process of upserting that specific cache (e.g. a cloned space view).
//
// That task might be on the same thread (due to work-stealing), or a different one.
// Either way, we need to give up trying to upsert the cache in order to prevent a
// deadlock in case the other task is in fact running on the same thread.
//
// It's fine, though:
// - Best case scenario, the data we need is already cached.
// - Worst case scenario, the data will be missing for this current frame but is guaranteed
// to be there for the next.
//
// Data invalidation happens at the per-archetype cache layer, so this won't return
// out-of-date data in either scenario.
//
// There is a lot of complexity we could add to make this whole process more efficient:
// keep track of failed queries in a queue so we don't rely on probabilities, keep track
// of the thread-local reentrancy state to skip this logic when it's not needed, return raw
// data when the lock is busy and the data isn't already cached, etc.
//
// In the end, this is an edge case inherent to our current "immediate query" model that we
// already know we want -- and have to -- move away from: the extra complexity isn't worth it.
let mut results = self.0.try_write();
thread_local! {
/// Keeps track of reentrancy counts for the current thread.
///
/// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
/// environments such as Rayon.
static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1));

let mut results = if let Some(results) = self.0.try_write() {
// The lock was free to grab, nothing else to worry about.
Some(results)
} else {
REENTERING.with_borrow_mut(|reentering| {
if *reentering > 1 {
// The lock is busy, and at least one of the lock holders is the current thread from a
// previous stack frame.
//
// Return `None` so that we skip straight to the read-only part of the operation.
// All the data will be there already, since the previous stack frame already
// took care of upserting it.
None
} else {
// The lock is busy, but it is not held by the current thread.
// Just block until it gets released.
Some(self.0.write())
}
})
};

if let Some(results) = &mut results {
// NOTE: This is just a lazy initialization of the underlying deque, because we
// just now finally know the expected type!
if results.cached_dense.is_none() {
results.cached_dense = Some(Box::new(FlatVecDeque::<C>::new()));
}

if results.cached_sparse.is_some() {
re_log::error!(
"a component cannot be both dense and sparse -- try `to_sparse()` instead"
);
} else {
// NOTE: This is just a lazy initialization of the underlying deque, because we
// just now finally know the expected type!
if results.cached_dense.is_none() {
results.cached_dense = Some(Box::new(FlatVecDeque::<C>::new()));
}

if !results.promises_front.is_empty() {
let mut resolved_indices = Vec::with_capacity(results.promises_front.len());
let mut resolved_data = Vec::with_capacity(results.promises_front.len());
@@ -411,7 +431,7 @@ impl CachedRangeComponentResults {
// work-stealing thread-pool and might swap a task on one thread with another task on the
// same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
//
// See comment above for more details.
// See `REENTERING` comments above for more details.
self.read_recursive()
};

@@ -439,6 +459,7 @@ impl CachedRangeComponentResults {
data,
front_status,
back_status,
reentering: &REENTERING,
}
}

Expand All @@ -458,44 +479,49 @@ impl CachedRangeComponentResults {
) -> CachedRangeData<'_, Option<C>> {
// --- Step 1: try and upsert pending data (write lock) ---

// # Multithreading semantics
//
// There is only one situation where this `try_write()` might fail: there is another task that
// is already in the process of upserting that specific cache (e.g. a cloned space view).
//
// That task might be on the same thread (due to work-stealing), or a different one.
// Either way, we need to give up trying to upsert the cache in order to prevent a
// deadlock in case the other task is in fact running on the same thread.
//
// It's fine, though:
// - Best case scenario, the data we need is already cached.
// - Worst case scenario, the data will be missing for this current frame but is guaranteed
// to be there for the next.
//
// Data invalidation happens at the per-archetype cache layer, so this won't return
// out-of-date data in either scenario.
//
// There is a lot of complexity we could add to make this whole process more efficient:
// keep track of failed queries in a queue so we don't rely on probabilities, keep track
// of the thread-local reentrancy state to skip this logic when it's not needed, return raw
// data when the lock is busy and the data isn't already cached, etc.
//
// In the end, this is an edge case inherent to our current "immediate query" model that we
// already know we want -- and have to -- move away from: the extra complexity isn't worth it.
let mut results = self.0.try_write();
thread_local! {
/// Keeps track of reentrancy counts for the current thread.
///
/// Used to detect and prevent potential deadlocks when using the cached APIs in work-stealing
/// environments such as Rayon.
static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

REENTERING.with_borrow_mut(|reentering| *reentering = reentering.saturating_add(1));

let mut results = if let Some(results) = self.0.try_write() {
// The lock was free to grab, nothing else to worry about.
Some(results)
} else {
REENTERING.with_borrow_mut(|reentering| {
if *reentering > 1 {
// The lock is busy, and at least one of the lock holders is the current thread from a
// previous stack frame.
//
// Return `None` so that we skip straight to the read-only part of the operation.
// All the data will be there already, since the previous stack frame already
// took care of upserting it.
None
} else {
// The lock is busy, but it is not held by the current thread.
// Just block until it gets released.
Some(self.0.write())
}
})
};

if let Some(results) = &mut results {
// NOTE: This is just a lazy initialization of the underlying deque, because we
// just now finally know the expected type!
if results.cached_sparse.is_none() {
results.cached_sparse = Some(Box::new(FlatVecDeque::<Option<C>>::new()));
}

if results.cached_dense.is_some() {
re_log::error!(
"a component cannot be both dense and sparse -- try `to_dense()` instead"
);
} else {
// NOTE: This is just a lazy initialization of the underlying deque, because we
// just now finally know the expected type!
if results.cached_sparse.is_none() {
results.cached_sparse = Some(Box::new(FlatVecDeque::<Option<C>>::new()));
}

if !results.promises_front.is_empty() {
let mut resolved_indices = Vec::with_capacity(results.promises_front.len());
let mut resolved_data = Vec::with_capacity(results.promises_front.len());
@@ -625,7 +651,7 @@ impl CachedRangeComponentResults {
// work-stealing thread-pool and might swap a task on one thread with another task on the
// same thread, where both tasks happen to query the same exact data (e.g. cloned space views).
//
// See comment above for more details.
// See `REENTERING` comments above for more details.
self.read_recursive()
};

@@ -653,6 +679,7 @@ impl CachedRangeComponentResults {
data,
front_status,
back_status,
reentering: &REENTERING,
}
}
}
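Distilled from the diff above, here is a minimal self-contained sketch of the reentrancy-guard pattern this commit introduces. It assumes `parking_lot`'s `RwLock` (implied by the `read_recursive` call in the real code); `Cache`, `query`, and `ReenteringGuard` are illustrative stand-ins, not actual `re_query_cache2` APIs.

```rust
use std::cell::RefCell;

use parking_lot::RwLock;

thread_local! {
    /// Per-thread count of live `query` frames, mirroring `REENTERING` above.
    static REENTERING: RefCell<u32> = const { RefCell::new(0) };
}

/// RAII stand-in for the `Drop` impl on `CachedRangeData`: undoes the
/// increment once the query results go out of scope.
struct ReenteringGuard;

impl Drop for ReenteringGuard {
    fn drop(&mut self) {
        REENTERING.with_borrow_mut(|r| *r = r.saturating_sub(1));
    }
}

struct Cache {
    inner: RwLock<Vec<u32>>, // stand-in for the cached deques
}

impl Cache {
    fn query(&self) -> (ReenteringGuard, usize) {
        REENTERING.with_borrow_mut(|r| *r = r.saturating_add(1));
        let guard = ReenteringGuard;

        let write = if let Some(w) = self.inner.try_write() {
            // The lock was free to grab, nothing else to worry about.
            Some(w)
        } else if REENTERING.with_borrow(|r| *r > 1) {
            // Busy, *and* this thread already entered `query` in a previous
            // stack frame: blocking could self-deadlock, so skip the upsert.
            // The outer frame has already cached the data we need.
            None
        } else {
            // Busy, but held by some other thread: safe to block.
            Some(self.inner.write())
        };

        if let Some(mut w) = write {
            w.push(42); // stand-in for the actual upsert work
        } // the write guard drops here, before the read lock is taken

        // `read_recursive` lets this thread re-acquire the read lock even
        // while an outer frame on the same thread still holds one.
        let len = self.inner.read_recursive().len();
        (guard, len)
    }
}
```

The scenario this defends against: under a work-stealing scheduler such as Rayon, a task that is still inside `query` (and still holding the recursive read lock through its returned results) can have a second task stolen onto the same OS thread that queries the same cache. For that inner call, `try_write()` fails because of the outer frame's read lock, and a plain blocking `write()` would deadlock the thread against itself; the per-thread counter is what lets the inner call detect this and fall back to the read-only path.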
