Skip to content

Commit

Permalink
Primary cache: basic debug tools via command palette (#4948)
Browse files Browse the repository at this point in the history
Adds tools to clear the cache and dump its state to the terminal from
the command palette.


![image](https://github.com/rerun-io/rerun/assets/2910679/a52005bd-636d-4967-ba02-faac5b86eea8)

- Fixes #4731
  • Loading branch information
teh-cmc authored Jan 30, 2024
1 parent 5fd5ae9 commit 7923f49
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 11 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/re_query_cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ default = []
[dependencies]
# Rerun dependencies:
re_data_store.workspace = true
re_format.workspace = true
re_log.workspace = true
re_log_types.workspace = true
re_query.workspace = true
Expand All @@ -30,6 +31,7 @@ re_types_core.workspace = true

# External dependencies:
ahash.workspace = true
indent.workspace = true
itertools.workspace = true
parking_lot.workspace = true
paste.workspace = true
Expand Down
123 changes: 118 additions & 5 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
};

use ahash::{HashMap, HashSet};
use itertools::Itertools;
use parking_lot::RwLock;
use paste::paste;
use seq_macro::seq;
Expand Down Expand Up @@ -43,7 +44,6 @@ impl From<RangeQuery> for AnyQuery {
// ---

/// Maintains the top-level cache mappings.
//
pub struct Caches {
/// The [`StoreId`] of the associated [`DataStore`].
store_id: StoreId,
Expand All @@ -52,6 +52,41 @@ pub struct Caches {
per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>,
}

impl std::fmt::Debug for Caches {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
store_id,
per_cache_key,
} = self;

let mut strings = Vec::new();

strings.push(format!("[Caches({store_id})]"));

let per_cache_key = per_cache_key.read();
let per_cache_key: BTreeMap<_, _> = per_cache_key.iter().collect();

for (cache_key, caches_per_archetype) in &per_cache_key {
let caches_per_archetype = caches_per_archetype.read();
strings.push(format!(
" [{cache_key:?} (pending_timeful={:?} pending_timeless={:?})]",
caches_per_archetype
.pending_timeful_invalidation
.map(|t| cache_key
.timeline
.format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))),
caches_per_archetype.pending_timeless_invalidation,
));
strings.push(indent::indent_all_by(
4,
format!("{caches_per_archetype:?}"),
));
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl std::ops::Deref for Caches {
type Target = RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>;

Expand Down Expand Up @@ -118,10 +153,51 @@ pub struct CachesPerArchetype {
pending_timeless_invalidation: bool,
}

impl std::fmt::Debug for CachesPerArchetype {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let CachesPerArchetype {
latest_at_per_archetype,
range_per_archetype,
pending_timeful_invalidation: _,
pending_timeless_invalidation: _,
} = self;

let mut strings = Vec::new();

{
let latest_at_per_archetype = latest_at_per_archetype.read();
let latest_at_per_archetype: BTreeMap<_, _> = latest_at_per_archetype.iter().collect();

for (archetype_name, latest_at_cache) in &latest_at_per_archetype {
let latest_at_cache = latest_at_cache.read();
strings.push(format!(
"[latest_at for {archetype_name} ({})]",
re_format::format_bytes(latest_at_cache.total_size_bytes() as _)
));
strings.push(indent::indent_all_by(2, format!("{latest_at_cache:?}")));
}
}

{
let range_per_archetype = range_per_archetype.read();
let range_per_archetype: BTreeMap<_, _> = range_per_archetype.iter().collect();

for (archetype_name, range_cache) in &range_per_archetype {
let range_cache = range_cache.read();
strings.push(format!(
"[range for {archetype_name} ({})]",
re_format::format_bytes(range_cache.total_size_bytes() as _)
));
strings.push(indent::indent_all_by(2, format!("{range_cache:?}")));
}
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl Caches {
/// Clears all caches.
//
// TODO(#4731): expose palette command.
#[inline]
pub fn clear(&self) {
self.write().clear();
Expand Down Expand Up @@ -229,7 +305,7 @@ impl Caches {
}

/// Uniquely identifies cached query results in the [`Caches`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CacheKey {
/// Which [`EntityPath`] is the query targeting?
pub entity_path: EntityPath,
Expand All @@ -238,6 +314,17 @@ pub struct CacheKey {
pub timeline: Timeline,
}

impl std::fmt::Debug for CacheKey {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
entity_path,
timeline,
} = self;
f.write_fmt(format_args!("{entity_path} on {}", timeline.name()))
}
}

impl CacheKey {
#[inline]
pub fn new(entity_path: impl Into<EntityPath>, timeline: impl Into<Timeline>) -> Self {
Expand Down Expand Up @@ -342,7 +429,7 @@ impl StoreSubscriber for Caches {
// NOTE: Do _NOT_ lock from within the if clause itself or the guard will live
// for the remainder of the if statement and hell will ensue.
// <https://rust-lang.github.io/rust-clippy/master/#if_let_mutex> is
// supposed to catch but it didn't, I don't know why.
// supposed to catch that but it doesn't, I don't know why.
let mut caches_per_archetype = caches_per_archetype.write();
if let Some(min_time) =
caches_per_archetype.pending_timeful_invalidation.as_mut()
Expand Down Expand Up @@ -467,6 +554,32 @@ pub struct CacheBucket {
// TODO(cmc): secondary cache
}

impl std::fmt::Debug for CacheBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
data_times: _,
pov_instance_keys: _,
components,
total_size_bytes: _,
} = self;

let strings = components
.iter()
.filter(|(_, data)| data.dyn_num_values() > 0)
.map(|(name, data)| {
format!(
"{} {name} values spread across {} entries ({})",
data.dyn_num_values(),
data.dyn_num_entries(),
re_format::format_bytes(data.dyn_total_size_bytes() as _),
)
})
.collect_vec();

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl CacheBucket {
#[inline]
pub fn time_range(&self) -> Option<TimeRange> {
Expand Down
1 change: 1 addition & 0 deletions crates/re_query_cache/src/cache_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Caches {
let range_cache @ RangeCache {
per_data_time,
timeless,
timeline: _,
} = &*range_cache.read();

let total_rows = per_data_time.data_times.len() as u64;
Expand Down
55 changes: 53 additions & 2 deletions crates/re_query_cache/src/latest_at.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{collections::BTreeMap, sync::Arc};

use ahash::HashMap;
use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::{EntityPath, RowId};
use re_log_types::{EntityPath, RowId, Timeline};
use re_query::query_archetype;
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

Expand Down Expand Up @@ -40,10 +41,53 @@ pub struct LatestAtCache {
// timeful case.
pub timeless: Option<CacheBucket>,

/// For debugging purposes.
pub(crate) timeline: Timeline,

/// Total size of the data stored in this cache in bytes.
total_size_bytes: u64,
}

impl std::fmt::Debug for LatestAtCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
per_query_time,
per_data_time,
timeless,
timeline,
total_size_bytes: _,
} = self;

let mut strings = Vec::new();

if let Some(bucket) = timeless.as_ref() {
strings.push(format!(
"query_time=<timeless> -> data_time=<timeless> ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
}

let data_times_per_bucket: HashMap<_, _> = per_data_time
.iter()
.map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
.collect();

for (query_time, bucket) in per_query_time {
let query_time = timeline.typ().format_utc(*query_time);
let data_time = data_times_per_bucket
.get(&Arc::as_ptr(bucket))
.map_or_else(|| "MISSING?!".to_owned(), |t| timeline.typ().format_utc(*t));
strings.push(format!(
"query_time={query_time} -> data_time={data_time} ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl SizeBytes for LatestAtCache {
#[inline]
fn heap_size_bytes(&self) -> u64 {
Expand All @@ -65,6 +109,7 @@ impl LatestAtCache {
per_query_time,
per_data_time,
timeless: _,
timeline: _,
total_size_bytes,
} = self;

Expand Down Expand Up @@ -187,7 +232,13 @@ macro_rules! impl_query_archetype_latest_at {
let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time, timeless, total_size_bytes } = latest_at_cache;
let crate::LatestAtCache {
per_query_time,
per_data_time,
timeless,
timeline: _,
total_size_bytes,
} = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(mut query_time_bucket_at_query_time) => {
Expand Down
53 changes: 51 additions & 2 deletions crates/re_query_cache/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, RangeQuery, TimeInt};
use re_log_types::{EntityPath, RowId, TimeRange};
use re_log_types::{EntityPath, RowId, TimeRange, Timeline};
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

use crate::{CacheBucket, Caches, MaybeCachedComponentData};
Expand All @@ -16,11 +16,58 @@ pub struct RangeCache {
///
/// Query time is irrelevant for range queries.
//
// TODO(cmc): bucketize
// TODO(#4810): bucketize
pub per_data_time: CacheBucket,

/// All timeless data.
pub timeless: CacheBucket,

/// For debugging purposes.
pub(crate) timeline: Timeline,
}

impl std::fmt::Debug for RangeCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
per_data_time,
timeless,
timeline,
} = self;

let mut strings = Vec::new();

let mut data_time_min = TimeInt::MAX;
let mut data_time_max = TimeInt::MIN;

if !timeless.is_empty() {
data_time_min = TimeInt::MIN;
}

if !per_data_time.is_empty() {
data_time_min = TimeInt::min(
data_time_min,
per_data_time.data_times.front().map(|(t, _)| *t).unwrap(),
);
data_time_max = TimeInt::max(
data_time_max,
per_data_time.data_times.back().map(|(t, _)| *t).unwrap(),
);
}

strings.push(format!(
"{} ({})",
timeline
.typ()
.format_range_utc(TimeRange::new(data_time_min, data_time_max)),
re_format::format_bytes(
(timeless.total_size_bytes + per_data_time.total_size_bytes) as _
),
));
strings.push(indent::indent_all_by(2, format!("{timeless:?}")));
strings.push(indent::indent_all_by(2, format!("{per_data_time:?}")));

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl SizeBytes for RangeCache {
Expand All @@ -29,6 +76,7 @@ impl SizeBytes for RangeCache {
let Self {
per_data_time,
timeless,
timeline: _,
} = self;

per_data_time.total_size_bytes + timeless.total_size_bytes
Expand All @@ -48,6 +96,7 @@ impl RangeCache {
let Self {
per_data_time,
timeless: _,
timeline: _,
} = self;

per_data_time.truncate_at_time(threshold)
Expand Down
Loading

0 comments on commit 7923f49

Please sign in to comment.