Skip to content

Commit

Permalink
Primary caching 10: latest-at cache invalidation (#4726)
Browse files Browse the repository at this point in the history
This implements cache invalidation via a `StoreSubscriber`.

We keep track of the timestamps to invalidate in the `StoreSubscriber`,
but we only do the actual removal of components at query time.
This is similar to how we handle bucket sorting in the main store: doing
it at query time has the benefit that the frame time effectively behaves
as a natural micro-batching mechanism, which vastly improves performance.

---

Part of the primary caching series of PRs (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc authored Jan 15, 2024
1 parent f569223 commit 68de772
Show file tree
Hide file tree
Showing 5 changed files with 468 additions and 104 deletions.
16 changes: 15 additions & 1 deletion crates/re_data_store/src/store_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl DataStore {
StoreSubscriberHandle(subscribers.len() as u32 - 1)
}

/// Passes a reference to the downcasted subscriber to the given callback.
/// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber<V: StoreSubscriber, T, F: FnMut(&V) -> T>(
Expand All @@ -112,6 +112,20 @@ impl DataStore {
})
}

/// Hands a shared reference to the downcasted subscriber over to the given `FnOnce` callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber_once<V: StoreSubscriber, T, F: FnOnce(&V) -> T>(
    StoreSubscriberHandle(handle): StoreSubscriberHandle,
    f: F,
) -> Option<T> {
    // Hold the registry's read lock only for as long as the callback runs.
    let subscribers = SUBSCRIBERS.read();
    let subscriber = subscribers.get(handle as usize)?.read();
    let downcasted = subscriber.as_any().downcast_ref::<V>()?;
    Some(f(downcasted))
}

/// Passes a mutable reference to the downcasted subscriber to the given callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
Expand Down
237 changes: 202 additions & 35 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ use std::{
sync::Arc,
};

use ahash::HashMap;
use ahash::{HashMap, HashSet};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use paste::paste;
use seq_macro::seq;

