Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Primary caching 9: timeless latest-at support #4721

Merged
merged 4 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions crates/re_log_types/src/time_point/time_int.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ impl TimeInt {
pub fn abs(&self) -> Self {
Self(self.0.saturating_abs())
}

#[inline]
pub fn is_timeless(&self) -> bool {
self == &Self::BEGINNING
}
Comment on lines -72 to -75
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this as it sets a very dangerous precedent (doesn't seem to be used though, which is good news).

}

impl From<i64> for TimeInt {
Expand Down
10 changes: 9 additions & 1 deletion crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ static CACHES: Lazy<Caches> = Lazy::new(Caches::default);
//
// TODO(cmc): Store subscriber and cache invalidation.
// TODO(#4730): SizeBytes support + size stats + mem panel
// TODO(cmc): timeless caching support
#[derive(Default)]
pub struct Caches {
latest_at: RwLock<HashMap<CacheKey, Arc<RwLock<LatestAtCache>>>>,
Expand Down Expand Up @@ -351,4 +350,13 @@ pub struct LatestAtCache {
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

/// Dedicated bucket for timeless data, if any.
///
/// Query time and data time are one and the same in the timeless case, therefore we only need
/// this one bucket.
//
// NOTE: Lives separately so we don't pay the extra `Option` cost in the much more common
// timeful case.
pub timeless: Option<CacheBucket>,
}
122 changes: 77 additions & 45 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
R1: Component + Send + Sync + 'static,
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
MaybeCachedComponentData<'_, R1>,
),
Expand All @@ -93,7 +93,7 @@ macro_rules! impl_query_archetype {
$($comp: Component + Send + Sync + 'static,)*
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
$(MaybeCachedComponentData<'_, $pov>,)+
$(MaybeCachedComponentData<'_, Option<$comp>>,)*
Expand All @@ -107,7 +107,7 @@ macro_rules! impl_query_archetype {
);


let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
let mut iter_results = |timeless: bool, bucket: &crate::CacheBucket| -> crate::Result<()> {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
Expand All @@ -117,9 +117,9 @@ macro_rules! impl_query_archetype {
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
$(bucket.iter_component_opt::<$comp>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$comp>::name()))?,)*
).map(|(time, instance_keys, $($pov,)+ $($comp,)*)| {
).map(|((time, row_id), instance_keys, $($pov,)+ $($comp,)*)| {
(
*time,
((!timeless).then_some(*time), *row_id),
MaybeCachedComponentData::Cached(instance_keys),
$(MaybeCachedComponentData::Cached($pov),)+
$(MaybeCachedComponentData::Cached($comp),)*
Expand All @@ -133,68 +133,102 @@ macro_rules! impl_query_archetype {
Ok(())
};


let upsert_results = |
data_time: TimeInt,
arch_view: &::re_query::ArchetypeView<A>,
bucket: &mut crate::CacheBucket,
| -> crate::Result<()> {
re_log::trace!(data_time=?data_time, ?data_time, "fill");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();

bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(data_time, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}

Ok(())
};

let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;
let crate::LatestAtCache { per_query_time, per_data_time, timeless } = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => {
// Fastest path: we have an entry for this exact query time, no need to look any
// further.
return iter_results(&query_time_bucket_at_query_time.get().read());
re_log::trace!(query_time=?query.at, "cache hit (query time)");
return iter_results(false, &query_time_bucket_at_query_time.get().read());
}
entry @ std::collections::btree_map::Entry::Vacant(_) => entry,
};

let arch_view = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): actual timeless caching support.
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);
let data_time = arch_view.data_time();

// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that to avoid join & deserialization costs.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
if let Some(data_time) = data_time { // Reminder: `None` means timeless.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
re_log::trace!(query_time=?query.at, ?data_time, "cache hit (data time)");

*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);
// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

return iter_results(&data_time_bucket_at_data_time.read());
return iter_results(false, &data_time_bucket_at_data_time.read());
}
} else {
if let Some(timeless_bucket) = timeless.as_ref() {
re_log::trace!(query_time=?query.at, "cache hit (data time, timeless)");
return iter_results(true, timeless_bucket);
}
}

let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default();

// Slowest path: this is a complete cache miss.
{
re_tracing::profile_scope!("fill");
if let Some(data_time) = data_time { // Reminder: `None` means timeless.
re_log::trace!(query_time=?query.at, ?data_time, "cache miss");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();

let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
query_time_bucket_at_query_time.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
upsert_results(data_time, &arch_view, &mut query_time_bucket_at_query_time)?;
}
}

let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);
let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);

iter_results(false, &query_time_bucket_at_query_time.read())
} else {
re_log::trace!(query_time=?query.at, "cache miss (timeless)");

let mut timeless_bucket = crate::CacheBucket::default();

iter_results(&query_time_bucket_at_query_time.read())
upsert_results(TimeInt::MIN, &arch_view, &mut timeless_bucket)?;
iter_results(true, &timeless_bucket)?;

*timeless = Some(timeless_bucket);
Ok(())
}
};


