Skip to content

Commit

Permalink
Primary caching 11: cache stats and integration with memory panel (#4773
Browse files Browse the repository at this point in the history
)

The primary cache now tracks memory statistics and displays them in the
memory panel.

This immediately highlights a very stupid thing that the cache does:
missing optional components that have been turned into streams of
default values by the `ArchetypeView` are materialized as such
:man_facepalming:
- #4779


https://github.com/rerun-io/rerun/assets/2910679/876b264a-3f77-4d91-934e-aa8897bb32fe



- Fixes #4730 


---

Part of the primary caching series of PRs (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc authored Jan 15, 2024
1 parent 68de772 commit 1f19805
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ pub struct TimeInt(pub(crate) i64);

impl nohash_hasher::IsEnabled for TimeInt {}

// `TimeInt` is a newtype around a plain `i64` (see the struct definition above),
// so it owns no heap allocations: its heap footprint is always zero, and its
// total size is just `size_of::<TimeInt>()`.
impl re_types_core::SizeBytes for TimeInt {
#[inline]
fn heap_size_bytes(&self) -> u64 {
0
}
}

impl TimeInt {
/// The beginning of time.
///
Expand Down
45 changes: 32 additions & 13 deletions crates/re_memory/src/memory_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct MemoryHistory {
/// Bytes used by the datastore according to its own accounting.
pub counted_store: History<i64>,

/// Bytes used by the primary caches according to their own accounting.
pub counted_primary_caches: History<i64>,

/// Bytes used by the blueprint store according to its own accounting.
pub counted_blueprint: History<i64>,
}
Expand All @@ -38,6 +41,7 @@ impl Default for MemoryHistory {
counted: History::new(0..max_elems, max_seconds),
counted_gpu: History::new(0..max_elems, max_seconds),
counted_store: History::new(0..max_elems, max_seconds),
counted_primary_caches: History::new(0..max_elems, max_seconds),
counted_blueprint: History::new(0..max_elems, max_seconds),
}
}
Expand All @@ -50,39 +54,54 @@ impl MemoryHistory {
counted,
counted_gpu,
counted_store,
counted_primary_caches,
counted_blueprint,
} = self;
resident.is_empty()
&& counted.is_empty()
&& counted_gpu.is_empty()
&& counted_store.is_empty()
&& counted_primary_caches.is_empty()
&& counted_blueprint.is_empty()
}