use re_data_store::{LatestAtQuery, RangeQuery};
use re_data_store::{
LatestAtQuery, RangeQuery, StoreDiff, StoreEvent, StoreSubscriber, StoreSubscriberHandle,
};
use re_log_types::{EntityPath, RowId, StoreId, TimeInt, Timeline};
use re_query::ArchetypeView;
use re_types_core::{components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName};
Expand Down Expand Up @@ -43,15 +45,45 @@ impl From<RangeQuery> for AnyQuery {
/// All primary caches (all stores, all entities, everything).
//
// TODO(cmc): Centralize and harmonize all caches (query, jpeg, mesh).
static CACHES: Lazy<Caches> = Lazy::new(Caches::default);
static CACHES: Lazy<StoreSubscriberHandle> =
Lazy::new(|| re_data_store::DataStore::register_subscriber(Box::<Caches>::default()));

/// Maintains the top-level cache mappings.
//
// TODO(cmc): Store subscriber and cache invalidation.
// TODO(#4730): SizeBytes support + size stats + mem panel
#[derive(Default)]
pub struct Caches {
latest_at: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
pub struct Caches(RwLock<HashMap<CacheKey, CachesPerArchetype>>);

/// All the per-[`Archetype`] latest-at caches for a single top-level cache key, plus the
/// pending invalidation state that gets applied lazily the next time these caches are queried
/// (see `handle_pending_invalidation`).
#[derive(Default)]
pub struct CachesPerArchetype {
    /// Which [`Archetype`] are we querying for?
    ///
    /// This is very important because of our data model: we not only query for components, but we
    /// query for components from a specific point-of-view (the so-called primary component).
    /// Different archetypes have different point-of-views, and therefore can end up with different
    /// results, even from the same raw data.
    //
    // TODO(cmc): At some point we should probably just store the PoV and optional components rather
    // than an `ArchetypeName`: the query system doesn't care about archetypes.
    latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

    /// Everything greater than or equal to this timestamp has been asynchronously invalidated.
    ///
    /// The next time this cache gets queried, it must remove any entry matching this criteria.
    /// `None` indicates that there's no pending invalidation.
    ///
    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
    /// time effectively behaves as a natural micro-batching mechanism.
    pending_timeful_invalidation: Option<TimeInt>,

    /// If `true`, the timeless data associated with this cache has been asynchronously invalidated.
    ///
    /// If `true`, this cache must remove all of its timeless entries the next time it gets queried.
    /// `false` indicates that there's no pending invalidation.
    ///
    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
    /// time effectively behaves as a natural micro-batching mechanism.
    pending_timeless_invalidation: bool,
}

impl Caches {
Expand All @@ -60,8 +92,9 @@ impl Caches {
// TODO(#4731): expose palette command.
#[inline]
pub fn clear() {
let Caches { latest_at } = &*CACHES;
latest_at.write().clear();
re_data_store::DataStore::with_subscriber_once(*CACHES, |caches: &Caches| {
caches.0.write().clear();
});
}

/// Gives write access to the appropriate `LatestAtCache` according to the specified
Expand All @@ -77,14 +110,25 @@ impl Caches {
A: Archetype,
F: FnMut(&mut LatestAtCache) -> R,
{
let key = CacheKey::new(store_id, entity_path, query.timeline, A::name());
let key = CacheKey::new(store_id, entity_path, query.timeline);

let cache =
re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
let mut caches = caches.0.write();

// We want to make sure we release the lock on the top-level cache map ASAP.
let cache = {
let mut caches = CACHES.latest_at.write();
let latest_at_cache = caches.entry(key).or_default();
Arc::clone(latest_at_cache)
};
let caches_per_archetype = caches.entry(key).or_default();
caches_per_archetype.handle_pending_invalidation();

let mut latest_at_per_archetype =
caches_per_archetype.latest_at_per_archetype.write();
let latest_at_cache = latest_at_per_archetype.entry(A::name()).or_default();

Arc::clone(latest_at_cache)

// Implicitly releasing all intermediary locks.
})
// NOTE: downcasting cannot fail, this is our own private handle.
.unwrap();

let mut cache = cache.write();
f(&mut cache)
Expand All @@ -102,17 +146,6 @@ pub struct CacheKey {

/// Which [`Timeline`] is the query targeting?
pub timeline: Timeline,

/// Which [`Archetype`] are we querying for?
///
/// This is very important because of our data model: we not only query for components, but we
/// query for components from a specific point-of-view (the so-called primary component).
/// Different archetypes have different point-of-views, and therefore can end up with different
/// results, even from the same raw data.
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
pub archetype_name: ArchetypeName,
}

impl CacheKey {
Expand All @@ -121,17 +154,149 @@ impl CacheKey {
store_id: impl Into<StoreId>,
entity_path: impl Into<EntityPath>,
timeline: impl Into<Timeline>,
archetype_name: impl Into<ArchetypeName>,
) -> Self {
Self {
store_id: store_id.into(),
entity_path: entity_path.into(),
timeline: timeline.into(),
archetype_name: archetype_name.into(),
}
}
}

// --- Invalidation ---

impl StoreSubscriber for Caches {
    #[inline]
    fn name(&self) -> String {
        "rerun.store_subscribers.QueryCache".into()
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Turns incoming store events into pending invalidation state.
    ///
    /// Actual removal of cached entries is deferred to query time (see
    /// `CachesPerArchetype::handle_pending_invalidation`): the frame time then effectively
    /// behaves as a natural micro-batching mechanism.
    // TODO(cmc): support dropped recordings.
    fn on_events(&mut self, events: &[StoreEvent]) {
        re_tracing::profile_function!(format!("num_events={}", events.len()));

        #[derive(Default, Debug)]
        struct CompactedEvents {
            timeless: HashSet<(StoreId, EntityPath)>,
            timeful: HashMap<CacheKey, TimeInt>,
        }

        // Compact the entire batch of events before touching the caches: this way each affected
        // key is visited only once no matter how many events relate to it, and the top-level
        // write lock below is acquired exactly once.
        let mut compacted = CompactedEvents::default();
        {
            re_tracing::profile_scope!("compact events");

            for event in events {
                let StoreEvent {
                    store_id,
                    store_generation: _,
                    event_id: _,
                    diff,
                } = event;

                let StoreDiff {
                    kind: _, // Don't care: both additions and deletions invalidate query results.
                    row_id: _,
                    times,
                    entity_path,
                    cells: _, // Don't care: we invalidate at the entity level, not component level.
                } = diff;

                // No timestamps at all: this is timeless data, which invalidates the whole
                // (store, entity) pair.
                if times.is_empty() {
                    compacted
                        .timeless
                        .insert((store_id.clone(), entity_path.clone()));
                }

                // Only the smallest invalidated timestamp matters per key: everything at or
                // after it gets invalidated anyway.
                for &(timeline, time) in times {
                    let key = CacheKey::new(store_id.clone(), entity_path.clone(), timeline);
                    let min_time = compacted.timeful.entry(key).or_insert(TimeInt::MAX);
                    *min_time = TimeInt::min(*min_time, time);
                }
            }
        }

        // NOTE: Acquire the write lock once for the whole batch rather than once per entry.
        let mut caches = self.0.write();

        // TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
        // yet another layer of caching indirection.
        // But since this pretty much never happens in practice, let's not go there until we
        // have metrics showing that we need to.
        {
            re_tracing::profile_scope!("timeless");

            for (store_id, entity_path) in compacted.timeless {
                for (key, caches_per_archetype) in caches.iter_mut() {
                    if key.store_id == store_id && key.entity_path == entity_path {
                        caches_per_archetype.pending_timeless_invalidation = true;
                    }
                }
            }
        }

        {
            re_tracing::profile_scope!("timeful");

            for (key, time) in compacted.timeful {
                if let Some(caches_per_archetype) = caches.get_mut(&key) {
                    // Keep the earliest pending invalidation time if one is already set.
                    if let Some(min_time) =
                        caches_per_archetype.pending_timeful_invalidation.as_mut()
                    {
                        *min_time = TimeInt::min(*min_time, time);
                    } else {
                        caches_per_archetype.pending_timeful_invalidation = Some(time);
                    }
                }
            }
        }
    }
}

impl CachesPerArchetype {
    /// Removes all entries from the cache that have been asynchronously invalidated.
    ///
    /// Invalidation is deferred to query time because it is far more efficient that way: the frame
    /// time effectively behaves as a natural micro-batching mechanism.
    fn handle_pending_invalidation(&mut self) {
        // Snapshot the pending state up front; it is not touched again until the very end.
        let invalidate_timeless = self.pending_timeless_invalidation;
        let invalidate_timeful = self.pending_timeful_invalidation;

        // Nothing pending? Then there's nothing to do.
        if !invalidate_timeless && invalidate_timeful.is_none() {
            return;
        }

        re_tracing::profile_function!();

        for cache in self.latest_at_per_archetype.read().values() {
            let mut cache = cache.write();

            if invalidate_timeless {
                cache.timeless = None;
            }

            if let Some(min_time) = invalidate_timeful {
                // Drop every entry at or after the invalidated timestamp.
                cache
                    .per_query_time
                    .retain(|&query_time, _| query_time < min_time);

                cache
                    .per_data_time
                    .retain(|&data_time, _| data_time < min_time);
            }
        }

        // All pending invalidations have now been applied.
        self.pending_timeful_invalidation = None;
        self.pending_timeless_invalidation = false;
    }
}

// ---

/// Caches the results of any query for an arbitrary range of time.
Expand All @@ -150,8 +315,10 @@ impl CacheKey {
pub struct CacheBucket {
/// The _data_ timestamps and [`RowId`]s of all cached rows.
///
/// This corresponds to the data time and `RowId` returned by `re_query::query_archetype`.
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,
pub(crate) data_times: VecDeque<(TimeInt, RowId)>,

/// The [`InstanceKey`]s of the point-of-view components.
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
Expand All @@ -169,8 +336,8 @@ pub struct CacheBucket {
impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_data_times.iter()
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
Expand Down Expand Up @@ -206,7 +373,7 @@ impl CacheBucket {
/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn num_entries(&self) -> usize {
self.pov_data_times.len()
self.data_times.len()
}

#[inline]
Expand Down Expand Up @@ -235,15 +402,15 @@ macro_rules! impl_insert {
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
pov_data_times,
data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = data_times.partition_point(|t| t < &(query_time, pov_row_id));

pov_data_times.insert(index, (query_time, pov_row_id));
data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
Expand Down
3 changes: 1 addition & 2 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ macro_rules! impl_query_archetype {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
bucket.iter_pov_data_times(),
bucket.iter_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
Expand All @@ -133,7 +133,6 @@ macro_rules! impl_query_archetype {
Ok(())
};


let upsert_results = |
data_time: TimeInt,
arch_view: &::re_query::ArchetypeView<A>,
Expand Down
Loading

0 comments on commit 68de772

Please sign in to comment.