Skip to content

Commit

Permalink
Primary caching 9: timeless latest-at support (#4721)
Browse files Browse the repository at this point in the history
Introduces a dedicated cache bucket for timeless data and properly
forwards the information through all APIs downstream.

---

Part of the primary caching series of PR (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc authored Jan 15, 2024
1 parent ccfd21a commit f569223
Show file tree
Hide file tree
Showing 6 changed files with 169 additions and 128 deletions.
5 changes: 0 additions & 5 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ impl TimeInt {
pub fn abs(&self) -> Self {
Self(self.0.saturating_abs())
}

#[inline]
pub fn is_timeless(&self) -> bool {
self == &Self::BEGINNING
}
}

impl From<i64> for TimeInt {
Expand Down
10 changes: 9 additions & 1 deletion crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ static CACHES: Lazy<Caches> = Lazy::new(Caches::default);
//
// TODO(cmc): Store subscriber and cache invalidation.
// TODO(#4730): SizeBytes support + size stats + mem panel
// TODO(cmc): timeless caching support
#[derive(Default)]
pub struct Caches {
latest_at: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
Expand Down Expand Up @@ -351,4 +350,13 @@ pub struct LatestAtCache {
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

/// Dedicated bucket for timeless data, if any.
///
/// Query time and data time are one and the same in the timeless case, therefore we only need
/// this one bucket.
//
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
// timeful case.
pub timeless: Option<CacheBucket>,
}
122 changes: 77 additions & 45 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
R1: Component + Send + Sync + 'static,
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
MaybeCachedComponentData<'_, R1>,
),
Expand All @@ -93,7 +93,7 @@ macro_rules! impl_query_archetype {
$($comp: Component + Send + Sync + 'static,)*
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
$(MaybeCachedComponentData<'_, $pov>,)+
$(MaybeCachedComponentData<'_, Option<$comp>>,)*
Expand All @@ -107,7 +107,7 @@ macro_rules! impl_query_archetype {
);


let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
let mut iter_results = |timeless: bool, bucket: &crate::CacheBucket| -> crate::Result<()> {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
Expand All @@ -117,9 +117,9 @@ macro_rules! impl_query_archetype {
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
$(bucket.iter_component_opt::<$comp>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)*
).map(|(time, instance_keys, $($pov,)+ $($comp,)*)| {
).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
(
*time,
((!timeless).then_some(*time), *row_id),
MaybeCachedComponentData::Cached(instance_keys),
$(MaybeCachedComponentData::Cached($pov),)+
$(MaybeCachedComponentData::Cached($comp),)*
Expand All @@ -133,68 +133,102 @@ macro_rules! impl_query_archetype {
Ok(())
};


let upsert_results = |
data_time: TimeInt,
arch_view: &::re_query::ArchetypeView<A>,
bucket: &mut crate::CacheBucket,
| -> crate::Result<()> {
re_log::trace!(data_time=?data_time, ?data_time, "fill");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();

bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(data_time, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}

Ok(())
};

let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;
let crate::LatestAtCache { per_query_time, per_data_time, timeless } = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => {
// Fastest path: we have an entry for this exact query time, no need to look any
// further.
return iter_results(&query_time_bucket_at_query_time.get().read());
re_log::trace!(query_time=?query.at, "cache hit (query time)");
return iter_results(false, &query_time_bucket_at_query_time.get().read());
}
entry @ std::collections::btree_map::Entry::Vacant(_) => entry,
};

let arch_view = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): actual timeless caching support.
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);
let data_time = arch_view.data_time();

// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that to avoid join & deserialization costs.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
if let Some(data_time) = data_time { // Reminder: `None` means timeless.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)");

*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

return iter_results(&data_time_bucket_at_data_time.read());
return iter_results(false, &data_time_bucket_at_data_time.read());
}
} else {
if let Some(timeless_bucket) = timeless.as_ref() {
re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)");
return iter_results(true, timeless_bucket);
}
}

let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default();

// Slowest path: this is a complete cache miss.
{
re_tracing::profile_scope!("fill");
if let Some(data_time) = data_time { // Reminder: `None` means timeless.
re_log::trace!(query_time=?query.at, ?data_time, "cache miss");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();

let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
query_time_bucket_at_query_time.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
upsert_results(data_time, &arch_view, &mut query_time_bucket_at_query_time)?;
}
}

let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);
let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);

iter_results(false, &query_time_bucket_at_query_time.read())
} else {
re_log::trace!(query_time=?query.at, "cache miss (timeless)");

let mut timeless_bucket = crate::CacheBucket::default();

iter_results(&query_time_bucket_at_query_time.read())
upsert_results(TimeInt::MIN, &arch_view, &mut timeless_bucket)?;
iter_results(true, &timeless_bucket)?;

*timeless = Some(timeless_bucket);
Ok(())
}
};


Expand All @@ -209,8 +243,7 @@ macro_rules! impl_query_archetype {

for arch_view in arch_views {
let data = (
// TODO(cmc): actual timeless caching support.
(arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
(arch_view.data_time(), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
$(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)*
Expand All @@ -228,8 +261,7 @@ macro_rules! impl_query_archetype {
let arch_view = ::re_query::query_archetype::<A>(store, query, entity_path)?;

let data = (
// TODO(cmc): actual timeless caching support.
(arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
(arch_view.data_time(), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
$(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)*
Expand Down Expand Up @@ -286,7 +318,7 @@ where
R1: Component + Send + Sync + 'static,
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
MaybeCachedComponentData<'_, R1>,
),
Expand Down Expand Up @@ -318,7 +350,7 @@ macro_rules! impl_query_archetype_with_history {
$($comp: Component + Send + Sync + 'static,)*
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
$(MaybeCachedComponentData<'_, $pov>,)+
$(MaybeCachedComponentData<'_, Option<$comp>>,)*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ macro_rules! impl_process_archetype {
&EntityPath,
&EntityProperties,
&SpatialSceneEntityContext<'_>,
(TimeInt, RowId),
(Option<TimeInt>, RowId),
&[InstanceKey],
$(&[$pov],)*
$(&[Option<$comp>],)*
Expand Down
12 changes: 4 additions & 8 deletions crates/re_space_view_text_log/src/visualizer_system.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use re_data_store::TimeRange;
use re_entity_db::EntityPath;
use re_log_types::{RowId, TimeInt};
use re_log_types::RowId;
use re_types::{
archetypes::TextLog,
components::{Color, Text, TextLogLevel},
Expand Down Expand Up @@ -61,6 +61,8 @@ impl VisualizerSystem for TextLogSystem {
let store = ctx.entity_db.store();

for data_result in query.iter_visible_data_results(Self::identifier()) {
re_tracing::profile_scope!("primary", &data_result.entity_path.to_string());

// We want everything, for all times:
let timeline_query =
re_data_store::RangeQuery::new(query.timeline, TimeRange::EVERYTHING);
Expand All @@ -77,8 +79,7 @@ impl VisualizerSystem for TextLogSystem {
self.entries.push(Entry {
row_id,
entity_path: data_result.entity_path.clone(),
// TODO(cmc): real support for timeless data in caches.
time: (time != TimeInt::MIN).then(|| time.as_i64()),
time: time.map(|time| time.as_i64()),
color: *color,
body: body.clone(),
level: level.clone(),
Expand All @@ -88,11 +89,6 @@ impl VisualizerSystem for TextLogSystem {
)?;
}

{
re_tracing::profile_scope!("sort");
self.entries.sort_by_key(|entry| entry.time);
}

Ok(Vec::new())
}

Expand Down
Loading

0 comments on commit f569223

Please sign in to comment.