Skip to content

Commit

Permalink
Primary caching 8: implement latest-at data-time cache entry deduplic…
Browse files Browse the repository at this point in the history
…ation (#4712)

Introduces the notion of cache deduplication: given a query at time `4`
and a query at time `8` that both returns data at time `2`, they must
share a single cache entry.

I.e. starting with this PR, scrubbing through the OPF example will not
result if more cache memory being used.

---

Part of the primary caching series of PR (index search, joins,
deserialization):
- #4592
- #4593
- #4659
- #4680 
- #4681
- #4698
- #4711
- #4712
- #4721 
- #4726 
- #4773
- #4784
- #4785
- #4793
- #4800
  • Loading branch information
teh-cmc authored Jan 15, 2024
1 parent 8dd1681 commit ccfd21a
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 58 deletions.
55 changes: 24 additions & 31 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ impl CacheKey {
/// of data.
#[derive(Default)]
pub struct CacheBucket {
/// The timestamps and [`RowId`]s of all cached rows.
/// The _data_ timestamps and [`RowId`]s of all cached rows.
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) pov_times: VecDeque<(TimeInt, RowId)>,
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,

/// The [`InstanceKey`]s of the point-of-view components.
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
Expand All @@ -170,8 +170,8 @@ pub struct CacheBucket {
impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_pov_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_times.iter()
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
Expand Down Expand Up @@ -207,7 +207,7 @@ impl CacheBucket {
/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn num_entries(&self) -> usize {
self.pov_times.len()
self.pov_data_times.len()
}

#[inline]
Expand Down Expand Up @@ -236,15 +236,15 @@ macro_rules! impl_insert {
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
pov_times,
pov_data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = pov_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));

pov_times.insert(index, (query_time, pov_row_id));
pov_data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
Expand Down Expand Up @@ -332,30 +332,23 @@ impl CacheBucket {
// which is notoriously painful in Rust (i.e., macros).
// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.

/// Caches the results of `LatestAt` queries.
/// Caches the results of `LatestAt` archetype queries (`ArchetypeView`).
///
/// The `TimeInt` in the index corresponds to the timestamp of the query, _not_ the timestamp of
/// the resulting data!
//
// TODO(cmc): we need an extra indirection layer so that cached entries can be shared across
// queries with different query timestamps but identical data timestamps.
// This requires keeping track of all `RowId`s in `ArchetypeView`, not just the `RowId` of the
// point-of-view component.
/// There is one `LatestAtCache` for each unique [`CacheKey`].
///
/// All query steps are cached: index search, cluster key joins and deserialization.
#[derive(Default)]
pub struct LatestAtCache(BTreeMap<TimeInt, CacheBucket>);

impl std::ops::Deref for LatestAtCache {
type Target = BTreeMap<TimeInt, CacheBucket>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct LatestAtCache {
/// Organized by _query_ time.
///
/// If the data you're looking for isn't in here, try partially running the query (i.e. run the
/// index search in order to find a data time, but don't actually deserialize and join the data)
/// and check if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

impl std::ops::DerefMut for LatestAtCache {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
/// Organized by _data_ time.
///
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
}
2 changes: 1 addition & 1 deletion crates/re_query_cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub use self::query::{
query_archetype_pov1, query_archetype_with_history_pov1, MaybeCachedComponentData,
};

pub(crate) use self::cache::LatestAtCache;
pub(crate) use self::cache::{CacheBucket, LatestAtCache};

pub use re_query::{QueryError, Result}; // convenience

Expand Down
94 changes: 68 additions & 26 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,35 +106,12 @@ macro_rules! impl_query_archetype {
format!("cached={cached} arch={} pov={} comp={}", A::name(), $N, $M)
);

let mut latest_at_callback = |query: &LatestAtQuery, cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let bucket = cache.entry(query.at).or_default();
// NOTE: Implicitly dropping the write guard here: the LatestAtCache is free once again!

if bucket.is_empty() {
re_tracing::profile_scope!("fill");

let now = web_time::Instant::now();
// TODO(cmc): cache deduplication.
let arch_view = query_archetype::<A>(store, &query, entity_path)?;

bucket.[<insert_pov $N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}

let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
re_tracing::profile_scope!("iter");

let it = itertools::izip!(
bucket.iter_pov_times(),
bucket.iter_pov_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()
.ok_or_else(|| re_query::ComponentNotFoundError(<$pov>::name()))?,)+
Expand All @@ -156,6 +133,71 @@ macro_rules! impl_query_archetype {
Ok(())
};

let mut latest_at_callback = |query: &LatestAtQuery, latest_at_cache: &mut crate::LatestAtCache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;

let query_time_bucket_at_query_time = match per_query_time.entry(query.at) {
std::collections::btree_map::Entry::Occupied(query_time_bucket_at_query_time) => {
// Fastest path: we have an entry for this exact query time, no need to look any
// further.
return iter_results(&query_time_bucket_at_query_time.get().read());
}
entry @ std::collections::btree_map::Entry::Vacant(_) => entry,
};

let arch_view = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): actual timeless caching support.
let data_time = arch_view.data_time().unwrap_or(TimeInt::MIN);

// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that to avoid join & deserialization costs.
if let Some(data_time_bucket_at_data_time) = per_data_time.get(&data_time) {
*query_time_bucket_at_query_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit for that query
// time ends up taking the fastest path.
let query_time_bucket_at_data_time = per_query_time.entry(data_time);
*query_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&data_time_bucket_at_data_time);

return iter_results(&data_time_bucket_at_data_time.read());
}

let query_time_bucket_at_query_time = query_time_bucket_at_query_time.or_default();

// Slowest path: this is a complete cache miss.
{
re_tracing::profile_scope!("fill");

// Grabbing the current time is quite costly on web.
#[cfg(not(target_arch = "wasm32"))]
let now = web_time::Instant::now();

let mut query_time_bucket_at_query_time = query_time_bucket_at_query_time.write();
query_time_bucket_at_query_time.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

#[cfg(not(target_arch = "wasm32"))]
{
let elapsed = now.elapsed();
::re_log::trace!(
store_id=%store.id(),
%entity_path,
archetype=%A::name(),
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}
}

let data_time_bucket_at_data_time = per_data_time.entry(data_time);
*data_time_bucket_at_data_time.or_default() = std::sync::Arc::clone(&query_time_bucket_at_query_time);

iter_results(&query_time_bucket_at_query_time.read())
};


match &query {
// TODO(cmc): cached range support
AnyQuery::Range(query) => {
Expand Down Expand Up @@ -203,7 +245,7 @@ macro_rules! impl_query_archetype {
store.id().clone(),
entity_path.clone(),
query,
|cache| latest_at_callback(query, cache),
|latest_at_cache| latest_at_callback(query, latest_at_cache),
)
},
}
Expand Down
67 changes: 67 additions & 0 deletions crates/re_query_cache/tests/latest_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,73 @@ fn invalidation() {
query_and_compare(&store, &query, &ent_path.into());
}

// Test the following scenario:
// ```py
// rr.set_time(0)
// rr.log("points", rr.Points3D([1, 2, 3]))
//
// # Do first query here: LatestAt(+inf)
// # Expected: points=[[1,2,3]] colors=[]
//
// rr.set_time(1)
// rr.log_components("points", rr.components.Color(0xFF0000))
//
// # Do second query here: LatestAt(+inf)
// # Expected: points=[[1,2,3]] colors=[0xFF0000]
//
// rr.set_time(2)
// rr.log_components("points", rr.components.Color(0x0000FF))
//
// # Do third query here: LatestAt(+inf)
// # Expected: points=[[1,2,3]] colors=[0x0000FF]
// ```
//
// TODO(cmc): this needs proper invalidation to pass
#[should_panic(expected = "assertion failed: `(left == right)`")]
#[test]
fn invalidation_of_future_optionals() {
let mut store = DataStore::new(
re_log_types::StoreId::random(re_log_types::StoreKind::Recording),
InstanceKey::name(),
Default::default(),
);

let ent_path = "points";

let frame1 = [build_frame_nr(1.into())];
let frame2 = [build_frame_nr(2.into())];
let frame3 = [build_frame_nr(3.into())];

let query_time = [build_frame_nr(9999.into())];

let positions = vec![Position2D::new(1.0, 2.0), Position2D::new(3.0, 4.0)];
let row = DataRow::from_cells1_sized(RowId::new(), ent_path, frame1, 2, positions).unwrap();
store.insert_row(&row).unwrap();

let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
query_and_compare(&store, &query, &ent_path.into());

let color_instances = vec![InstanceKey::SPLAT];
let colors = vec![Color::from_rgb(255, 0, 0)];
let row =
DataRow::from_cells2_sized(RowId::new(), ent_path, frame2, 1, (color_instances, colors))
.unwrap();
store.insert_row(&row).unwrap();

let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
query_and_compare(&store, &query, &ent_path.into());

let color_instances = vec![InstanceKey::SPLAT];
let colors = vec![Color::from_rgb(0, 0, 255)];
let row =
DataRow::from_cells2_sized(RowId::new(), ent_path, frame3, 1, (color_instances, colors))
.unwrap();
store.insert_row(&row).unwrap();

let query = re_data_store::LatestAtQuery::new(query_time[0].0, query_time[0].1);
query_and_compare(&store, &query, &ent_path.into());
}

// ---

fn query_and_compare(store: &DataStore, query: &LatestAtQuery, ent_path: &EntityPath) {
Expand Down

0 comments on commit ccfd21a

Please sign in to comment.