/// Add data to history
pub fn capture(
&mut self,
counted_gpu: Option<i64>,
counted_store: Option<i64>,
counted_blueprint: Option<i64>,
updated_counted_gpu: Option<i64>,
updated_counted_store: Option<i64>,
updated_counted_primary_caches: Option<i64>,
updated_counted_blueprint: Option<i64>,
) {
let mem_use = crate::MemoryUse::capture();
let now = crate::util::sec_since_start();

if let Some(resident) = mem_use.resident {
self.resident.add(now, resident);
let Self {
resident,
counted,
counted_gpu,
counted_store,
counted_primary_caches,
counted_blueprint,
} = self;

if let Some(updated_resident) = mem_use.resident {
resident.add(now, updated_resident);
}
if let Some(updated_counted) = mem_use.counted {
counted.add(now, updated_counted);
}
if let Some(counted) = mem_use.counted {
self.counted.add(now, counted);
if let Some(updated_counted_gpu) = updated_counted_gpu {
counted_gpu.add(now, updated_counted_gpu);
}
if let Some(counted_gpu) = counted_gpu {
self.counted_gpu.add(now, counted_gpu);
if let Some(updated_counted_store) = updated_counted_store {
counted_store.add(now, updated_counted_store);
}
if let Some(counted_store) = counted_store {
self.counted_store.add(now, counted_store);
if let Some(updated_counted_primary_caches) = updated_counted_primary_caches {
counted_primary_caches.add(now, updated_counted_primary_caches);
}
if let Some(counted_blueprint) = counted_blueprint {
self.counted_blueprint.add(now, counted_blueprint);
if let Some(updated_counted_blueprint) = updated_counted_blueprint {
counted_blueprint.add(now, updated_counted_blueprint);
}
}
}
130 changes: 98 additions & 32 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use re_data_store::{
};
use re_log_types::{EntityPath, RowId, StoreId, TimeInt, Timeline};
use re_query::ArchetypeView;
use re_types_core::{components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName};
use re_types_core::{
components::InstanceKey, Archetype, ArchetypeName, Component, ComponentName, SizeBytes as _,
};

use crate::{ErasedFlatVecDeque, FlatVecDeque};

Expand Down Expand Up @@ -49,10 +51,8 @@ static CACHES: Lazy<StoreSubscriberHandle> =
Lazy::new(|| re_data_store::DataStore::register_subscriber(Box::<Caches>::default()));

/// Maintains the top-level cache mappings.
//
// TODO(#4730): SizeBytes support + size stats + mem panel
#[derive(Default)]
pub struct Caches(RwLock<HashMap<CacheKey, CachesPerArchetype>>);
pub struct Caches(pub(crate) RwLock<HashMap<CacheKey, CachesPerArchetype>>);

#[derive(Default)]
pub struct CachesPerArchetype {
Expand All @@ -65,7 +65,7 @@ pub struct CachesPerArchetype {
//
// TODO(cmc): At some point we should probably just store the PoV and optional components rather
// than an `ArchetypeName`: the query system doesn't care about archetypes.
latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,
pub(crate) latest_at_per_archetype: RwLock<HashMap<ArchetypeName, Arc<RwLock<LatestAtCache>>>>,

/// Everything greater than or equal to this timestamp has been asynchronously invalidated.
///
Expand Down Expand Up @@ -116,8 +116,8 @@ impl Caches {
re_data_store::DataStore::with_subscriber_once(*CACHES, move |caches: &Caches| {
let mut caches = caches.0.write();

let caches_per_archetype = caches.entry(key).or_default();
caches_per_archetype.handle_pending_invalidation();
let caches_per_archetype = caches.entry(key.clone()).or_default();
caches_per_archetype.handle_pending_invalidation(&key);

let mut latest_at_per_archetype =
caches_per_archetype.latest_at_per_archetype.write();
Expand All @@ -133,6 +133,12 @@ impl Caches {
let mut cache = cache.write();
f(&mut cache)
}

/// Runs the given closure against the globally registered [`Caches`] store subscriber
/// (read-only access; compare with the mutating accessor above).
#[inline]
pub(crate) fn with<F: FnMut(&Caches) -> R, R>(f: F) -> R {
// NOTE: downcasting cannot fail, this is our own private handle.
re_data_store::DataStore::with_subscriber(*CACHES, f).unwrap()
}
}

/// Uniquely identifies cached query results in the [`Caches`].
Expand Down Expand Up @@ -264,7 +270,7 @@ impl CachesPerArchetype {
///
/// Invalidation is deferred to query time because it is far more efficient that way: the frame
/// time effectively behaves as a natural micro-batching mechanism.
fn handle_pending_invalidation(&mut self) {
fn handle_pending_invalidation(&mut self, key: &CacheKey) {
let pending_timeless_invalidation = self.pending_timeless_invalidation;
let pending_timeful_invalidation = self.pending_timeful_invalidation.is_some();

Expand All @@ -281,15 +287,39 @@ impl CachesPerArchetype {
latest_at_cache.timeless = None;
}

let mut removed_bytes = 0u64;
if let Some(min_time) = self.pending_timeful_invalidation {
latest_at_cache
.per_query_time
.retain(|&query_time, _| query_time < min_time);

latest_at_cache
.per_data_time
.retain(|&data_time, _| data_time < min_time);
latest_at_cache.per_data_time.retain(|&data_time, bucket| {
if data_time < min_time {
return true;
}

// Only if that bucket is about to be dropped.
if Arc::strong_count(bucket) == 1 {
removed_bytes += bucket.read().total_size_bytes;
}

false
});
}

latest_at_cache.total_size_bytes = latest_at_cache
.total_size_bytes
.checked_sub(removed_bytes)
.unwrap_or_else(|| {
re_log::debug!(
store_id = %key.store_id,
entity_path = %key.entity_path,
current = latest_at_cache.total_size_bytes,
removed = removed_bytes,
"book keeping underflowed"
);
u64::MIN
});
}

self.pending_timeful_invalidation = None;
Expand Down Expand Up @@ -328,9 +358,14 @@ pub struct CacheBucket {
// TODO(#4733): Don't denormalize auto-generated instance keys.
// TODO(#4734): Don't denormalize splatted values.
pub(crate) components: BTreeMap<ComponentName, Box<dyn ErasedFlatVecDeque + Send + Sync>>,

/// The total size in bytes stored in this bucket.
///
/// Only used so we can decrement the global cache size when the last reference to a bucket
/// gets dropped.
pub(crate) total_size_bytes: u64,
//
// TODO(cmc): secondary cache
// TODO(#4730): size stats: this requires codegen'ing SizeBytes for all components!
}

impl CacheBucket {
Expand Down Expand Up @@ -387,12 +422,14 @@ macro_rules! impl_insert {
#[doc = "Inserts the contents of the given [`ArchetypeView`], which are made of the specified"]
#[doc = "`" $N "` point-of-view components and `" $M "` optional components, to the cache."]
#[doc = ""]
#[doc = "Returns the size in bytes of the data that was cached."]
#[doc = ""]
#[doc = "`query_time` must be the time of query, _not_ of the resulting data."]
pub fn [<insert_pov$N _comp$M>]<A, $($pov,)+ $($comp),*>(
&mut self,
query_time: TimeInt,
arch_view: &ArchetypeView<A>,
) -> ::re_query::Result<()>
) -> ::re_query::Result<u64>
where
A: Archetype,
$($pov: Component + Send + Sync + 'static,)+
Expand All @@ -401,21 +438,31 @@ macro_rules! impl_insert {
// NOTE: not `profile_function!` because we want them merged together.
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = data_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = self.data_times.partition_point(|t| t < &(query_time, pov_row_id));

let mut added_size_bytes = 0u64;

self.data_times.insert(index, (query_time, pov_row_id));
added_size_bytes += (query_time, pov_row_id).total_size_bytes();

{
// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<InstanceKey> = arch_view
.iter_instance_keys()
.collect::<VecDeque<InstanceKey>>()
.into();
added_size_bytes += added.total_size_bytes();
self.pov_instance_keys.insert_deque(index, added);
}

$(added_size_bytes += self.insert_component::<A, $pov>(index, arch_view)?;)+
$(added_size_bytes += self.insert_component_opt::<A, $comp>(index, arch_view)?;)*

data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
self.total_size_bytes += added_size_bytes;

Ok(())
Ok(added_size_bytes)
} }
};

Expand All @@ -436,7 +483,7 @@ impl CacheBucket {
&mut self,
query_time: TimeInt,
arch_view: &ArchetypeView<A>,
) -> ::re_query::Result<()>
) -> ::re_query::Result<u64>
where
A: Archetype,
R1: Component + Send + Sync + 'static,
Expand All @@ -453,42 +500,58 @@ impl CacheBucket {
&mut self,
at: usize,
arch_view: &ArchetypeView<A>,
) -> re_query::Result<()> {
) -> re_query::Result<u64> {
re_tracing::profile_function!(C::name());

let data = self
.components
.entry(C::name())
.or_insert_with(|| Box::new(FlatVecDeque::<C>::new()));

// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<C> = arch_view
.iter_required_component::<C>()?
.collect::<VecDeque<C>>()
.into();
let added_size_bytes = added.total_size_bytes();

// NOTE: downcast cannot fail, we create it just above.
let data = data.as_any_mut().downcast_mut::<FlatVecDeque<C>>().unwrap();
data.insert(at, arch_view.iter_required_component::<C>()?);
data.insert_deque(at, added);

Ok(())
Ok(added_size_bytes)
}

#[inline]
fn insert_component_opt<A: Archetype, C: Component + Send + Sync + 'static>(
&mut self,
at: usize,
arch_view: &ArchetypeView<A>,
) -> re_query::Result<()> {
) -> re_query::Result<u64> {
re_tracing::profile_function!(C::name());

let data = self
.components
.entry(C::name())
.or_insert_with(|| Box::new(FlatVecDeque::<Option<C>>::new()));

// The `FlatVecDeque` will have to collect the data one way or another: do it ourselves
// instead, that way we can efficiently computes its size while we're at it.
let added: FlatVecDeque<Option<C>> = arch_view
.iter_optional_component::<C>()?
.collect::<VecDeque<Option<C>>>()
.into();
let added_size_bytes = added.total_size_bytes();

// NOTE: downcast cannot fail, we create it just above.
let data = data
.as_any_mut()
.downcast_mut::<FlatVecDeque<Option<C>>>()
.unwrap();
data.insert(at, arch_view.iter_optional_component::<C>()?);
data.insert_deque(at, added);

Ok(())
Ok(added_size_bytes)
}
}

Expand Down Expand Up @@ -526,4 +589,7 @@ pub struct LatestAtCache {
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
// timeful case.
pub timeless: Option<CacheBucket>,

/// Total size of the data stored in this cache in bytes.
pub total_size_bytes: u64,
}
Loading

0 comments on commit 1f19805

Please sign in to comment.