Primary cache: basic debug tools via command palette #4948

Merged — 3 commits, merged on Jan 30, 2024
Changes from 2 commits
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/re_query_cache/Cargo.toml
@@ -22,6 +22,7 @@ default = []
[dependencies]
# Rerun dependencies:
re_data_store.workspace = true
re_format.workspace = true
re_log.workspace = true
re_log_types.workspace = true
re_query.workspace = true
@@ -30,6 +31,7 @@ re_types_core.workspace = true

# External dependencies:
ahash.workspace = true
indent.workspace = true
itertools.workspace = true
parking_lot.workspace = true
paste.workspace = true
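Both new dependencies feed the `Debug` output introduced below: `re_format` renders byte counts human-readably and `indent` nests the per-archetype sections. A minimal sketch of the two calls the new impls lean on (illustrative values, not taken from this PR):

```rust
fn main() {
    // `re_format::format_bytes` formats a byte count for humans; it takes
    // a float, hence the `as _` casts at the call sites in this PR.
    let size = re_format::format_bytes(1536.0); // e.g. "1.5 KiB"

    // `indent::indent_all_by` indents every line of a string by N spaces,
    // which is how the nested cache sections below are produced.
    println!("{}", indent::indent_all_by(4, format!("[bucket ({size})]")));
}
```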
123 changes: 118 additions & 5 deletions crates/re_query_cache/src/cache.rs
@@ -5,6 +5,7 @@ use std::{
};

use ahash::{HashMap, HashSet};
use itertools::Itertools;
use parking_lot::RwLock;
use paste::paste;
use seq_macro::seq;
@@ -43,7 +44,6 @@ impl From<RangeQuery> for AnyQuery {
// ---

/// Maintains the top-level cache mappings.
//
pub struct Caches {
/// The [`StoreId`] of the associated [`DataStore`].
store_id: StoreId,
@@ -52,6 +52,41 @@ pub struct Caches {
per_cache_key: RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>,
}

impl std::fmt::Debug for Caches {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
store_id,
per_cache_key,
} = self;

let mut strings = Vec::new();

strings.push(format!("[Caches({store_id})]"));

let per_cache_key = per_cache_key.read();
let per_cache_key: BTreeMap<_, _> = per_cache_key.iter().collect();

for (cache_key, caches_per_archetype) in &per_cache_key {
let caches_per_archetype = caches_per_archetype.read();
strings.push(format!(
" [{cache_key:?} (pending_timeful={:?} pending_timeless={:?})]",
caches_per_archetype
.pending_timeful_invalidation
.map(|t| cache_key
.timeline
.format_time_range_utc(&TimeRange::new(t, TimeInt::MAX))),
caches_per_archetype.pending_timeless_invalidation,
));
strings.push(indent::indent_all_by(
4,
format!("{caches_per_archetype:?}"),
));
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl std::ops::Deref for Caches {
type Target = RwLock<HashMap<CacheKey, Arc<RwLock<CachesPerArchetype>>>>;

@@ -118,10 +153,51 @@ pub struct CachesPerArchetype {
pending_timeless_invalidation: bool,
}

impl std::fmt::Debug for CachesPerArchetype {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let CachesPerArchetype {
latest_at_per_archetype,
range_per_archetype,
pending_timeful_invalidation: _,
pending_timeless_invalidation: _,
} = self;

let mut strings = Vec::new();

{
let latest_at_per_archetype = latest_at_per_archetype.read();
let latest_at_per_archetype: BTreeMap<_, _> = latest_at_per_archetype.iter().collect();

for (archetype_name, latest_at_cache) in &latest_at_per_archetype {
let latest_at_cache = latest_at_cache.read();
strings.push(format!(
"[latest_at for {archetype_name} ({})]",
re_format::format_bytes(latest_at_cache.total_size_bytes() as _)
));
strings.push(indent::indent_all_by(2, format!("{latest_at_cache:?}")));
}
}

{
let range_per_archetype = range_per_archetype.read();
let range_per_archetype: BTreeMap<_, _> = range_per_archetype.iter().collect();

for (archetype_name, range_cache) in &range_per_archetype {
let range_cache = range_cache.read();
strings.push(format!(
"[range for {archetype_name} ({})]",
re_format::format_bytes(range_cache.total_size_bytes() as _)
));
strings.push(indent::indent_all_by(2, format!("{range_cache:?}")));
}
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl Caches {
/// Clears all caches.
//
// TODO(#4731): expose palette command.
#[inline]
pub fn clear(&self) {
self.write().clear();
@@ -229,7 +305,7 @@ impl Caches {
}

/// Uniquely identifies cached query results in the [`Caches`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct CacheKey {
/// Which [`EntityPath`] is the query targeting?
pub entity_path: EntityPath,
@@ -238,6 +314,17 @@ pub struct CacheKey {
pub timeline: Timeline,
}

impl std::fmt::Debug for CacheKey {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
entity_path,
timeline,
} = self;
f.write_fmt(format_args!("{entity_path} on {}", timeline.name()))
}
}

impl CacheKey {
#[inline]
pub fn new(entity_path: impl Into<EntityPath>, timeline: impl Into<Timeline>) -> Self {
@@ -342,7 +429,7 @@ impl StoreSubscriber for Caches {
// NOTE: Do _NOT_ lock from within the if clause itself or the guard will live
// for the remainder of the if statement and hell will ensue.
// <https://rust-lang.github.io/rust-clippy/master/#if_let_mutex> is
// supposed to catch but it didn't, I don't know why.
// supposed to catch that but it doesn't, I don't know why.
let mut caches_per_archetype = caches_per_archetype.write();
if let Some(min_time) =
caches_per_archetype.pending_timeful_invalidation.as_mut()
@@ -467,6 +554,32 @@ pub struct CacheBucket {
// TODO(cmc): secondary cache
}

impl std::fmt::Debug for CacheBucket {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
data_times: _,
pov_instance_keys: _,
components,
total_size_bytes: _,
} = self;

let strings = components
.iter()
.filter(|(_, data)| data.dyn_num_values() > 0)
.map(|(name, data)| {
format!(
"{} {name} values spread across {} entries ({})",
data.dyn_num_values(),
data.dyn_num_entries(),
re_format::format_bytes(data.dyn_total_size_bytes() as _),
)
})
.collect_vec();

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl CacheBucket {
#[inline]
pub fn time_range(&self) -> Option<TimeRange> {
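One hunk above deserves a callout: the comment in the `StoreSubscriber` impl about not locking from within the `if` clause itself. In Rust, a temporary lock guard created in an `if let` scrutinee lives for the whole statement, so a nested `write()` on the same non-reentrant `parking_lot` lock deadlocks. A minimal self-contained sketch of the hazard and the safe shape (hypothetical data, not the PR's types):

```rust
use parking_lot::RwLock;

fn main() {
    let cache: RwLock<Option<u64>> = RwLock::new(Some(41));

    // DEADLOCK (don't do this): the read guard returned by `cache.read()`
    // is a temporary that lives for the entire `if let`, so the nested
    // `write()` blocks forever:
    //
    //     if let Some(v) = *cache.read() {
    //         *cache.write() = Some(v + 1); // read guard still held here!
    //     }

    // Safe shape: take a single guard up front and keep all access within
    // its scope.
    {
        let mut guard = cache.write();
        if let Some(v) = guard.as_mut() {
            *v += 1;
        }
    }

    assert_eq!(*cache.read(), Some(42));
}
```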
1 change: 1 addition & 0 deletions crates/re_query_cache/src/cache_stats.rs
@@ -171,6 +171,7 @@ impl Caches {
let range_cache @ RangeCache {
per_data_time,
timeless,
timeline: _,
} = &*range_cache.read();

let total_rows = per_data_time.data_times.len() as u64;
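The one-line change here is a consequence of a deliberate pattern visible throughout this PR: structs are destructured exhaustively, so adding the new `timeline` field fails to compile at every site that hasn't explicitly acknowledged it. A tiny sketch of why that pattern catches forgotten fields (hypothetical struct, not the PR's types):

```rust
// Hypothetical stand-in for RangeCache.
struct Cacheish {
    per_data_time: u64,
    timeless: u64,
    timeline: &'static str,
}

fn total_size(cache: &Cacheish) -> u64 {
    // Exhaustive destructuring: adding a field to `Cacheish` makes this
    // pattern a compile error until the field is listed (or `_`-ignored),
    // so no accounting site can silently fall out of date.
    let Cacheish {
        per_data_time,
        timeless,
        timeline: _, // irrelevant for size accounting, ignored explicitly
    } = cache;
    per_data_time + timeless
}

fn main() {
    let cache = Cacheish { per_data_time: 2, timeless: 1, timeline: "frame_nr" };
    assert_eq!(total_size(&cache), 3);
}
```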
55 changes: 53 additions & 2 deletions crates/re_query_cache/src/latest_at.rs
@@ -1,10 +1,11 @@
use std::{collections::BTreeMap, sync::Arc};

use ahash::HashMap;
use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, LatestAtQuery, TimeInt};
use re_log_types::{EntityPath, RowId};
use re_log_types::{EntityPath, RowId, Timeline};
use re_query::query_archetype;
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

@@ -40,10 +41,53 @@ pub struct LatestAtCache {
// timeful case.
pub timeless: Option<CacheBucket>,

/// For debugging purposes.
pub(crate) timeline: Timeline,

/// Total size of the data stored in this cache in bytes.
total_size_bytes: u64,
}

impl std::fmt::Debug for LatestAtCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
per_query_time,
per_data_time,
timeless,
timeline,
total_size_bytes: _,
} = self;

let mut strings = Vec::new();

if let Some(bucket) = timeless.as_ref() {
strings.push(format!(
"query_time=<timeless> -> data_time=<timeless> ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
}

let data_times_per_bucket: HashMap<_, _> = per_data_time
.iter()
.map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
.collect();

for (query_time, bucket) in per_query_time {
let query_time = timeline.typ().format_utc(*query_time);
let data_time = data_times_per_bucket
.get(&Arc::as_ptr(bucket))
.map_or_else(|| "MISSING?!".to_owned(), |t| timeline.typ().format_utc(*t));
strings.push(format!(
"query_time={query_time} -> data_time={data_time} ({})",
re_format::format_bytes(bucket.total_size_bytes as _),
));
strings.push(indent::indent_all_by(2, format!("{bucket:?}")));
}

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl SizeBytes for LatestAtCache {
#[inline]
fn heap_size_bytes(&self) -> u64 {
@@ -65,6 +109,7 @@ impl LatestAtCache {
per_query_time,
per_data_time,
timeless: _,
timeline: _,
total_size_bytes,
} = self;

@@ -187,7 +232,13 @@ macro_rules! impl_query_archetype_latest_at {
let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time, timeless, total_size_bytes } = latest_at_cache;
let crate::LatestAtCache {
per_query_time,
per_data_time,
timeless,
timeline: _,
total_size_bytes,
} = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(mut query_time_bucket_at_query_time) => {
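A note on `data_times_per_bucket` in the `Debug` impl above: `per_query_time` and `per_data_time` share their buckets via `Arc`, so the impl inverts `per_data_time` into a map keyed on the raw `Arc` pointer and recovers each query-time entry's data time by pointer identity. A std-only sketch of the trick (hypothetical `Bucket` type; the PR uses `ahash::HashMap`):

```rust
use std::{collections::BTreeMap, sync::Arc};

// Hypothetical stand-in for a cache bucket; note it needs no Eq/Hash.
struct Bucket(&'static str);

fn main() {
    let shared = Arc::new(Bucket("bucket-a"));

    // As in LatestAtCache: several query times resolve to one data time,
    // so both maps hold clones of the same Arc.
    let per_query_time: BTreeMap<u64, Arc<Bucket>> =
        [(10, shared.clone()), (11, shared.clone())].into();
    let per_data_time: BTreeMap<u64, Arc<Bucket>> = [(10, shared)].into();

    // Invert per_data_time: raw Arc pointer -> data time. Raw pointers
    // are Ord + Hash, so pointer identity locates shared buckets cheaply.
    let data_times_per_bucket: BTreeMap<_, _> = per_data_time
        .iter()
        .map(|(time, bucket)| (Arc::as_ptr(bucket), *time))
        .collect();

    for (query_time, bucket) in &per_query_time {
        let data_time = data_times_per_bucket.get(&Arc::as_ptr(bucket));
        println!("query_time={query_time} -> data_time={data_time:?} [{}]", bucket.0);
    }
}
```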
53 changes: 51 additions & 2 deletions crates/re_query_cache/src/range.rs
@@ -2,7 +2,7 @@ use paste::paste;
use seq_macro::seq;

use re_data_store::{DataStore, RangeQuery, TimeInt};
use re_log_types::{EntityPath, RowId, TimeRange};
use re_log_types::{EntityPath, RowId, TimeRange, Timeline};
use re_types_core::{components::InstanceKey, Archetype, Component, SizeBytes};

use crate::{CacheBucket, Caches, MaybeCachedComponentData};
@@ -16,11 +16,58 @@ pub struct RangeCache {
///
/// Query time is irrelevant for range queries.
//
// TODO(cmc): bucketize
// TODO(#4810): bucketize
pub per_data_time: CacheBucket,

/// All timeless data.
pub timeless: CacheBucket,

/// For debugging purposes.
pub(crate) timeline: Timeline,
}

impl std::fmt::Debug for RangeCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
per_data_time,
timeless,
timeline,
} = self;

let mut strings = Vec::new();

let mut data_time_min = TimeInt::MAX;
let mut data_time_max = TimeInt::MIN;

if !timeless.is_empty() {
data_time_min = TimeInt::MIN;
}

if !per_data_time.is_empty() {
data_time_min = TimeInt::min(
data_time_min,
per_data_time.data_times.front().map(|(t, _)| *t).unwrap(),
);
data_time_max = TimeInt::max(
data_time_max,
per_data_time.data_times.back().map(|(t, _)| *t).unwrap(),
);
}

strings.push(format!(
"{} ({})",
timeline
.typ()
.format_range_utc(TimeRange::new(data_time_min, data_time_max)),
re_format::format_bytes(
(timeless.total_size_bytes + per_data_time.total_size_bytes) as _
),
));
strings.push(indent::indent_all_by(2, format!("{timeless:?}")));
strings.push(indent::indent_all_by(2, format!("{per_data_time:?}")));

f.write_str(&strings.join("\n").replace("\n\n", "\n"))
}
}

impl SizeBytes for RangeCache {
@@ -29,6 +76,7 @@ impl SizeBytes for RangeCache {
let Self {
per_data_time,
timeless,
timeline: _,
} = self;

per_data_time.total_size_bytes + timeless.total_size_bytes
@@ -48,6 +96,7 @@ impl RangeCache {
let Self {
per_data_time,
timeless: _,
timeline: _,
} = self;

per_data_time.truncate_at_time(threshold)
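With `Debug` implemented at every level (`Caches` → `CachesPerArchetype` → `LatestAtCache`/`RangeCache` → `CacheBucket`), dumping the entire cache hierarchy reduces to formatting a single value; the command-palette wiring that triggers the dump is not part of this diff. A hedged sketch of the resulting call site (hypothetical helper, stand-in value):

```rust
// Hypothetical helper: in the viewer this would receive the store's
// `re_query_cache::Caches` and emit its nested Debug output.
fn dump_cache_state(caches: &impl std::fmt::Debug) {
    eprintln!("{caches:?}");
}

fn main() {
    // Stand-in value; the real call would pass the `Caches` instance.
    dump_cache_state(&[("point2d on frame_nr", "1.5 KiB")]);
}
```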