Primary caching 15: range read performance optimization #4800

Merged 1 commit on Jan 15, 2024
crates/re_query_cache/src/range.rs: 69 additions & 8 deletions
@@ -19,6 +19,65 @@ pub struct RangeCache {
     pub total_size_bytes: u64,
 }

+impl RangeCache {
+    /// Given a `query`, returns N reduced queries that are sufficient to fill the missing data
+    /// on both the front & back sides of the cache.
+    #[inline]
+    pub fn compute_queries(&self, query: &RangeQuery) -> impl Iterator<Item = RangeQuery> {
+        let front = self.compute_front_query(query);
+        let back = self.compute_back_query(query);
+        front.into_iter().chain(back)
+    }
+
+    /// Given a `query`, returns a reduced query that is sufficient to fill the missing data
+    /// on the front side of the cache, or `None` if all the necessary data is already
+    /// cached.
+    pub fn compute_front_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
+        let mut reduced_query = query.clone();
+
+        if self.bucket.is_empty() {
+            return Some(reduced_query);
+        }
+
+        if let Some(bucket_time_range) = self.bucket.time_range() {
+            reduced_query.range.max = TimeInt::min(
+                reduced_query.range.max,
+                bucket_time_range.min.as_i64().saturating_sub(1).into(),
+            );
+        } else {
+            return Some(reduced_query);
+        }
+
+        if reduced_query.range.max < reduced_query.range.min {
+            return None;
+        }
+
+        Some(reduced_query)
+    }
+
+    /// Given a `query`, returns a reduced query that is sufficient to fill the missing data
+    /// on the back side of the cache, or `None` if all the necessary data is already
+    /// cached.
+    pub fn compute_back_query(&self, query: &RangeQuery) -> Option<RangeQuery> {
+        let mut reduced_query = query.clone();
+
+        if let Some(bucket_time_range) = self.bucket.time_range() {
+            reduced_query.range.min = TimeInt::max(
+                reduced_query.range.min,
+                bucket_time_range.max.as_i64().saturating_add(1).into(),
+            );
+        } else {
+            return Some(reduced_query);
+        }
+
+        if reduced_query.range.max < reduced_query.range.min {
+            return None;
+        }
+
+        Some(reduced_query)
+    }
+}
+
 // --- Queries ---

 macro_rules! impl_query_archetype_range {
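
For intuition, the reduction above amounts to clamping the query against the cached time range: the front query covers everything strictly before the cached data, the back query everything strictly after it, and a query fully covered by the cache yields no store reads at all. Below is a minimal standalone sketch of that arithmetic on plain i64 bounds (the Range, front_query, and back_query names are hypothetical, not the crate's actual TimeInt/RangeQuery API):

/// Inclusive time range, a stand-in for the real `TimeRange`/`TimeInt` types.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Range {
    min: i64,
    max: i64,
}

/// Sub-range of `query` that lies strictly before `cached`, if any.
fn front_query(query: Range, cached: Range) -> Option<Range> {
    let reduced = Range {
        min: query.min,
        max: query.max.min(cached.min.saturating_sub(1)),
    };
    (reduced.min <= reduced.max).then_some(reduced)
}

/// Sub-range of `query` that lies strictly after `cached`, if any.
fn back_query(query: Range, cached: Range) -> Option<Range> {
    let reduced = Range {
        min: query.min.max(cached.max.saturating_add(1)),
        max: query.max,
    };
    (reduced.min <= reduced.max).then_some(reduced)
}

fn main() {
    let cached = Range { min: 10, max: 20 };

    // Query overlapping both sides: only [5, 9] and [21, 30] need to hit the store.
    let query = Range { min: 5, max: 30 };
    assert_eq!(front_query(query, cached), Some(Range { min: 5, max: 9 }));
    assert_eq!(back_query(query, cached), Some(Range { min: 21, max: 30 }));

    // Query fully covered by the cache: no store reads at all.
    let covered = Range { min: 12, max: 18 };
    assert_eq!(front_query(covered, cached), None);
    assert_eq!(back_query(covered, cached), None);
}
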
@@ -80,7 +139,7 @@ macro_rules! impl_query_archetype_range {
             $($pov: Component + Send + Sync + 'static,)+
             $($comp: Component + Send + Sync + 'static,)*
         {
-            re_log::trace!("fill");
+            re_tracing::profile_scope!("fill");

             let now = web_time::Instant::now();

@@ -112,14 +171,16 @@ macro_rules! impl_query_archetype_range {
             let mut range_callback = |query: &RangeQuery, range_cache: &mut crate::RangeCache| {
                 re_tracing::profile_scope!("range", format!("{query:?}"));

-                let RangeCache { bucket, total_size_bytes } = range_cache;
-
-                // NOTE: `+ 2` because we always grab the indicator component as well as the
-                // instance keys.
-                let arch_views = ::re_query::range_archetype::<A, { $N + $M + 2 }>(store, query, entity_path);
-                *total_size_bytes += upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, bucket)?;
+                for reduced_query in range_cache.compute_queries(query) {
+                    // NOTE: `+ 2` because we always grab the indicator component as well as the
+                    // instance keys.
+                    let arch_views =
+                        ::re_query::range_archetype::<A, { $N + $M + 2 }>(store, &reduced_query, entity_path);
+                    range_cache.total_size_bytes +=
+                        upsert_results::<A, $($pov,)+ $($comp,)*>(arch_views, &mut range_cache.bucket)?;
+                }

-                iter_results(bucket)
+                iter_results(&range_cache.bucket)
             };

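
The range callback above is now a fill-then-iterate flow: compute the reduced queries, read only those missing spans from the store, upsert the results into the cached bucket, and then answer the whole query from the cache. The sketch below is a rough, self-contained analogue of that flow, using a BTreeMap of rows keyed by time as a stand-in for the real bucket (ToyCache and its methods are hypothetical, not the crate's API):

use std::collections::BTreeMap;

/// Toy stand-in for the cache bucket: rows keyed by time.
#[derive(Default)]
struct ToyCache {
    rows: BTreeMap<i64, String>,
}

impl ToyCache {
    /// Inclusive time range currently covered by the cache, if any.
    fn time_range(&self) -> Option<(i64, i64)> {
        Some((*self.rows.keys().next()?, *self.rows.keys().next_back()?))
    }

    /// Fetches only the parts of `query` missing from the cache, upserts them,
    /// then yields every cached row inside `query`.
    fn range<'a>(
        &'a mut self,
        query: (i64, i64),
        store: &dyn Fn(i64, i64) -> Vec<(i64, String)>,
    ) -> impl Iterator<Item = (&'a i64, &'a String)> {
        let (qmin, qmax) = query;
        let missing = match self.time_range() {
            None => vec![(qmin, qmax)],
            Some((cmin, cmax)) => {
                let mut m = Vec::new();
                if qmin < cmin {
                    m.push((qmin, cmin - 1)); // front query
                }
                if qmax > cmax {
                    m.push((cmax + 1, qmax)); // back query
                }
                m
            }
        };
        for (min, max) in missing {
            self.rows.extend(store(min, max)); // upsert freshly fetched rows
        }
        self.rows.range(qmin..=qmax)
    }
}

fn main() {
    let mut cache = ToyCache::default();
    // Pretend data store: one row per time step.
    let store =
        |min: i64, max: i64| (min..=max).map(|t| (t, format!("row {t}"))).collect::<Vec<_>>();
    assert_eq!(cache.range((0, 3), &store).count(), 4); // cold read: fetches [0, 3]
    assert_eq!(cache.range((0, 5), &store).count(), 6); // warm read: only fetches [4, 5]
}

On a growing recording this means the first read pays the full cost, while every subsequent read of the same range only fetches the newly arrived tail through the back query, which is where the range read speedup comes from.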