Skip to content

Commit

Permalink
compute filling queries for improved read performance
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Jan 12, 2024
1 parent 3dc9114 commit b6fdb95
Showing 1 changed file with 69 additions and 8 deletions.
77 changes: 69 additions & 8 deletions crates/re_query_cache/src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,65 @@ pub struct RangeCache {
pub total_size_bytes: u64,
}

impl RangeCache {
/// Given a `query`, returns N reduced queries that are sufficient to fill the missing data
/// on both the front & back sides of the cache.
#[inline]
pub fn compute_queries(&self, query: &RangeQuery) -> impl Iterator<Item = RangeQuery> {
let front = self.compute_front_query(query);
let back = self.compute_back_query(query);
front.into_iter().chain(back)
}

/// Given a `query`, returns a reduced query that is sufficient to fill the missing data
/// on the front side of the cache, or `None` if all the necessary data is already
/// cached.
pub fn compute_front_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

if self.bucket.is_empty() {
return Some(reduced_query);
}

if let Some(bucket_time_range) = self.bucket.time_range() {
reduced_query.range.max = TimeInt::min(
reduced_query.range.max,
bucket_time_range.min.as_i64().saturating_sub(1).into(),
);
} else {
return Some(reduced_query);
}

if reduced_query.range.max < reduced_query.range.min {
return None;
}

Some(reduced_query)
}

/// Given a `query`, returns a reduced query that is sufficient to fill the missing data
/// on the back side of the cache, or `None` if all the necessary data is already
/// cached.
pub fn compute_back_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
let mut reduced_query = query.clone();

if let Some(bucket_time_range) = self.bucket.time_range() {
reduced_query.range.min = TimeInt::max(
reduced_query.range.min,
bucket_time_range.max.as_i64().saturating_add(1).into(),
);
} else {
return Some(reduced_query);
}

if reduced_query.range.max < reduced_query.range.min {
return None;
}

Some(reduced_query)
}
}

// --- Queries ---

macro_rules! impl_query_archetype_range {
Expand Down Expand Up @@ -80,7 +139,7 @@ macro_rules! impl_query_archetype_range {
$($pov: Component + Send + Sync + 'static,)+
$($comp: Component + Send + Sync + 'static,)*
{
re_log::trace!("fill");
re_tracing::profile_scope!("fill");

let now = web_time::Instant::now();

Expand Down Expand Up @@ -112,14 +171,16 @@ macro_rules! impl_query_archetype_range {
let mut range_callback = |query: &RangeQuery, range_cache: &mut crate::RangeCache| {
re_tracing::profile_scope!("range", format!("{query:?}"));

let RangeCache { bucket, total_size_bytes } = range_cache;

// NOTE: `+ 2` because we always grab the indicator component as well as the
// instance keys.
let arch_views = ::re_query::range_archetype::<A, { $N + $M + 2 }>(store, query, entity_path);
*total_size_bytes += upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, bucket)?;
for reduced_query in range_cache.compute_queries(query) {
// NOTE: `+ 2` because we always grab the indicator component as well as the
// instance keys.
let arch_views =
::re_query::range_archetype::<A, { $N + $M + 2 }>(store, &reduced_query, entity_path);
range_cache.total_size_bytes +=
upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.bucket)?;
}

iter_results(bucket)
iter_results(&range_cache.bucket)
};


Expand Down

0 comments on commit b6fdb95

Please sign in to comment.