Primary caching 10: latest-at cache invalidation #4726

Merged
merged 6 commits on Jan 15, 2024

Changes from all commits
16 changes: 15 additions & 1 deletion crates/re_data_store/src/store_subscriber.rs
@@ -98,7 +98,7 @@ impl DataStore {
StoreSubscriberHandle(subscribers.len() as u32 - 1)
}

/// Passes a reference to the downcasted subscriber to the given callback.
/// Passes a reference to the downcasted subscriber to the given `FnMut` callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber<V: StoreSubscriber, T, F: FnMut(&V) -> T>(
@@ -112,6 +112,20 @@ impl DataStore {
})
}

/// Passes a reference to the downcasted subscriber to the given `FnOnce` callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
pub fn with_subscriber_once<V: StoreSubscriber, T, F: FnOnce(&V) -> T>(
Member:
Why do we need this? `FnOnce` is a super-trait of `FnMut`, so you should be able to call `with_subscriber` with an `FnOnce` closure.

Member (author):
If `F` captures anything by value and then moves that value (e.g. as in `Caches::with_latest_at`), then it cannot be `FnMut`, since it can only be called once.

I could make `with_subscriber` take an `FnOnce`, but then I'm preventing callers from actually working with `FnMut` closures.

(A minimal standalone sketch of this distinction follows this file's diff.)

StoreSubscriberHandle(handle): StoreSubscriberHandle,
f: F,
) -> Option<T> {
let subscribers = SUBSCRIBERS.read();
subscribers.get(handle as usize).and_then(|subscriber| {
let subscriber = subscriber.read();
subscriber.as_any().downcast_ref::<V>().map(f)
})
}

/// Passes a mutable reference to the downcasted subscriber to the given callback.
///
/// Returns `None` if the subscriber doesn't exist or downcasting failed.
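To make the `FnOnce` / `FnMut` distinction from the review exchange above concrete, here is a minimal, standalone sketch (plain Rust, not part of this PR): a closure that moves a captured value out of its environment can only run once, so it implements `FnOnce` but not `FnMut`, and therefore cannot be passed to an API bounded by `F: FnMut`.

```rust
// Minimal sketch, independent of the Rerun codebase: a closure that moves a
// captured value out of its environment implements `FnOnce`, but not `FnMut`.

fn call_mut<T>(mut f: impl FnMut() -> T) -> T {
    // An `FnMut` closure may be called any number of times.
    f()
}

fn call_once<T>(f: impl FnOnce() -> T) -> T {
    // An `FnOnce` closure is consumed; it can be called at most once.
    f()
}

fn main() {
    let owned = String::from("captured by value");

    // Calling this closure moves `owned` out of its environment, so it can
    // only ever run once: it implements `FnOnce`, not `FnMut`.
    let consume = move || owned;

    // call_mut(consume); // does not compile: `consume` only implements `FnOnce`
    let result = call_once(consume); // fine: consumed exactly once
    println!("{result}");
}
```

This is the situation in `Caches::with_latest_at` below, where the `move` closure handed to `with_subscriber_once` consumes the captured `key` via `caches.entry(key)`.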
237 changes: 202 additions & 35 deletions crates/re_query_cache/src/cache.rs
@@ -3,13 +3,15 @@ use std::{
sync::Arc,
};

use ahash::HashMap;
use ahash::{HashMap, HashSet};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use paste::paste;
use seq_macro::seq;

use re_data_store::{LatestAtQuery, RangeQuery};
use re_data_store::{
LatestAtQuery, RangeQuery, StoreDiff, StoreEvent, StoreSubscriber, StoreSubscriberHandle,
};
use re_log_types::{EntityPath, RowId, StoreId, TimeInt, Timeline};
use re_query::ArchetypeView;
use re_types_core::{components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName};
@@ -43,15 +45,45 @@ impl From<RangeQuery> for AnyQuery {
/// All primary caches (all stores, all entities, everything).
//
// TODO(cmc): Centralize and harmonize all caches (query, jpeg, mesh).
static CACHES: Lazy<Caches> = Lazy::new(Caches::default);
static CACHES: Lazy<StoreSubscriberHandle> =
Lazy::new(|| re_data_store::DataStore::register_subscriber(Box::<Caches>::default()));

/// Maintains the top-level cache mappings.
//
// TODO(cmc): Store subscriber and cache invalidation.
// TODO(#4730): SizeBytes support + size stats + mem panel
#[derive(Default)]
pub struct Caches {
latest_at: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
pub struct Caches(RwLock<HashMap<CacheKey, CachesPerArchetype>>);

#[derive(Default)]
pub struct CachesPerArchetype {
/// Which [`Archetype`] are we querying for?
///
/// This is very important because of our data model: we not only query for components, but we
/// query for components from a specific point-of-view (the so-called primary component).
/// Different archetypes have different point-of-views, and therefore can end up with different
/// results, even from the same raw data.
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

/// Everything greater than or equal to this timestamp has been asynchronously invalidated.
///
/// The next time this cache gets queried, it must remove any entry matching this criteria.
/// `None` indicates that there's no pending invalidation.
///
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
/// time effectively behaves as a natural micro-batching mechanism.
pending_timeful_invalidation: Option<TimeInt>,

/// If `true`, the timeless data associated with this cache has been asynchronously invalidated.
///
/// If `true`, this cache must remove all of its timeless entries the next time it gets queried.
/// `false` indicates that there's no pending invalidation.
///
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
/// time effectively behaves as a natural micro-batching mechanism.
pending_timeless_invalidation: bool,
}

impl Caches {
@@ -60,8 +92,9 @@ impl Caches {
// TODO(#4731): expose palette command.
#[inline]
pub fn clear() {
let Caches { latest_at } = &*CACHES;
latest_at.write().clear();
re_data_store::DataStore::with_subscriber_once(*CACHES, |caches: &Caches| {
caches.0.write().clear();
});
}

/// Gives write access to the appropriate `LatestAtCache` according to the specified
@@ -77,14 +110,25 @@
A: Archetype,
F: FnMut(&mut LatestAtCache) -> R,
{
let key = CacheKey::new(store_id, entity_path, query.timeline, A::name());
let key = CacheKey::new(store_id, entity_path, query.timeline);

let cache =
re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
let mut caches = caches.0.write();

// We want to make sure we release the lock on the top-level cache map ASAP.
let cache = {
let mut caches = CACHES.latest_at.write();
let latest_at_cache = caches.entry(key).or_default();
Arc::clone(latest_at_cache)
};
let caches_per_archetype = caches.entry(key).or_default();
caches_per_archetype.handle_pending_invalidation();

let mut latest_at_per_archetype =
caches_per_archetype.latest_at_per_archetype.write();
let latest_at_cache = latest_at_per_archetype.entry(A::name()).or_default();

Arc::clone(latest_at_cache)

// Implicitly releasing all intermediary locks.
})
// NOTE: downcasting cannot fail, this is our own private handle.
.unwrap();

let mut cache = cache.write();
f(&mut cache)
Expand All @@ -102,17 +146,6 @@ pub struct CacheKey {

/// Which [`Timeline`] is the query targeting?
pub timeline: Timeline,

/// Which [`Archetype`] are we querying for?
///
/// This is very important because of our data model: we not only query for components, but we
/// query for components from a specific point-of-view (the so-called primary component).
/// Different archetypes have different point-of-views, and therefore can end up with different
/// results, even from the same raw data.
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
pub archetype_name: ArchetypeName,
}

impl CacheKey {
@@ -121,17 +154,149 @@ impl CacheKey {
store_id: impl Into<StoreId>,
entity_path: impl Into<EntityPath>,
timeline: impl Into<Timeline>,
archetype_name: impl Into<ArchetypeName>,
) -> Self {
Self {
store_id: store_id.into(),
entity_path: entity_path.into(),
timeline: timeline.into(),
archetype_name: archetype_name.into(),
}
}
}

// --- Invalidation ---

impl StoreSubscriber for Caches {
#[inline]
fn name(&self) -> String {
"rerun.store_subscribers.QueryCache".into()
}

#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
}

#[inline]
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

// TODO(cmc): support dropped recordings.
fn on_events(&mut self, events: &[StoreEvent]) {
re_tracing::profile_function!(format!("num_events={}", events.len()));

for event in events {
let StoreEvent {
store_id,
store_generation: _,
event_id: _,
diff,
} = event;

let StoreDiff {
kind: _, // Don't care: both additions and deletions invalidate query results.
row_id: _,
times,
entity_path,
cells: _, // Don't care: we invalidate at the entity level, not component level.
} = diff;

#[derive(Default, Debug)]
struct CompactedEvents {
timeless: HashSet<(StoreId, EntityPath)>,
timeful: HashMap<CacheKey, TimeInt>,
}

let mut compacted = CompactedEvents::default();
{
re_tracing::profile_scope!("compact events");

if times.is_empty() {
compacted
.timeless
.insert((store_id.clone(), entity_path.clone()));
}

for &(timeline, time) in times {
let key = CacheKey::new(store_id.clone(), entity_path.clone(), timeline);
let min_time = compacted.timeful.entry(key).or_insert(TimeInt::MAX);
*min_time = TimeInt::min(*min_time, time);
}
}

// TODO(cmc): This is horribly stupid and slow and can easily be made faster by adding
// yet another layer of caching indirection.
// But since this pretty much never happens in practice, let's not go there until we
// have metrics showing that we need to.
{
re_tracing::profile_scope!("timeless");

for (store_id, entity_path) in compacted.timeless {
for (key, caches_per_archetype) in self.0.write().iter_mut() {
if key.store_id == store_id && key.entity_path == entity_path {
caches_per_archetype.pending_timeless_invalidation = true;
}
}
}
}

{
re_tracing::profile_scope!("timeful");

for (key, time) in compacted.timeful {
if let Some(caches_per_archetype) = self.0.write().get_mut(&key) {
if let Some(min_time) =
caches_per_archetype.pending_timeful_invalidation.as_mut()
{
*min_time = TimeInt::min(*min_time, time);
} else {
caches_per_archetype.pending_timeful_invalidation = Some(time);
}
}
}
}
}
}
}

impl CachesPerArchetype {
/// Removes all entries from the cache that have been asynchronously invalidated.
///
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
/// time effectively behaves as a natural micro-batching mechanism.
fn handle_pending_invalidation(&mut self) {
let pending_timeless_invalidation = self.pending_timeless_invalidation;
let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();

if !pending_timeless_invalidation && !pending_timeful_invalidation {
return;
}

re_tracing::profile_function!();

for latest_at_cache in self.latest_at_per_archetype.read().values() {
let mut latest_at_cache = latest_at_cache.write();

if pending_timeless_invalidation {
latest_at_cache.timeless = None;
}

if let Some(min_time) = self.pending_timeful_invalidation {
latest_at_cache
.per_query_time
.retain(|&query_time, _| query_time < min_time);

latest_at_cache
.per_data_time
.retain(|&data_time, _| data_time < min_time);
}
}

self.pending_timeful_invalidation = None;
self.pending_timeless_invalidation = false;
}
}

// ---

/// Caches the results of any query for an arbitrary range of time.
@@ -150,8 +315,10 @@ impl CacheKey {
pub struct CacheBucket {
/// The _data_ timestamps and [`RowId`]s of all cached rows.
///
/// This corresponds to the data time and `RowId` returned by `re_query::query_archetype`.
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,
pub(crate) data_times: VecDeque<(TimeInt, RowId)>,

/// The [`InstanceKey`]s of the point-of-view components.
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
Expand All @@ -169,8 +336,8 @@ pub struct CacheBucket {
impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_data_times.iter()
pub fn iter_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
@@ -206,7 +373,7 @@ impl CacheBucket {
/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn num_entries(&self) -> usize {
self.pov_data_times.len()
self.data_times.len()
}

#[inline]
@@ -235,15 +402,15 @@ macro_rules! impl_insert {
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
pov_data_times,
data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = data_times.partition_point(|t| t < &(query_time, pov_row_id));

pov_data_times.insert(index, (query_time, pov_row_id));
data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
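The `pending_timeful_invalidation` and `pending_timeless_invalidation` fields introduced above defer eviction to query time. As a rough, self-contained sketch of that pattern (hypothetical `ToyCache` type and method names, not the PR's actual API): store-event handling only lowers a pending watermark, and the next query applies it in a single `retain` pass, so all events received between two queries are effectively micro-batched.

```rust
use std::collections::BTreeMap;

/// A toy latest-at cache keyed by data time (illustration only).
#[derive(Default)]
struct ToyCache {
    per_data_time: BTreeMap<i64, String>,
    /// Everything at or after this time is stale; applied lazily at query time.
    pending_invalidation: Option<i64>,
}

impl ToyCache {
    fn insert(&mut self, time: i64, value: String) {
        self.per_data_time.insert(time, value);
    }

    /// Store-event path: cheap, only records the minimum watermark.
    fn invalidate_from(&mut self, time: i64) {
        self.pending_invalidation = Some(match self.pending_invalidation {
            Some(pending) => pending.min(time),
            None => time,
        });
    }

    /// Query path: applies any pending invalidation exactly once, so all the
    /// events received since the last query are handled in a single pass.
    fn latest_at(&mut self, query_time: i64) -> Option<&String> {
        if let Some(min_time) = self.pending_invalidation.take() {
            self.per_data_time.retain(|&data_time, _| data_time < min_time);
        }
        self.per_data_time
            .range(..=query_time)
            .next_back()
            .map(|(_, value)| value)
    }
}

fn main() {
    let mut cache = ToyCache::default();
    cache.insert(10, "a".to_owned());
    cache.insert(20, "b".to_owned());

    // Two invalidation events arrive before the next query; only the earliest matters.
    cache.invalidate_from(25);
    cache.invalidate_from(15);

    // The single `retain` pass at query time drops everything at or after t=15.
    assert_eq!(cache.latest_at(30).map(String::as_str), Some("a"));
}
```

In the PR itself, `handle_pending_invalidation` plays the role of the `retain` step: it runs inside `Caches::with_latest_at` before the per-archetype cache is handed to the caller.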
3 changes: 1 addition & 2 deletions crates/re_query_cache/src/query.rs
@@ -111,7 +111,7 @@ macro_rules! impl_query_archetype {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
bucket.iter_pov_data_times(),
bucket.iter_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
@@ -133,7 +133,6 @@
Ok(())
};


let upsert_results = |
data_time: TimeInt,
arch_view: &::re_query::ArchetypeView<A>,