Expand All @@ -209,8 +243,7 @@ macro_rules! impl_query_archetype {

for arch_view in arch_views {
let data = (
// TODO(cmc): actual timeless caching support.
(arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
(arch_view.data_time(), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
$(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)*
Expand All @@ -228,8 +261,7 @@ macro_rules! impl_query_archetype {
let arch_view = ::re_query::query_archetype::<A>(store, query, entity_path)?;

let data = (
// TODO(cmc): actual timeless caching support.
(arch_view.data_time().unwrap_or(TimeInt::MIN), arch_view.primary_row_id()),
(arch_view.data_time(), arch_view.primary_row_id()),
MaybeCachedComponentData::Raw(arch_view.iter_instance_keys().collect()),
$(MaybeCachedComponentData::Raw(arch_view.iter_required_component::<$pov>()?.collect()),)+
$(MaybeCachedComponentData::Raw(arch_view.iter_optional_component::<$comp>()?.collect()),)*
Expand Down Expand Up @@ -286,7 +318,7 @@ where
R1: Component + Send + Sync + 'static,
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
MaybeCachedComponentData<'_, R1>,
),
Expand Down Expand Up @@ -318,7 +350,7 @@ macro_rules! impl_query_archetype_with_history {
$($comp: Component + Send + Sync + 'static,)*
F: FnMut(
(
(TimeInt, RowId),
(Option<TimeInt>, RowId),
MaybeCachedComponentData<'_, InstanceKey>,
$(MaybeCachedComponentData<'_, $pov>,)+
$(MaybeCachedComponentData<'_, Option<$comp>>,)*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ macro_rules! impl_process_archetype {
&EntityPath,
&EntityProperties,
&SpatialSceneEntityContext<'_>,
(TimeInt, RowId),
(Option<TimeInt>, RowId),
&[InstanceKey],
$(&[$pov],)*
$(&[Option<$comp>],)*
Expand Down
12 changes: 4 additions & 8 deletions crates/re_space_view_text_log/src/visualizer_system.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use re_data_store::TimeRange;
use re_entity_db::EntityPath;
use re_log_types::{RowId, TimeInt};
use re_log_types::RowId;
use re_types::{
archetypes::TextLog,
components::{Color, Text, TextLogLevel},
Expand Down Expand Up @@ -61,6 +61,8 @@ impl VisualizerSystem for TextLogSystem {
let store = ctx.entity_db.store();

for data_result in query.iter_visible_data_results(Self::identifier()) {
re_tracing::profile_scope!("primary", &data_result.entity_path.to_string());

// We want everything, for all times:
let timeline_query =
re_data_store::RangeQuery::new(query.timeline, TimeRange::EVERYTHING);
Expand All @@ -77,8 +79,7 @@ impl VisualizerSystem for TextLogSystem {
self.entries.push(Entry {
row_id,
entity_path: data_result.entity_path.clone(),
// TODO(cmc): real support for timeless data in caches.
time: (time != TimeInt::MIN).then(|| time.as_i64()),
time: time.map(|time| time.as_i64()),
color: *color,
body: body.clone(),
level: level.clone(),
Expand All @@ -88,11 +89,6 @@ impl VisualizerSystem for TextLogSystem {
)?;
}

{
re_tracing::profile_scope!("sort");
self.entries.sort_by_key(|entry| entry.time);
}

Ok(Vec::new())
}

Expand Down
Loading
Loading