Skip to content

Commit

Permalink
implement latest-at cache deduplication
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 5, 2024
1 parent bcc7ab9 commit 4d4418c
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 59 deletions.
50 changes: 19 additions & 31 deletions crates/re_query_cache/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ impl CacheKey {
/// of data.
#[derive(Default)]
pub struct CacheBucket {
/// The timestamps and [`RowId`]s of all cached rows.
/// The _data_ timestamps and [`RowId`]s of all cached rows.
///
/// Reminder: within a single timestamp, rows are sorted according to their [`RowId`]s.
pub(crate) pov_times: VecDeque<(TimeInt, RowId)>,
pub(crate) pov_data_times: VecDeque<(TimeInt, RowId)>,

/// The [`InstanceKey`]s of the point-of-view components.
pub(crate) pov_instance_keys: FlatVecDeque<InstanceKey>,
Expand All @@ -169,8 +169,8 @@ pub struct CacheBucket {
impl CacheBucket {
/// Iterate over the timestamps of the point-of-view components.
#[inline]
pub fn iter_pov_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_times.iter()
pub fn iter_pov_data_times(&self) -> impl Iterator<Item = &(TimeInt, RowId)> {
self.pov_data_times.iter()
}

/// Iterate over the [`InstanceKey`] batches of the point-of-view components.
Expand Down Expand Up @@ -218,7 +218,7 @@ impl CacheBucket {
/// How many timestamps' worth of data is stored in this bucket?
#[inline]
pub fn len(&self) -> usize {
self.pov_times.len()
self.pov_data_times.len()
}

#[inline]
Expand Down Expand Up @@ -247,15 +247,15 @@ macro_rules! impl_insert {
re_tracing::profile_scope!("CacheBucket::insert", format!("arch={} pov={} comp={}", A::name(), $N, $M));

let Self {
pov_times,
pov_data_times,
pov_instance_keys,
components: _,
} = self;

let pov_row_id = arch_view.primary_row_id();
let index = pov_times.partition_point(|t| t < &(query_time, pov_row_id));
let index = pov_data_times.partition_point(|t| t < &(query_time, pov_row_id));

pov_times.insert(index, (query_time, pov_row_id));
pov_data_times.insert(index, (query_time, pov_row_id));
pov_instance_keys.insert(index, arch_view.iter_instance_keys());
$(self.insert_component::<A, $pov>(index, arch_view)?;)+
$(self.insert_component_opt::<A, $comp>(index, arch_view)?;)*
Expand Down Expand Up @@ -344,29 +344,17 @@ impl CacheBucket {
// For this reason we move as much of the code as possible into the already existing macros in `query.rs`.

/// Caches the results of `LatestAt` queries.
///
/// The `TimeInt` in the index corresponds to the timestamp of the query, _not_ the timestamp of
/// the resulting data!
//
// TODO(cmc): we need an extra indirection layer so that cached entries can be shared across
// queries with different query timestamps but identical data timestamps.
// This requires keeping track of all `RowId`s in `ArchetypeView`, not just the `RowId` of the
// point-of-view component.
#[derive(Default)]
pub struct LatestAtCache(BTreeMap<TimeInt, CacheBucket>);

impl std::ops::Deref for LatestAtCache {
type Target = BTreeMap<TimeInt, CacheBucket>;

#[inline]
fn deref(&self) -> &Self::Target {
&self.0
}
}
pub struct LatestAtCache {
/// Organized by _query_ time.
///
/// If the data you're looking for isn't in here, try partially running the query and check
/// if there is any data available for the resulting _data_ time in [`Self::per_data_time`].
pub per_query_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,

impl std::ops::DerefMut for LatestAtCache {
#[inline]
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
/// Organized by _data_ time.
///
/// Due to how our latest-at semantics work, any number of queries at time `T+n` where `n >= 0`
/// can result in a data time of `T`.
pub per_data_time: BTreeMap<TimeInt, Arc<RwLock<CacheBucket>>>,
}
2 changes: 2 additions & 0 deletions crates/re_query_cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ pub use self::query::{
query_archetype_pov1, query_archetype_with_history_pov1, MaybeCachedComponentData,
};

pub(crate) use self::cache::{CacheBucket, LatestAtCache};

pub use re_query::{QueryError, Result}; // convenience

// TODO(cmc): Supporting N>1 generically is quite painful due to limitations in declarative macros,
Expand Down
81 changes: 53 additions & 28 deletions crates/re_query_cache/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,48 +146,73 @@ macro_rules! impl_query_archetype {
store.id().clone(),
entity_path.clone(),
query,
|cache| {
|latest_at_cache| {
re_tracing::profile_scope!("latest_at", format!("{query:?}"));

let bucket = cache.entry(query.at).or_default();
// NOTE: Implicitly dropping the write guard here: the LatestAtCache is
// free once again!
let mut iter_results = |bucket: &crate::CacheBucket| -> crate::Result<()> {
let it = itertools::izip!(
bucket.iter_pov_data_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()?,)+
$(bucket.iter_component_opt::<$comp>()?,)*
).map(|(time, instance_keys, $($pov,)+ $($comp,)*)| {
(
*time,
MaybeCachedComponentData::Cached(instance_keys),
$(MaybeCachedComponentData::Cached($pov),)+
$(MaybeCachedComponentData::Cached($comp),)*
)
});

for data in it {
f(data);
}

Ok(())
};

let crate::LatestAtCache { per_query_time, per_data_time } = latest_at_cache;

// Fastest path: we have an entry for this exact query, no need to look
// any further.
if let Some(query_time_bucket) = per_query_time.get(&query.at) {
return iter_results(&query_time_bucket.read());
}

let (data_time, arch_view) = query_archetype::<A>(store, &query, entity_path)?;
// TODO(cmc): actual timeless caching support.
let data_time = data_time.unwrap_or(TimeInt::MIN);

// Fast path: we've run the query and realized that we already have the data for the resulting
// _data_ time, so let's use that.
if let Some(data_time_bucket) = per_data_time.get(&data_time) {
// We now know for a fact that a query at that data time would yield the same
// results: copy the bucket accordingly so that the next cache hit ends up taking the fastest path.
*per_query_time.entry(data_time).or_default() = std::sync::Arc::clone(&data_time_bucket);
return iter_results(&data_time_bucket.read());
}

if bucket.is_empty() {
let query_time_bucket = per_query_time.entry(query.at).or_default();

// Slowest path: this is a complete cache miss.
{
let now = web_time::Instant::now();
// TODO(cmc): cache deduplication.
let (_data_time, arch_view) = query_archetype::<A>(store, &query, entity_path)?;

bucket.[<insert_pov $N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;
let mut query_time_bucket = query_time_bucket.write();
query_time_bucket.[<insert_pov$N _comp$M>]::<A, $($pov,)+ $($comp,)*>(query.at, &arch_view)?;

// TODO(cmc): I'd love a way of putting this information into
// the `puffin` span directly.
// TODO(cmc): I'd love a way of putting this information into the `puffin` span directly.
let elapsed = now.elapsed();
::re_log::trace!(
"cached new entry in {elapsed:?} ({:0.3} entries/s)",
1f64 / elapsed.as_secs_f64()
);
}

let it = itertools::izip!(
bucket.iter_pov_times(),
bucket.iter_pov_instance_keys(),
$(bucket.iter_component::<$pov>()?,)+
$(bucket.iter_component_opt::<$comp>()?,)*
).map(|(time, instance_keys, $($pov,)+ $($comp,)*)| {
(
*time,
MaybeCachedComponentData::Cached(instance_keys),
$(MaybeCachedComponentData::Cached($pov),)+
$(MaybeCachedComponentData::Cached($comp),)*
)
});

for data in it {
f(data);
}

Ok(())
*per_data_time.entry(data_time).or_default() = std::sync::Arc::clone(&query_time_bucket);

iter_results(&query_time_bucket.read())
}
)
},
Expand Down

0 comments on commit 4d4418c

Please sign in to comment.